You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/11/06 13:05:44 UTC
[2/3] incubator-kylin git commit: KYLIN-1116 Use local dictionary for
InvertedIndex batch building
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_inner_join.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_inner_join.json b/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_inner_join.json
index 237bdd7..4c0c3ef 100644
--- a/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_inner_join.json
+++ b/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_inner_join.json
@@ -4,49 +4,8 @@
"owner": null,
"version": null,
"cost": 10,
- "status": "READY",
- "segments": [
- {
- "uuid": null,
- "name": "19700101000000_20140901000000",
- "status": "READY",
- "dictionaries": {
- "DEFAULT.TEST_KYLIN_FACT/LSTG_SITE_ID": "/dict/EDW.TEST_SITES/SITE_ID/4ef43390-b07e-4d4c-872a-77c0bd783acb.dict",
- "DEFAULT.TEST_CATEGORY_GROUPINGS/SITE_ID": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/SITE_ID/30c9d5f0-abe4-4d1c-a147-610234d90ff1.dict",
- "DEFAULT.TEST_CATEGORY_GROUPINGS/CATEG_LVL2_NAME": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/CATEG_LVL2_NAME/9355165b-06ad-4c04-977c-a251e66e7e98.dict",
- "DEFAULT.TEST_KYLIN_FACT/LSTG_FORMAT_NAME": "/dict/DEFAULT.TEST_KYLIN_FACT/LSTG_FORMAT_NAME/d5c40465-75e1-40bc-a960-06308f0134a6.dict",
- "DEFAULT.TEST_KYLIN_FACT/SELLER_ID": "/dict/DEFAULT.TEST_KYLIN_FACT/SELLER_ID/fea3a18d-3f20-4b8b-a880-7af93e69241b.dict",
- "EDW.TEST_SITES/CRE_USER": "/dict/EDW.TEST_SITES/CRE_USER/244af7a2-7352-4b30-811f-46e637d7a133.dict",
- "DEFAULT.TEST_KYLIN_FACT/LEAF_CATEG_ID": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/LEAF_CATEG_ID/38361fbc-b875-4273-b8b4-1b8a26ef8570.dict",
- "EDW.TEST_CAL_DT/CAL_DT": "/dict/EDW.TEST_CAL_DT/CAL_DT/5e4b4f35-0fc8-4940-b123-b18c9f77da19.dict",
- "DEFAULT.TEST_KYLIN_FACT/TRANS_ID": "/dict/DEFAULT.TEST_KYLIN_FACT/TRANS_ID/7fb8fed8-7f3b-4089-a85d-3ac07f575c82.dict",
- "EDW.TEST_CAL_DT/WEEK_BEG_DT": "/dict/EDW.TEST_CAL_DT/WEEK_BEG_DT/962b5f64-bee1-49ee-a072-af882193b719.dict",
- "DEFAULT.TEST_KYLIN_FACT/SLR_SEGMENT_CD": "/dict/EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_CD/bec11fda-9ae0-4668-98ea-f0f4e9dd6993.dict",
- "EDW.TEST_SITES/SITE_NAME": "/dict/EDW.TEST_SITES/SITE_NAME/f363531d-e969-4264-bffd-ac18f8f47220.dict",
- "DEFAULT.TEST_CATEGORY_GROUPINGS/UPD_DATE": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/UPD_DATE/1fc93a94-1feb-4af4-8078-81a6f1b65e2b.dict",
- "DEFAULT.TEST_CATEGORY_GROUPINGS/META_CATEG_NAME": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/META_CATEG_NAME/895739d6-27e1-4ecc-b798-5851c319ea40.dict",
- "DEFAULT.TEST_CATEGORY_GROUPINGS/LEAF_CATEG_ID": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/LEAF_CATEG_ID/38361fbc-b875-4273-b8b4-1b8a26ef8570.dict",
- "EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_CD": "/dict/EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_CD/bec11fda-9ae0-4668-98ea-f0f4e9dd6993.dict",
- "EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_DESC": "/dict/EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_DESC/bfb86010-bf4c-4534-a2b0-59d020aed197.dict",
- "DEFAULT.TEST_CATEGORY_GROUPINGS/USER_DEFINED_FIELD1": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/USER_DEFINED_FIELD1/8ae44fb8-b01a-4db1-a901-dc5f463038cb.dict",
- "EDW.TEST_SITES/SITE_ID": "/dict/EDW.TEST_SITES/SITE_ID/4ef43390-b07e-4d4c-872a-77c0bd783acb.dict",
- "DEFAULT.TEST_KYLIN_FACT/CAL_DT": "/dict/EDW.TEST_CAL_DT/CAL_DT/5e4b4f35-0fc8-4940-b123-b18c9f77da19.dict",
- "DEFAULT.TEST_CATEGORY_GROUPINGS/USER_DEFINED_FIELD3": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/USER_DEFINED_FIELD3/278d7283-518a-4cd0-b6e1-2573b523bf1f.dict",
- "DEFAULT.TEST_CATEGORY_GROUPINGS/UPD_USER": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/UPD_USER/bb1bb7a5-b02d-45eb-b3c8-f1a4515264ca.dict",
- "DEFAULT.TEST_CATEGORY_GROUPINGS/CATEG_LVL3_NAME": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/CATEG_LVL3_NAME/b2d6fae1-eaac-4ac2-8a01-42e5c8b5c198.dict"
- },
- "storage_location_identifier": "test_III",
- "date_range_start": 0,
- "date_range_end": 0,
- "size_kb": 0,
- "input_records": 0,
- "input_records_size": 0,
- "last_build_time": 0,
- "last_build_job_id": null,
- "create_time": null,
- "binary_signature": null
- }
- ],
+ "status": "DISABLED",
+ "segments": [],
"last_modified": 1420016227424,
"descriptor": "test_kylin_ii_inner_join_desc",
"create_time": null,
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_left_join.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_left_join.json b/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_left_join.json
index 07c1970..90c21bb 100644
--- a/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_left_join.json
+++ b/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_left_join.json
@@ -4,49 +4,8 @@
"owner": null,
"version": null,
"cost": 10,
- "status": "READY",
- "segments": [
- {
- "uuid": null,
- "name": "19700101000000_20140901000000",
- "status": "READY",
- "dictionaries": {
- "DEFAULT.TEST_KYLIN_FACT/LSTG_SITE_ID": "/dict/EDW.TEST_SITES/SITE_ID/4ef43390-b07e-4d4c-872a-77c0bd783acb.dict",
- "DEFAULT.TEST_CATEGORY_GROUPINGS/SITE_ID": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/SITE_ID/30c9d5f0-abe4-4d1c-a147-610234d90ff1.dict",
- "DEFAULT.TEST_CATEGORY_GROUPINGS/CATEG_LVL2_NAME": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/CATEG_LVL2_NAME/9355165b-06ad-4c04-977c-a251e66e7e98.dict",
- "DEFAULT.TEST_KYLIN_FACT/LSTG_FORMAT_NAME": "/dict/DEFAULT.TEST_KYLIN_FACT/LSTG_FORMAT_NAME/d5c40465-75e1-40bc-a960-06308f0134a6.dict",
- "DEFAULT.TEST_KYLIN_FACT/SELLER_ID": "/dict/DEFAULT.TEST_KYLIN_FACT/SELLER_ID/fea3a18d-3f20-4b8b-a880-7af93e69241b.dict",
- "EDW.TEST_SITES/CRE_USER": "/dict/EDW.TEST_SITES/CRE_USER/244af7a2-7352-4b30-811f-46e637d7a133.dict",
- "DEFAULT.TEST_KYLIN_FACT/LEAF_CATEG_ID": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/LEAF_CATEG_ID/38361fbc-b875-4273-b8b4-1b8a26ef8570.dict",
- "EDW.TEST_CAL_DT/CAL_DT": "/dict/EDW.TEST_CAL_DT/CAL_DT/5e4b4f35-0fc8-4940-b123-b18c9f77da19.dict",
- "DEFAULT.TEST_KYLIN_FACT/TRANS_ID": "/dict/DEFAULT.TEST_KYLIN_FACT/TRANS_ID/7fb8fed8-7f3b-4089-a85d-3ac07f575c82.dict",
- "EDW.TEST_CAL_DT/WEEK_BEG_DT": "/dict/EDW.TEST_CAL_DT/WEEK_BEG_DT/962b5f64-bee1-49ee-a072-af882193b719.dict",
- "DEFAULT.TEST_KYLIN_FACT/SLR_SEGMENT_CD": "/dict/EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_CD/bec11fda-9ae0-4668-98ea-f0f4e9dd6993.dict",
- "EDW.TEST_SITES/SITE_NAME": "/dict/EDW.TEST_SITES/SITE_NAME/f363531d-e969-4264-bffd-ac18f8f47220.dict",
- "DEFAULT.TEST_CATEGORY_GROUPINGS/UPD_DATE": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/UPD_DATE/1fc93a94-1feb-4af4-8078-81a6f1b65e2b.dict",
- "DEFAULT.TEST_CATEGORY_GROUPINGS/META_CATEG_NAME": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/META_CATEG_NAME/895739d6-27e1-4ecc-b798-5851c319ea40.dict",
- "DEFAULT.TEST_CATEGORY_GROUPINGS/LEAF_CATEG_ID": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/LEAF_CATEG_ID/38361fbc-b875-4273-b8b4-1b8a26ef8570.dict",
- "EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_CD": "/dict/EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_CD/bec11fda-9ae0-4668-98ea-f0f4e9dd6993.dict",
- "EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_DESC": "/dict/EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_DESC/bfb86010-bf4c-4534-a2b0-59d020aed197.dict",
- "DEFAULT.TEST_CATEGORY_GROUPINGS/USER_DEFINED_FIELD1": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/USER_DEFINED_FIELD1/8ae44fb8-b01a-4db1-a901-dc5f463038cb.dict",
- "EDW.TEST_SITES/SITE_ID": "/dict/EDW.TEST_SITES/SITE_ID/4ef43390-b07e-4d4c-872a-77c0bd783acb.dict",
- "DEFAULT.TEST_KYLIN_FACT/CAL_DT": "/dict/EDW.TEST_CAL_DT/CAL_DT/5e4b4f35-0fc8-4940-b123-b18c9f77da19.dict",
- "DEFAULT.TEST_CATEGORY_GROUPINGS/USER_DEFINED_FIELD3": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/USER_DEFINED_FIELD3/278d7283-518a-4cd0-b6e1-2573b523bf1f.dict",
- "DEFAULT.TEST_CATEGORY_GROUPINGS/UPD_USER": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/UPD_USER/bb1bb7a5-b02d-45eb-b3c8-f1a4515264ca.dict",
- "DEFAULT.TEST_CATEGORY_GROUPINGS/CATEG_LVL3_NAME": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/CATEG_LVL3_NAME/b2d6fae1-eaac-4ac2-8a01-42e5c8b5c198.dict"
- },
- "storage_location_identifier": "test_III",
- "date_range_start": 0,
- "date_range_end": 0,
- "size_kb": 0,
- "input_records": 0,
- "input_records_size": 0,
- "last_build_time": 0,
- "last_build_job_id": null,
- "create_time": null,
- "binary_signature": null
- }
- ],
+ "status": "DISABLED",
+ "segments": [],
"last_modified": 1420016227424,
"descriptor": "test_kylin_ii_left_join_desc",
"create_time": null,
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/examples/test_case_data/localmeta/invertedindex/test_streaming_table_ii.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/invertedindex/test_streaming_table_ii.json b/examples/test_case_data/localmeta/invertedindex/test_streaming_table_ii.json
index a703ae4..9abe3ed 100644
--- a/examples/test_case_data/localmeta/invertedindex/test_streaming_table_ii.json
+++ b/examples/test_case_data/localmeta/invertedindex/test_streaming_table_ii.json
@@ -4,26 +4,8 @@
"owner": null,
"version": null,
"cost": 10,
- "status": "READY",
- "segments": [
- {
- "uuid": null,
- "name": "19700101000000_20190901000000",
- "status": "READY",
- "dictionaries": {
- },
- "storage_location_identifier": "KYLIN_2STEAMTEST",
- "date_range_start": 0,
- "date_range_end": 0,
- "size_kb": 0,
- "input_records": 0,
- "input_records_size": 0,
- "last_build_time": 0,
- "last_build_job_id": null,
- "create_time": null,
- "binary_signature": null
- }
- ],
+ "status": "DISABLED",
+ "segments": [],
"last_modified": 0,
"descriptor": "test_streaming_table_ii_desc",
"create_time": null,
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
index 7aff714..5633004 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
@@ -32,13 +32,7 @@ import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.Serializer;
import org.apache.kylin.common.restclient.Broadcaster;
import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.dict.DictionaryInfo;
-import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.dict.DistinctColumnValuesProvider;
-import org.apache.kylin.invertedindex.model.IIDesc;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metadata.project.ProjectManager;
import org.apache.kylin.metadata.realization.IRealization;
@@ -131,45 +125,7 @@ public class IIManager implements IRealizationProvider {
}
return result;
}
-
- public void buildInvertedIndexDictionary(IISegment iiSeg, DistinctColumnValuesProvider factTableValueProvider) throws IOException {
- logger.info("Start building ii dictionary");
- DictionaryManager dictMgr = getDictionaryManager();
- IIDesc iiDesc = iiSeg.getIIInstance().getDescriptor();
- for (TblColRef column : iiDesc.listAllColumns()) {
- logger.info("Dealing with column {}", column);
- if (iiDesc.isMetricsCol(column)) {
- continue;
- }
-
- DictionaryInfo dict = dictMgr.buildDictionary(iiDesc.getModel(), "true", column, factTableValueProvider);
- iiSeg.putDictResPath(column, dict.getResourcePath());
- }
- updateII(iiSeg.getIIInstance());
- }
-
- /**
- * return null if no dictionary for given column
- */
- public Dictionary<?> getDictionary(IISegment iiSeg, TblColRef col) {
- DictionaryInfo info = null;
- try {
- DictionaryManager dictMgr = getDictionaryManager();
- // logger.info("Using metadata url " + metadataUrl +
- // " for DictionaryManager");
- String dictResPath = iiSeg.getDictResPath(col);
- if (dictResPath == null)
- return null;
-
- info = dictMgr.getDictionaryInfo(dictResPath);
- if (info == null)
- throw new IllegalStateException("No dictionary found by " + dictResPath + ", invalid II state; II segment" + iiSeg + ", col " + col);
- } catch (IOException e) {
- throw new IllegalStateException("Failed to get dictionary for II segment" + iiSeg + ", col" + col, e);
- }
-
- return info.getDictionaryObject();
- }
+
public IIInstance createII(IIInstance ii) throws IOException {
@@ -300,10 +256,6 @@ public class IIManager implements IRealizationProvider {
}
}
- private DictionaryManager getDictionaryManager() {
- return DictionaryManager.getInstance(config);
- }
-
private ResourceStore getStore() {
return ResourceStore.getStore(this.config);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/invertedindex/src/main/java/org/apache/kylin/invertedindex/IISegment.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IISegment.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IISegment.java
index adcca8b..c3ca464 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IISegment.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IISegment.java
@@ -19,18 +19,14 @@
package org.apache.kylin.invertedindex;
import java.text.SimpleDateFormat;
-import java.util.Collection;
import java.util.List;
-import java.util.Map;
import java.util.TimeZone;
-import java.util.concurrent.ConcurrentHashMap;
import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.dict.IDictionaryAware;
import org.apache.kylin.invertedindex.index.TableRecordInfo;
import org.apache.kylin.invertedindex.model.IIDesc;
import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.IBuildable;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
@@ -49,7 +45,7 @@ import org.apache.kylin.metadata.realization.IRealizationSegment;
// TODO: remove segment concept for II, append old hbase table
@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class IISegment implements Comparable<IISegment>, IDictionaryAware, IRealizationSegment {
+public class IISegment implements Comparable<IISegment>, IRealizationSegment {
@JsonBackReference
private IIInstance iiInstance;
@@ -83,11 +79,6 @@ public class IISegment implements Comparable<IISegment>, IDictionaryAware, IReal
private String binarySignature; // a hash of schema and dictionary ID,
// used for sanity check
- @JsonProperty("dictionaries")
- private ConcurrentHashMap<String, String> dictionaries; // table/column ==>
- // dictionary
- // resource path
-
private transient TableRecordInfo tableRecordInfo;
/**
@@ -216,28 +207,6 @@ public class IISegment implements Comparable<IISegment>, IDictionaryAware, IReal
return storageLocationIdentifier;
}
- public Map<String, String> getDictionaries() {
- if (dictionaries == null)
- dictionaries = new ConcurrentHashMap<String, String>();
- return dictionaries;
- }
-
- public Collection<String> getDictionaryPaths() {
- return getDictionaries().values();
- }
-
- public String getDictResPath(TblColRef col) {
- return getDictionaries().get(dictKey(col));
- }
-
- public void putDictResPath(TblColRef col, String dictResPath) {
- getDictionaries().put(dictKey(col), dictResPath);
- }
-
- private String dictKey(TblColRef col) {
- return col.getTable() + "/" + col.getName();
- }
-
/**
* @param storageLocationIdentifier the storageLocationIdentifier to set
*/
@@ -262,10 +231,6 @@ public class IISegment implements Comparable<IISegment>, IDictionaryAware, IReal
return tableRecordInfo;
}
- // public void updateDictionary(List<Dictionary<?>> dicts) {
- // getTableRecordInfo().updateDictionary( dicts);
- // }
-
public List<TblColRef> getColumns() {
return this.getTableRecordInfo().getColumns();
}
@@ -275,20 +240,6 @@ public class IISegment implements Comparable<IISegment>, IDictionaryAware, IReal
return Objects.toStringHelper(this).add("uuid", uuid).add("create_time_utc:", createTimeUTC).add("name", name).add("last_build_job_id", lastBuildJobID).add("status", status).toString();
}
- @Override
- public int getColumnLength(TblColRef col) {
-
- int index = getTableRecordInfo().findColumn(col);
- return getTableRecordInfo().getDigest().length(index);
- }
-
- @Override
- public Dictionary<?> getDictionary(TblColRef col) {
-
- int index = getTableRecordInfo().findColumn(col);
- return getTableRecordInfo().dict(index);
- }
-
public long getCreateTimeUTC() {
return createTimeUTC;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java
new file mode 100644
index 0000000..1c293d7
--- /dev/null
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.invertedindex.index;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.common.util.StreamingBatch;
+import org.apache.kylin.common.util.StreamingMessage;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.invertedindex.util.IIDictionaryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+/**
+ */
+public final class SliceBuilder {
+
+ private static Logger logger = LoggerFactory.getLogger(SliceBuilder.class);
+
+ private final BatchSliceMaker sliceMaker;
+ private final IIDesc iiDesc;
+ private final boolean useLocalDict;
+
+ public SliceBuilder(IIDesc desc, short shard, boolean useLocalDict) {
+ this.iiDesc = desc;
+ this.sliceMaker = new BatchSliceMaker(desc, shard);
+ this.useLocalDict = useLocalDict;
+ }
+
+ public Slice buildSlice(StreamingBatch microStreamBatch) {
+ final List<List<String>> messages = Lists.transform(microStreamBatch.getMessages(), new Function<StreamingMessage, List<String>>() {
+ @Nullable
+ @Override
+ public List<String> apply(@Nullable StreamingMessage input) {
+ return input.getData();
+ }
+ });
+ final Dictionary<?>[] dictionaries = useLocalDict ? IIDictionaryBuilder.buildDictionary(messages, iiDesc) : new Dictionary[iiDesc.listAllColumns().size()];
+ TableRecordInfo tableRecordInfo = new TableRecordInfo(iiDesc, dictionaries);
+ return build(messages, tableRecordInfo, dictionaries);
+ }
+
+ private Slice build(List<List<String>> table, final TableRecordInfo tableRecordInfo, Dictionary<?>[] localDictionary) {
+ final Slice slice = sliceMaker.makeSlice(tableRecordInfo.getDigest(), Lists.transform(table, new Function<List<String>, TableRecord>() {
+ @Nullable
+ @Override
+ public TableRecord apply(@Nullable List<String> input) {
+ TableRecord result = tableRecordInfo.createTableRecord();
+ for (int i = 0; i < input.size(); i++) {
+ result.setValueString(i, input.get(i));
+ }
+ return result;
+ }
+ }));
+ slice.setLocalDictionaries(localDictionary);
+ return slice;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java
index baafacd..8a0c2ba 100644
--- a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java
+++ b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java
@@ -53,11 +53,6 @@ public class IIInstanceTest extends LocalFileMetadataTestCase {
Assert.assertTrue(iiInstances.size() > 0);
- IIInstance instance = iiInstances.get(0);
-
- Dictionary dict = mgr.getDictionary(instance.getFirstSegment(), instance.getDescriptor().findColumnRef("DEFAULT.TEST_KYLIN_FACT", "LSTG_SITE_ID"));
-
- Assert.assertNotNull(dict);
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 1eb2683..b8d1333 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
@@ -78,6 +79,8 @@ public class HiveMRInput implements IMRInput {
try {
HCatInputFormat.setInput(job, dbName, tableName);
job.setInputFormatClass(HCatInputFormat.class);
+
+ job.setMapOutputValueClass(org.apache.hive.hcatalog.data.DefaultHCatRecord.class);
} catch (IOException e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
index a1ab712..ee5a555 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
@@ -48,8 +48,8 @@ import kafka.message.MessageAndOffset;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.engine.streaming.IStreamingInput;
-import org.apache.kylin.engine.streaming.StreamingBatch;
-import org.apache.kylin.engine.streaming.StreamingMessage;
+import org.apache.kylin.common.util.StreamingBatch;
+import org.apache.kylin.common.util.StreamingMessage;
import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
import org.apache.kylin.source.kafka.config.KafkaConfig;
import org.apache.kylin.source.kafka.util.KafkaRequester;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
index aace8bc..3455f1d 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
@@ -47,7 +47,7 @@ import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
import org.apache.kylin.engine.streaming.StreamingManager;
-import org.apache.kylin.engine.streaming.StreamingMessage;
+import org.apache.kylin.common.util.StreamingMessage;
import org.apache.kylin.metadata.model.IntermediateColumnDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.source.kafka.config.KafkaConfig;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
index c0e506f..9691ea7 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
@@ -40,7 +40,7 @@ import java.util.List;
import kafka.message.MessageAndOffset;
-import org.apache.kylin.engine.streaming.StreamingMessage;
+import org.apache.kylin.common.util.StreamingMessage;
import org.apache.kylin.metadata.model.TblColRef;
import com.google.common.collect.Lists;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index 4fae228..00f93a5 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -43,7 +43,7 @@ import kafka.message.MessageAndOffset;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.TimeUtil;
-import org.apache.kylin.engine.streaming.StreamingMessage;
+import org.apache.kylin.common.util.StreamingMessage;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
index de5e58e..0e29a0c 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
@@ -40,7 +40,7 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.AbstractApplication;
import org.apache.kylin.common.util.DaemonThreadFactory;
import org.apache.kylin.common.util.OptionsHelper;
-import org.apache.kylin.engine.streaming.StreamingMessage;
+import org.apache.kylin.common.util.StreamingMessage;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.source.kafka.KafkaConfigManager;
import org.apache.kylin.source.kafka.StreamingParser;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
index 2833ea4..96b7fa7 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
@@ -11,7 +11,7 @@ import kafka.javaapi.PartitionMetadata;
import kafka.message.MessageAndOffset;
import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.engine.streaming.StreamingMessage;
+import org.apache.kylin.common.util.StreamingMessage;
import org.apache.kylin.source.kafka.StreamingParser;
import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileMapper.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileMapper.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileMapper.java
index e4b688f..0e0a8ce 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileMapper.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileMapper.java
@@ -19,6 +19,7 @@
package org.apache.kylin.storage.hbase.ii;
import java.io.IOException;
+import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
@@ -43,13 +44,29 @@ public class IICreateHFileMapper extends KylinMapper<ImmutableBytesWritable, Imm
@Override
protected void map(ImmutableBytesWritable key, ImmutableBytesWritable value, Context context) throws IOException, InterruptedException {
+ ByteBuffer buffer = ByteBuffer.wrap(value.get(), value.getOffset(), value.getLength());
+ int totalLength = value.getLength();
+ int valueLength = buffer.getInt();
+ int dictionaryLength = totalLength - valueLength - 4;
KeyValue kv = new KeyValue(key.get(), key.getOffset(), key.getLength(), //
IIDesc.HBASE_FAMILY_BYTES, 0, IIDesc.HBASE_FAMILY_BYTES.length, //
IIDesc.HBASE_QUALIFIER_BYTES, 0, IIDesc.HBASE_QUALIFIER_BYTES.length, //
timestamp, Type.Put, //
- value.get(), value.getOffset(), value.getLength());
+ buffer.array(), buffer.position(), valueLength);
+ // write value
context.write(key, kv);
+
+ kv = new KeyValue(key.get(), key.getOffset(), key.getLength(), //
+ IIDesc.HBASE_FAMILY_BYTES, 0, IIDesc.HBASE_FAMILY_BYTES.length, //
+ IIDesc.HBASE_DICTIONARY_BYTES, 0, IIDesc.HBASE_DICTIONARY_BYTES.length, //
+ timestamp, Type.Put, //
+ buffer.array(), buffer.position() + valueLength, dictionaryLength);
+
+
+ // write dictionary
+ context.write(key, kv);
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java
index 0a72a91..bcfe346 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java
@@ -47,7 +47,7 @@ public class IICreateHTableJob extends AbstractHadoopJob {
@Override
public int run(String[] args) throws Exception {
Options options = new Options();
-
+ HBaseAdmin admin = null;
try {
options.addOption(OPTION_II_NAME);
options.addOption(OPTION_HTABLE_NAME);
@@ -61,6 +61,22 @@ public class IICreateHTableJob extends AbstractHadoopJob {
IIInstance ii = iiManager.getII(iiName);
int sharding = ii.getDescriptor().getSharding();
+
+ Configuration conf = HBaseConfiguration.create(getConf());
+ // check if the table already exists
+ admin = new HBaseAdmin(conf);
+ if (admin.tableExists(tableName)) {
+ if (admin.isTableEnabled(tableName)) {
+ logger.info("Table " + tableName + " already exists and is enabled, no need to create.");
+ return 0;
+ } else {
+ logger.error("Table " + tableName + " is disabled, couldn't append data");
+ return 1;
+ }
+ }
+
+ // table doesn't exist, need to create
+
HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
HColumnDescriptor cf = new HColumnDescriptor(IIDesc.HBASE_FAMILY);
cf.setMaxVersions(1);
@@ -100,7 +116,6 @@ public class IICreateHTableJob extends AbstractHadoopJob {
tableDesc.setValue(IRealizationConstants.HTableCreationTime, String.valueOf(System.currentTimeMillis()));
tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName());
- Configuration conf = HBaseConfiguration.create(getConf());
if (User.isHBaseSecurityEnabled(conf)) {
// add coprocessor for bulk load
tableDesc.addCoprocessor("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
@@ -108,13 +123,7 @@ public class IICreateHTableJob extends AbstractHadoopJob {
IIDeployCoprocessorCLI.deployCoprocessor(tableDesc);
- // drop the table first
- HBaseAdmin admin = new HBaseAdmin(conf);
- if (admin.tableExists(tableName)) {
- admin.disableTable(tableName);
- admin.deleteTable(tableName);
- }
-
+
// create table
byte[][] splitKeys = getSplits(sharding);
if (splitKeys.length == 0)
@@ -126,12 +135,14 @@ public class IICreateHTableJob extends AbstractHadoopJob {
}
}
System.out.println("create hbase table " + tableName + " done.");
- admin.close();
return 0;
} catch (Exception e) {
printUsage(options);
throw e;
+ } finally {
+ if (admin != null)
+ admin.close();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java
index ff8b659..11c1711 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java
@@ -54,7 +54,7 @@ public class HBaseMROutput implements IMROutput {
@Override
public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
- steps.addCubingGarbageCollectionSteps(jobFlow);
+ steps.addInvertedIndexGarbageCollectionSteps(jobFlow);
}
};
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
index 1267d2d..03db6b0 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
@@ -184,7 +184,7 @@ public class HBaseMRSteps extends JobBuilderSupport {
}
public void addSaveIIToHTableSteps(DefaultChainedExecutable jobFlow, String rootPath) {
- // create htable step
+ // create htable if it doesn't exist
jobFlow.addTask(createCreateIIHTableStep(seg));
final String iiPath = rootPath + "*";
@@ -198,6 +198,22 @@ public class HBaseMRSteps extends JobBuilderSupport {
}
+ public void addInvertedIndexGarbageCollectionSteps(DefaultChainedExecutable jobFlow) {
+ String jobId = jobFlow.getId();
+
+ List<String> toDeletePaths = new ArrayList<>();
+ toDeletePaths.add(getJobWorkingDir(jobId));
+
+ HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
+ step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
+ step.setDeletePaths(toDeletePaths);
+ step.setJobId(jobId);
+
+ jobFlow.addTask(step);
+ }
+
+
+
private HadoopShellExecutable createCreateIIHTableStep(IRealizationSegment seg) {
HadoopShellExecutable createHtableStep = new HadoopShellExecutable();
createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
index b200c2e..15dc993 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
@@ -46,7 +46,6 @@ import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
import org.apache.kylin.invertedindex.IISegment;
@@ -108,7 +107,7 @@ public class DeployCoprocessorCLI {
private static void initHTableCoprocessor(HTableDescriptor desc) throws IOException {
KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
+ Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
FileSystem fileSystem = FileSystem.get(hconf);
String localCoprocessorJar = kylinConfig.getCoprocessorLocalJar();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
index 88cb7de..2137f57 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
@@ -35,8 +35,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MasterNotRunningException;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/TsConditionEraserTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/TsConditionEraserTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/TsConditionEraserTest.java
index 335e00c..6c6ed80 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/TsConditionEraserTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/TsConditionEraserTest.java
@@ -6,6 +6,7 @@ import java.util.Arrays;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
import org.apache.kylin.invertedindex.index.TableRecordInfo;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.filter.ColumnTupleFilter;
@@ -40,7 +41,10 @@ public class TsConditionEraserTest extends LocalFileMetadataTestCase {
@Before
public void setup() throws IOException {
this.createTestMetadata();
- this.ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_left_join");
+ IIManager iiManager = IIManager.getInstance(getTestConfig());
+ this.ii = iiManager.getII("test_kylin_ii_left_join");
+ IISegment segment = iiManager.buildSegment(ii, 0, System.currentTimeMillis());
+ ii.getSegments().add(segment);
this.tableRecordInfo = new TableRecordInfo(ii.getFirstSegment());
this.factTableDesc = MetadataManager.getInstance(getTestConfig()).getTableDesc("DEFAULT.TEST_KYLIN_FACT");
this.caldt = this.ii.getDescriptor().findColumnRef("DEFAULT.TEST_KYLIN_FACT", "CAL_DT");
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregationTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregationTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregationTest.java
index e271129..8b56605 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregationTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregationTest.java
@@ -18,20 +18,11 @@
package org.apache.kylin.storage.hbase.ii.coprocessor.endpoint;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
+import com.google.common.collect.Lists;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
import org.apache.kylin.invertedindex.index.TableRecordInfo;
import org.apache.kylin.metadata.measure.LongMutable;
import org.apache.kylin.metadata.measure.MeasureAggregator;
@@ -45,7 +36,11 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.*;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
/**
*
@@ -90,6 +85,10 @@ public class EndpointAggregationTest extends LocalFileMetadataTestCase {
@Test
public void testSerializeAggregator() {
final IIInstance ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_left_join");
+ if (ii.getFirstSegment() == null) {
+ IISegment segment = IIManager.getInstance(getTestConfig()).buildSegment(ii, 0, System.currentTimeMillis());
+ ii.getSegments().add(segment);
+ }
final TableRecordInfo tableRecordInfo = new TableRecordInfo(ii.getFirstSegment());
final EndpointAggregators endpointAggregators = EndpointAggregators.fromFunctions(tableRecordInfo, buildAggregations());
byte[] x = EndpointAggregators.serialize(endpointAggregators);
@@ -139,6 +138,10 @@ public class EndpointAggregationTest extends LocalFileMetadataTestCase {
@Test
public void basicTest() {
final IIInstance ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_left_join");
+ if (ii.getFirstSegment() == null) {
+ IISegment segment = IIManager.getInstance(getTestConfig()).buildSegment(ii, 0, System.currentTimeMillis());
+ ii.getSegments().add(segment);
+ }
final TableRecordInfo tableRecordInfo = new TableRecordInfo(ii.getFirstSegment());
final EndpointAggregators aggregators = EndpointAggregators.fromFunctions(tableRecordInfo, buildAggregations());
final EndpointAggregationCache aggCache = new EndpointAggregationCache(aggregators);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TableRecordInfoTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TableRecordInfoTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TableRecordInfoTest.java
index 791002f..3e34495 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TableRecordInfoTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TableRecordInfoTest.java
@@ -25,6 +25,7 @@ import java.io.IOException;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
import org.apache.kylin.invertedindex.index.TableRecordInfo;
import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
import org.junit.After;
@@ -41,6 +42,10 @@ public class TableRecordInfoTest extends LocalFileMetadataTestCase {
public void setup() throws IOException {
this.createTestMetadata();
this.ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_left_join");
+ if (ii.getFirstSegment() == null) {
+ IISegment segment = IIManager.getInstance(getTestConfig()).buildSegment(ii, 0, System.currentTimeMillis());
+ ii.getSegments().add(segment);
+ }
this.tableRecordInfo = new TableRecordInfo(ii.getFirstSegment());
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TsConditionExtractorTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TsConditionExtractorTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TsConditionExtractorTest.java
index 412e335..4e5a9d9 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TsConditionExtractorTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TsConditionExtractorTest.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
import org.apache.kylin.invertedindex.index.TableRecordInfo;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.filter.ColumnTupleFilter;
@@ -58,6 +59,10 @@ public class TsConditionExtractorTest extends LocalFileMetadataTestCase {
public void setup() throws IOException {
this.createTestMetadata();
this.ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_left_join");
+ if (ii.getFirstSegment() == null) {
+ IISegment segment = IIManager.getInstance(getTestConfig()).buildSegment(ii, 0, System.currentTimeMillis());
+ ii.getSegments().add(segment);
+ }
this.tableRecordInfo = new TableRecordInfo(ii.getFirstSegment());
this.factTableDesc = MetadataManager.getInstance(getTestConfig()).getTableDesc("DEFAULT.TEST_KYLIN_FACT");
this.calDt = this.ii.getDescriptor().findColumnRef("DEFAULT.TEST_KYLIN_FACT", "CAL_DT");