You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/22 06:13:21 UTC
[31/47] incubator-kylin git commit: KYLIN-875 rename modules:
core-common, core-cube, core-dictionary, core-cube
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
new file mode 100644
index 0000000..a26c2c9
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonBackReference;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Objects;
+
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.dict.IDictionaryAware;
+import org.apache.kylin.metadata.model.IBuildable;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import java.text.SimpleDateFormat;
+import java.util.Collection;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Metadata record of one cube segment: its identity, date range, build status,
+ * size statistics, and the resource paths of the dictionaries and snapshots
+ * registered for it.
+ * <p>
+ * Only the fields carrying {@code @JsonProperty} are serialized; automatic
+ * detection of fields, getters and setters is switched off by the class-level
+ * {@code @JsonAutoDetect}.
+ */
+@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
+public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, IBuildable {
+
+    @JsonBackReference
+    private CubeInstance cubeInstance;
+    @JsonProperty("uuid")
+    private String uuid;
+    @JsonProperty("name")
+    private String name;
+    @JsonProperty("storage_location_identifier")
+    private String storageLocationIdentifier; // HTable name
+    @JsonProperty("date_range_start")
+    private long dateRangeStart;
+    @JsonProperty("date_range_end")
+    private long dateRangeEnd;
+    @JsonProperty("status")
+    private SegmentStatusEnum status;
+    @JsonProperty("size_kb")
+    private long sizeKB;
+    @JsonProperty("input_records")
+    private long inputRecords;
+    @JsonProperty("input_records_size")
+    private long inputRecordsSize;
+    @JsonProperty("last_build_time")
+    private long lastBuildTime;
+    @JsonProperty("last_build_job_id")
+    private String lastBuildJobID;
+    @JsonProperty("create_time_utc")
+    private long createTimeUTC;
+
+    @JsonProperty("binary_signature")
+    private String binarySignature; // a hash of cube schema and dictionary ID, used for sanity check
+
+    @JsonProperty("dictionaries")
+    private ConcurrentHashMap<String, String> dictionaries; // table/column ==> dictionary resource path
+    @JsonProperty("snapshots")
+    private ConcurrentHashMap<String, String> snapshots; // table name ==> snapshot resource path
+
+    /**
+     * @return the descriptor of the cube instance this segment belongs to
+     */
+    public CubeDesc getCubeDesc() {
+        return getCubeInstance().getDescriptor();
+    }
+
+    /**
+     * Derives a segment name from a date range.
+     *
+     * @param startDate range start, in epoch milliseconds
+     * @param endDate range end, in epoch milliseconds
+     * @return if(startDate == 0 && endDate == 0), returns "FULL_BUILD", else
+     *         returns "yyyyMMddHHmmss_yyyyMMddHHmmss" (both formatted in GMT)
+     */
+    public static String getSegmentName(long startDate, long endDate) {
+        if (startDate == 0 && endDate == 0) {
+            return "FULL_BUILD";
+        }
+
+        // SimpleDateFormat is not thread-safe, so a fresh instance is used per call
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
+        dateFormat.setTimeZone(TimeZone.getTimeZone("GMT"));
+
+        return dateFormat.format(startDate) + "_" + dateFormat.format(endDate);
+    }
+
+    // ============================================================================
+
+    public String getUuid() {
+        return uuid;
+    }
+
+    public void setUuid(String id) {
+        this.uuid = id;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public long getDateRangeStart() {
+        return dateRangeStart;
+    }
+
+    public void setDateRangeStart(long dateRangeStart) {
+        this.dateRangeStart = dateRangeStart;
+    }
+
+    public long getDateRangeEnd() {
+        return dateRangeEnd;
+    }
+
+    public void setDateRangeEnd(long dateRangeEnd) {
+        this.dateRangeEnd = dateRangeEnd;
+    }
+
+    public SegmentStatusEnum getStatus() {
+        return status;
+    }
+
+    public void setStatus(SegmentStatusEnum status) {
+        this.status = status;
+    }
+
+    public long getSizeKB() {
+        return sizeKB;
+    }
+
+    public void setSizeKB(long sizeKB) {
+        this.sizeKB = sizeKB;
+    }
+
+    public long getInputRecords() {
+        return inputRecords;
+    }
+
+    public void setInputRecords(long inputRecords) {
+        this.inputRecords = inputRecords;
+    }
+
+    public long getInputRecordsSize() {
+        return inputRecordsSize;
+    }
+
+    public void setInputRecordsSize(long inputRecordsSize) {
+        this.inputRecordsSize = inputRecordsSize;
+    }
+
+    public long getLastBuildTime() {
+        return lastBuildTime;
+    }
+
+    public void setLastBuildTime(long lastBuildTime) {
+        this.lastBuildTime = lastBuildTime;
+    }
+
+    public String getLastBuildJobID() {
+        return lastBuildJobID;
+    }
+
+    public void setLastBuildJobID(String lastBuildJobID) {
+        this.lastBuildJobID = lastBuildJobID;
+    }
+
+    public long getCreateTimeUTC() {
+        return createTimeUTC;
+    }
+
+    public void setCreateTimeUTC(long createTimeUTC) {
+        this.createTimeUTC = createTimeUTC;
+    }
+
+    public String getBinarySignature() {
+        return binarySignature;
+    }
+
+    public void setBinarySignature(String binarySignature) {
+        this.binarySignature = binarySignature;
+    }
+
+    public CubeInstance getCubeInstance() {
+        return cubeInstance;
+    }
+
+    public void setCubeInstance(CubeInstance cubeInstance) {
+        this.cubeInstance = cubeInstance;
+    }
+
+    public String getStorageLocationIdentifier() {
+        return storageLocationIdentifier;
+    }
+
+    /**
+     * @return the live dictionary-path map (table/column ==> resource path),
+     *         created on first access when absent; never null
+     */
+    public Map<String, String> getDictionaries() {
+        if (dictionaries == null)
+            dictionaries = new ConcurrentHashMap<String, String>();
+        return dictionaries;
+    }
+
+    /**
+     * @return the live snapshot-path map (table name ==> resource path),
+     *         created on first access when absent; never null
+     */
+    public Map<String, String> getSnapshots() {
+        if (snapshots == null)
+            snapshots = new ConcurrentHashMap<String, String>();
+        return snapshots;
+    }
+
+    public String getSnapshotResPath(String table) {
+        return getSnapshots().get(table);
+    }
+
+    public void putSnapshotResPath(String table, String snapshotResPath) {
+        getSnapshots().put(table, snapshotResPath);
+    }
+
+    public Collection<String> getDictionaryPaths() {
+        return getDictionaries().values();
+    }
+
+    public Collection<String> getSnapshotPaths() {
+        return getSnapshots().values();
+    }
+
+    public String getDictResPath(TblColRef col) {
+        return getDictionaries().get(dictKey(col));
+    }
+
+    public void putDictResPath(TblColRef col, String dictResPath) {
+        getDictionaries().put(dictKey(col), dictResPath);
+    }
+
+    // key format of the dictionaries map: "<table>/<column name>"
+    private String dictKey(TblColRef col) {
+        return col.getTable() + "/" + col.getName();
+    }
+
+    public void setStorageLocationIdentifier(String storageLocationIdentifier) {
+        this.storageLocationIdentifier = storageLocationIdentifier;
+    }
+
+    /**
+     * Returns the dictionary ID size when the column has a dictionary in this
+     * segment, otherwise the column length declared in the cube's rowkey.
+     */
+    @Override
+    public int getColumnLength(TblColRef col) {
+        Dictionary<?> dict = getDictionary(col);
+        if (dict == null) {
+            return this.getCubeDesc().getRowkey().getColumnLength(col);
+        } else {
+            return dict.getSizeOfId();
+        }
+    }
+
+    /**
+     * Looks up the dictionary of the given column through the CubeManager bound
+     * to the owning cube's config.
+     */
+    @Override
+    public Dictionary<?> getDictionary(TblColRef col) {
+        return CubeManager.getInstance(this.getCubeInstance().getConfig()).getDictionary(this, col);
+    }
+
+    /**
+     * Checks that a partitioned cube's segment has a non-empty date range.
+     *
+     * @throws IllegalStateException if the model is partitioned and
+     *         dateRangeStart is not smaller than dateRangeEnd
+     */
+    public void validate() {
+        if (cubeInstance.getDescriptor().getModel().getPartitionDesc().isPartitioned() && dateRangeStart >= dateRangeEnd)
+            throw new IllegalStateException("dateRangeStart(" + dateRangeStart + ") must be smaller than dateRangeEnd(" + dateRangeEnd + ") in segment " + this);
+    }
+
+    /**
+     * Orders segments by dateRangeStart, then by dateRangeEnd.
+     * <p>
+     * The values are compared directly rather than by subtracting them: a
+     * difference of two longs can overflow and report the wrong order.
+     */
+    @Override
+    public int compareTo(CubeSegment other) {
+        if (this.dateRangeStart != other.dateRangeStart)
+            return this.dateRangeStart < other.dateRangeStart ? -1 : 1;
+
+        if (this.dateRangeEnd != other.dateRangeEnd)
+            return this.dateRangeEnd < other.dateRangeEnd ? -1 : 1;
+
+        return 0;
+    }
+
+    /**
+     * Hash over cubeInstance, name and status. The uuid is compared in
+     * {@link #equals(Object)} but is not part of the hash.
+     */
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((cubeInstance == null) ? 0 : cubeInstance.hashCode());
+        result = prime * result + ((name == null) ? 0 : name.hashCode());
+        result = prime * result + ((status == null) ? 0 : status.hashCode());
+        return result;
+    }
+
+    /**
+     * Two segments are equal when they are of the same class and agree on
+     * cubeInstance, uuid, name and status.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        CubeSegment other = (CubeSegment) obj;
+        if (cubeInstance == null) {
+            if (other.cubeInstance != null)
+                return false;
+        } else if (!cubeInstance.equals(other.cubeInstance))
+            return false;
+        if (uuid == null) {
+            if (other.uuid != null)
+                return false;
+        } else if (!uuid.equals(other.uuid))
+            return false;
+        if (name == null) {
+            if (other.name != null)
+                return false;
+        } else if (!name.equals(other.name))
+            return false;
+        if (status != other.status)
+            return false;
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        return Objects.toStringHelper(this).add("uuid", uuid).add("create_time_utc:", createTimeUTC).add("name", name).add("last_build_job_id", lastBuildJobID).add("status", status).toString();
+    }
+
+    public void setDictionaries(ConcurrentHashMap<String, String> dictionaries) {
+        this.dictionaries = dictionaries;
+    }
+
+    public void setSnapshots(ConcurrentHashMap<String, String> snapshots) {
+        this.snapshots = snapshots;
+    }
+
+    /**
+     * @return the statistics resource path of this segment, built from the
+     *         owning cube's name and this segment's uuid
+     */
+    public String getStatisticsResourcePath() {
+        return getStatisticsResourcePath(this.getCubeInstance().getName(), this.getUuid());
+    }
+
+    /**
+     * @return {@code ResourceStore.CUBE_STATISTICS_ROOT + "/" + cubeName + "/" + cubeSegmentId + ".seq"}
+     */
+    public static String getStatisticsResourcePath(String cubeName, String cubeSegmentId) {
+        return ResourceStore.CUBE_STATISTICS_ROOT + "/" + cubeName + "/" + cubeSegmentId + ".seq";
+    }
+
+    /** Always 0 in this implementation. */
+    @Override
+    public int getSourceType() {
+        return 0;
+    }
+
+    /** Always 0 in this implementation. */
+    @Override
+    public int getEngineType() {
+        return 0;
+    }
+
+    /** Always 0 in this implementation. */
+    @Override
+    public int getStorageType() {
+        return 0;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java
new file mode 100644
index 0000000..dffaa48
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube;
+
+import org.apache.kylin.metadata.realization.RealizationStatusEnum;
+
+/**
+ * Hold changes to a cube so that they can be applied as one unit.
+ */
+public class CubeUpdate {
+
+    // the cube the pending changes refer to
+    private CubeInstance cubeInstance;
+
+    // segment changes; each array stays null until its setter is called
+    private CubeSegment[] toAddSegs;
+    private CubeSegment[] toRemoveSegs;
+    private CubeSegment[] toUpdateSegs;
+
+    // attribute changes; status and owner start out null, cost starts at -1
+    private RealizationStatusEnum status;
+    private String owner;
+    private int cost = -1;
+
+    /**
+     * @param cubeInstance the cube this update refers to
+     */
+    public CubeUpdate(CubeInstance cubeInstance) {
+        this.cubeInstance = cubeInstance;
+    }
+
+    // ---- cube ----
+
+    public CubeInstance getCubeInstance() {
+        return cubeInstance;
+    }
+
+    /** @return this update, so that setters can be chained */
+    public CubeUpdate setCubeInstance(CubeInstance cubeInstance) {
+        this.cubeInstance = cubeInstance;
+        return this;
+    }
+
+    // ---- segments ----
+
+    public CubeSegment[] getToAddSegs() {
+        return toAddSegs;
+    }
+
+    /** @return this update, so that setters can be chained */
+    public CubeUpdate setToAddSegs(CubeSegment... toAddSegs) {
+        this.toAddSegs = toAddSegs;
+        return this;
+    }
+
+    public CubeSegment[] getToRemoveSegs() {
+        return toRemoveSegs;
+    }
+
+    /** @return this update, so that setters can be chained */
+    public CubeUpdate setToRemoveSegs(CubeSegment... toRemoveSegs) {
+        this.toRemoveSegs = toRemoveSegs;
+        return this;
+    }
+
+    public CubeSegment[] getToUpdateSegs() {
+        return toUpdateSegs;
+    }
+
+    /** @return this update, so that setters can be chained */
+    public CubeUpdate setToUpdateSegs(CubeSegment... toUpdateSegs) {
+        this.toUpdateSegs = toUpdateSegs;
+        return this;
+    }
+
+    // ---- attributes ----
+
+    public RealizationStatusEnum getStatus() {
+        return status;
+    }
+
+    /** @return this update, so that setters can be chained */
+    public CubeUpdate setStatus(RealizationStatusEnum status) {
+        this.status = status;
+        return this;
+    }
+
+    public String getOwner() {
+        return owner;
+    }
+
+    /** @return this update, so that setters can be chained */
+    public CubeUpdate setOwner(String owner) {
+        this.owner = owner;
+        return this;
+    }
+
+    public int getCost() {
+        return cost;
+    }
+
+    /** @return this update, so that setters can be chained */
+    public CubeUpdate setCost(int cost) {
+        this.cost = cost;
+        return this;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
new file mode 100644
index 0000000..feb0f1a
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.cli;
+
+import java.io.IOException;
+
+import org.apache.kylin.cube.model.DimensionDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+
+/**
+ * Builds the dictionaries and lookup-table snapshots needed by a cube segment.
+ */
+public class DictionaryGeneratorCLI {
+
+    private static final Logger logger = LoggerFactory.getLogger(DictionaryGeneratorCLI.class);
+
+    /**
+     * Resolves the named segment (in status NEW) of the named cube and builds
+     * its dictionaries and snapshots.
+     *
+     * @param config Kylin config used to obtain the CubeManager
+     * @param cubeName name of the cube
+     * @param segmentName name of the segment, looked up with status NEW
+     * @param factColumnsPath passed through to CubeManager.buildDictionary
+     * @throws IllegalArgumentException if no cube is returned for cubeName
+     * @throws IllegalStateException if no matching segment is returned
+     * @throws IOException propagated from the dictionary / snapshot build
+     */
+    public static void processSegment(KylinConfig config, String cubeName, String segmentName, String factColumnsPath) throws IOException {
+        CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
+        // fail with a descriptive message instead of a bare NullPointerException
+        // NOTE(review): assumes getCube()/getSegment() signal "not found" with null - confirm
+        if (cube == null) {
+            throw new IllegalArgumentException("Cube '" + cubeName + "' not found");
+        }
+
+        CubeSegment segment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+        if (segment == null) {
+            throw new IllegalStateException("Segment '" + segmentName + "' with status " + SegmentStatusEnum.NEW + " not found in cube '" + cubeName + "'");
+        }
+
+        processSegment(config, segment, factColumnsPath);
+    }
+
+    /**
+     * For every dimension of the segment's cube: builds a dictionary for each
+     * column the rowkey marks as dictionary-encoded, then, if the dimension
+     * sits on a table other than the fact table, builds and loads its snapshot.
+     */
+    private static void processSegment(KylinConfig config, CubeSegment cubeSeg, String factColumnsPath) throws IOException {
+        CubeManager cubeMgr = CubeManager.getInstance(config);
+
+        for (DimensionDesc dim : cubeSeg.getCubeDesc().getDimensions()) {
+            // dictionary
+            for (TblColRef col : dim.getColumnRefs()) {
+                if (cubeSeg.getCubeDesc().getRowkey().isUseDictionary(col)) {
+                    logger.info("Building dictionary for {}", col);
+                    cubeMgr.buildDictionary(cubeSeg, col, factColumnsPath);
+                }
+            }
+
+            // build snapshot
+            String table = dim.getTable();
+            if (table != null && !table.equalsIgnoreCase(cubeSeg.getCubeDesc().getFactTable())) {
+                logger.info("Building snapshot of {}", table);
+                cubeMgr.buildSnapshotTable(cubeSeg, table);
+                logger.info("Checking snapshot of {}", table);
+                cubeMgr.getLookupTable(cubeSeg, dim); // load the table for sanity check
+            }
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-cube/src/main/java/org/apache/kylin/cube/cli/DumpDictionaryCLI.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cli/DumpDictionaryCLI.java b/core-cube/src/main/java/org/apache/kylin/cube/cli/DumpDictionaryCLI.java
new file mode 100644
index 0000000..24daddf
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cli/DumpDictionaryCLI.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.cli;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Date;
+
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryInfoSerializer;
+
+public class DumpDictionaryCLI {
+
+ public static void main(String[] args) throws IOException {
+ for (String path : args) {
+ dump(new File(path));
+ }
+ }
+
+ public static void dump(File f) throws IOException {
+ if (f.isDirectory()) {
+ for (File c : f.listFiles())
+ dump(c);
+ return;
+ }
+
+ if (f.getName().endsWith(".dict")) {
+ DictionaryInfoSerializer ser = new DictionaryInfoSerializer();
+ DictionaryInfo dictInfo = ser.deserialize(new DataInputStream(new FileInputStream(f)));
+
+ System.out.println("============================================================================");
+ System.out.println("File: " + f.getAbsolutePath());
+ System.out.println(new Date(dictInfo.getLastModified()));
+ System.out.println(JsonUtil.writeValueAsIndentString(dictInfo));
+ dictInfo.getDictionaryObject().dump(System.out);
+ System.out.println();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java b/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
new file mode 100644
index 0000000..48db0d8
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.common;
+
+import org.apache.kylin.common.util.SplittedBytes;
+import org.apache.kylin.common.util.Bytes;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.kv.RowKeyColumnIO;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+/**
+ * @author George Song (ysong1)
+ *
+ */
+public class RowKeySplitter {
+
+    private CubeDesc cubeDesc;
+    private RowKeyColumnIO colIO;
+
+    // reusable output slots; the first bufferSize of them hold the latest split
+    private SplittedBytes[] splitBuffers;
+    private int bufferSize;
+
+    /**
+     * @param cubeSeg segment whose cube descriptor and column lengths are used
+     * @param splitLen number of split buffers to allocate
+     * @param bytesLen size passed to each SplittedBytes buffer
+     */
+    public RowKeySplitter(CubeSegment cubeSeg, int splitLen, int bytesLen) {
+        this.cubeDesc = cubeSeg.getCubeDesc();
+        this.colIO = new RowKeyColumnIO(cubeSeg);
+
+        this.splitBuffers = new SplittedBytes[splitLen];
+        int slot = 0;
+        while (slot < splitLen) {
+            this.splitBuffers[slot] = new SplittedBytes(bytesLen);
+            slot++;
+        }
+        this.bufferSize = 0;
+    }
+
+    public SplittedBytes[] getSplitBuffers() {
+        return splitBuffers;
+    }
+
+    public int getBufferSize() {
+        return bufferSize;
+    }
+
+    /**
+     * Splits a rowkey into the cuboid ID part followed by one part per column
+     * of the cuboid, filling the split buffers from index 0.
+     *
+     * @param bytes the rowkey bytes
+     * @param byteLen not read by this method
+     * @return cuboid ID
+     */
+    public long split(byte[] bytes, int byteLen) {
+        this.bufferSize = 0;
+
+        // leading part: the cuboid id
+        SplittedBytes idPart = nextBuffer();
+        int cursor = fill(idPart, bytes, 0, RowConstants.ROWKEY_CUBOIDID_LEN);
+
+        long cuboidId = Bytes.toLong(idPart.value, 0, idPart.length);
+        Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidId);
+
+        // remaining parts: one per rowkey column, in cuboid column order
+        for (TblColRef col : cuboid.getColumns()) {
+            int colLength = colIO.getColumnLength(col);
+            cursor = fill(nextBuffer(), bytes, cursor, colLength);
+        }
+
+        return cuboidId;
+    }
+
+    // hands out the next unused split buffer and counts it as used
+    private SplittedBytes nextBuffer() {
+        return this.splitBuffers[this.bufferSize++];
+    }
+
+    // copies len bytes starting at from into target; returns the offset just past the copied range
+    private int fill(SplittedBytes target, byte[] source, int from, int len) {
+        target.length = len;
+        System.arraycopy(source, from, target.value, 0, len);
+        return from + len;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
new file mode 100644
index 0000000..499ba28
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.cuboid;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.kylin.common.util.Bytes;
+
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.RowKeyColDesc;
+import org.apache.kylin.cube.model.RowKeyDesc;
+import org.apache.kylin.cube.model.RowKeyDesc.AggrGroupMask;
+import org.apache.kylin.cube.model.RowKeyDesc.HierarchyMask;
+import org.apache.kylin.metadata.model.TblColRef;
+
+/**
+ * A cuboid of a cube, identified by a bitmask over the cube's rowkey columns.
+ * <p>
+ * Instances are obtained through {@link #findById(CubeDesc, long)}, which
+ * caches them per cube name. A requested ID that is not valid for the cube is
+ * translated to a valid parent ID; the requested ID stays available through
+ * {@link #getInputID()}.
+ *
+ * @author George Song (ysong1)
+ */
+public class Cuboid implements Comparable<Cuboid> {
+
+    // cube name ==> (requested cuboid ID ==> Cuboid)
+    private final static Map<String, Map<Long, Cuboid>> CUBOID_CACHE = new ConcurrentHashMap<String, Map<Long, Cuboid>>();
+
+    /**
+     * Same as {@link #findById(CubeDesc, long)}, with the ID given as bytes.
+     */
+    public static Cuboid findById(CubeDesc cube, byte[] cuboidID) {
+        return findById(cube, Bytes.toLong(cuboidID));
+    }
+
+    /**
+     * Returns the cached Cuboid for the requested ID, creating and caching it
+     * on first use. The cache is keyed by the requested ID, not the translated one.
+     */
+    public static Cuboid findById(CubeDesc cube, long cuboidID) {
+        Map<Long, Cuboid> cubeCache = CUBOID_CACHE.get(cube.getName());
+        if (cubeCache == null) {
+            cubeCache = new ConcurrentHashMap<Long, Cuboid>();
+            CUBOID_CACHE.put(cube.getName(), cubeCache);
+        }
+        Cuboid cuboid = cubeCache.get(cuboidID);
+        if (cuboid == null) {
+            long validCuboidID = translateToValidCuboid(cube, cuboidID);
+            cuboid = new Cuboid(cube, cuboidID, validCuboidID);
+            cubeCache.put(cuboidID, cuboid);
+        }
+        return cuboid;
+    }
+
+    /**
+     * Tells whether the ID is a valid cuboid of the cube: either the base
+     * cuboid, or an ID passing the mandatory-column, aggregation-group and
+     * hierarchy checks.
+     *
+     * @throws IllegalArgumentException if cuboidID is negative or larger than
+     *         the rowkey's full mask
+     */
+    public static boolean isValid(CubeDesc cube, long cuboidID) {
+        RowKeyDesc rowkey = cube.getRowkey();
+
+        if (cuboidID < 0) {
+            throw new IllegalArgumentException("Cuboid " + cuboidID + " should be greater than 0");
+        }
+
+        if (checkBaseCuboid(rowkey, cuboidID)) {
+            return true;
+        }
+
+        if (!checkMandatoryColumns(rowkey, cuboidID)) {
+            return false;
+        }
+
+        if (!checkAggregationGroup(rowkey, cuboidID)) {
+            return false;
+        }
+
+        if (!checkHierarchy(rowkey, cuboidID)) {
+            return false;
+        }
+
+        return true;
+    }
+
+    /** @return the rowkey's full mask, which is the ID of the base cuboid */
+    public static long getBaseCuboidId(CubeDesc cube) {
+        return cube.getRowkey().getFullMask();
+    }
+
+    public static Cuboid getBaseCuboid(CubeDesc cube) {
+        return findById(cube, getBaseCuboidId(cube));
+    }
+
+    /**
+     * Returns cuboidID itself when valid; otherwise searches breadth-first
+     * through its parents (IDs with one additional bit set) and returns the
+     * first valid one, or -1 when the search runs out of candidates.
+     */
+    // Breadth-First-Search
+    private static long translateToValidCuboid(CubeDesc cube, long cuboidID) {
+        if (Cuboid.isValid(cube, cuboidID)) {
+            return cuboidID;
+        }
+
+        HashSet<Long> dedupped = new HashSet<Long>();
+        Queue<Long> queue = new LinkedList<Long>();
+        List<Long> parents = Cuboid.getAllPossibleParents(cube, cuboidID);
+
+        // check each parent
+        addToQueue(queue, parents, dedupped);
+        while (queue.size() > 0) {
+            long parent = pollFromQueue(queue, dedupped);
+            if (Cuboid.isValid(cube, parent)) {
+                return parent;
+            } else {
+                addToQueue(queue, Cuboid.getAllPossibleParents(cube, parent), dedupped);
+            }
+        }
+        return -1;
+    }
+
+    /**
+     * Lists every ID obtained by setting exactly one more bit of cuboidID, for
+     * bit positions 0 up to the number of rowkey columns (exclusive).
+     */
+    private static List<Long> getAllPossibleParents(CubeDesc cube, long cuboidID) {
+        List<Long> allPossibleParents = new ArrayList<Long>();
+
+        for (int i = 0; i < cube.getRowkey().getRowKeyColumns().length; i++) {
+            long mask = 1L << i;
+            long parentId = cuboidID | mask;
+            if (parentId != cuboidID) {
+                allPossibleParents.add(parentId);
+            }
+        }
+
+        return allPossibleParents;
+    }
+
+    // enqueues the parents in ascending order, skipping IDs currently tracked in dedupped
+    private static void addToQueue(Queue<Long> queue, List<Long> parents, HashSet<Long> dedupped) {
+        Collections.sort(parents);
+        for (Long p : parents) {
+            if (!dedupped.contains(p)) {
+                dedupped.add(p);
+                queue.offer(p);
+            }
+        }
+    }
+
+    // dequeues the head and stops tracking it in dedupped
+    private static long pollFromQueue(Queue<Long> queue, HashSet<Long> dedupped) {
+        long element = queue.poll();
+        dedupped.remove(element);
+        return element;
+    }
+
+    /**
+     * @return true when cuboidID equals the rowkey's full mask
+     * @throws IllegalArgumentException if cuboidID is larger than the full mask
+     */
+    private static boolean checkBaseCuboid(RowKeyDesc rowkey, long cuboidID) {
+        long baseCuboidId = rowkey.getFullMask();
+        if (cuboidID > baseCuboidId) {
+            throw new IllegalArgumentException("Cuboid " + cuboidID + " is out of scope 0-" + baseCuboidId);
+        }
+        return baseCuboidId == cuboidID;
+    }
+
+    // true when the ID is larger than the mandatory mask and contains all mandatory bits
+    private static boolean checkMandatoryColumns(RowKeyDesc rowkey, long cuboidID) {
+        long mandatoryColumnMask = rowkey.getMandatoryColumnMask();
+
+        // note the all-zero cuboid (except for mandatory) is not valid
+        if (cuboidID <= mandatoryColumnMask)
+            return false;
+
+        return (cuboidID & mandatoryColumnMask) == mandatoryColumnMask;
+    }
+
+    /**
+     * For every hierarchy whose columns appear in the cuboid, the bits falling
+     * inside that hierarchy's full mask must equal one of its allMasks entries.
+     */
+    private static boolean checkHierarchy(RowKeyDesc rowkey, long cuboidID) {
+        List<HierarchyMask> hierarchyMaskList = rowkey.getHierarchyMasks();
+        // if no hierarchy defined in metadata
+        if (hierarchyMaskList == null || hierarchyMaskList.size() == 0) {
+            return true;
+        }
+
+        hier: for (HierarchyMask hierarchyMasks : hierarchyMaskList) {
+            long result = cuboidID & hierarchyMasks.fullMask;
+            if (result > 0) {
+                // this hierarchy is satisfied when the bits match one of its masks;
+                // move on to the next hierarchy
+                for (long mask : hierarchyMasks.allMasks) {
+                    if (result == mask) {
+                        continue hier;
+                    }
+                }
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Ignoring mandatory columns: the first aggregation group whose uniqueMask
+     * intersects the cuboid decides - the bits outside its groupMask must be
+     * empty or equal its leftoverMask. When no group matches, the bits inside
+     * the rowkey's tail mask must be empty or equal the whole tail mask.
+     */
+    private static boolean checkAggregationGroup(RowKeyDesc rowkey, long cuboidID) {
+        long cuboidWithoutMandatory = cuboidID & ~rowkey.getMandatoryColumnMask();
+        long leftover;
+        for (AggrGroupMask mask : rowkey.getAggrGroupMasks()) {
+            if ((cuboidWithoutMandatory & mask.uniqueMask) != 0) {
+                leftover = cuboidWithoutMandatory & ~mask.groupMask;
+                return leftover == 0 || leftover == mask.leftoverMask;
+            }
+        }
+
+        leftover = cuboidWithoutMandatory & rowkey.getTailMask();
+        return leftover == 0 || leftover == rowkey.getTailMask();
+    }
+
+    // ============================================================================
+
+    private final CubeDesc cube;
+    private final long inputID; // the ID that was requested
+    private final long id; // the valid ID actually used
+    private final byte[] idBytes;
+    private final boolean requirePostAggregation;
+    private final List<TblColRef> dimensionColumns;
+
+    // will translate the cuboidID if it is not valid
+    private Cuboid(CubeDesc cube, long originalID, long validID) {
+        this.cube = cube;
+        this.inputID = originalID;
+        this.id = validID;
+        this.idBytes = Bytes.toBytes(id);
+        // both helpers below read the fields assigned above, so they must run last
+        this.dimensionColumns = translateIdToColumns(this.id);
+        this.requirePostAggregation = calcExtraAggregation(this.inputID, this.id) != 0;
+    }
+
+    // collects, in rowkey column order, the columns whose bit is set in cuboidID
+    private List<TblColRef> translateIdToColumns(long cuboidID) {
+        List<TblColRef> dimensions = new ArrayList<TblColRef>();
+        RowKeyColDesc[] allColumns = cube.getRowkey().getRowKeyColumns();
+        for (int i = 0; i < allColumns.length; i++) {
+            // NOTE: the order of column in list!!!
+            long bitmask = 1L << allColumns[i].getBitIndex();
+            if ((cuboidID & bitmask) != 0) {
+                TblColRef colRef = allColumns[i].getColRef();
+                dimensions.add(colRef);
+            }
+        }
+        return dimensions;
+    }
+
+    // bits that differ between the two IDs, minus those removed by eliminateHierarchyAggregation
+    private long calcExtraAggregation(long inputID, long id) {
+        long diff = id ^ inputID;
+        return eliminateHierarchyAggregation(diff);
+    }
+
+    // higher level in hierarchy can be ignored when counting aggregation columns
+    // NOTE(review): the test below reads the inputID field, not the id argument - confirm intended
+    private long eliminateHierarchyAggregation(long id) {
+        List<HierarchyMask> hierarchyMaskList = cube.getRowkey().getHierarchyMasks();
+        if (hierarchyMaskList != null && hierarchyMaskList.size() > 0) {
+            for (HierarchyMask hierMask : hierarchyMaskList) {
+                long[] allMasks = hierMask.allMasks;
+                for (int i = allMasks.length - 1; i > 0; i--) {
+                    // the single level that allMasks[i] adds on top of allMasks[i - 1]
+                    long bit = allMasks[i] ^ allMasks[i - 1];
+                    if ((inputID & bit) != 0) {
+                        id &= ~allMasks[i - 1];
+                    }
+                }
+            }
+        }
+        return id;
+    }
+
+    public CubeDesc getCube() {
+        return cube;
+    }
+
+    /** @return the columns of the valid cuboid ID, in rowkey column order */
+    public List<TblColRef> getColumns() {
+        return dimensionColumns;
+    }
+
+    /**
+     * @return the columns of this cuboid after the hierarchy elimination has
+     *         been applied to its ID; computed on every call
+     */
+    public List<TblColRef> getAggregationColumns() {
+        long aggrColsID = eliminateHierarchyAggregation(id);
+        return translateIdToColumns(aggrColsID);
+    }
+
+    /** @return the valid cuboid ID */
+    public long getId() {
+        return id;
+    }
+
+    /** @return the valid cuboid ID as bytes (the internal array, not a copy) */
+    public byte[] getBytes() {
+        return idBytes;
+    }
+
+    /** @return the cuboid ID that was originally requested */
+    public long getInputID() {
+        return inputID;
+    }
+
+    /** @return true when the requested ID had to be translated to a different one */
+    public boolean useAncestor() {
+        return inputID != id;
+    }
+
+    public boolean requirePostAggregation() {
+        return requirePostAggregation;
+    }
+
+    /** Drops the cached cuboids of all cubes. */
+    public static void clearCache() {
+        CUBOID_CACHE.clear();
+    }
+
+    /** Drops the cached cuboids of the named cube; they are rebuilt on demand. */
+    public static void reloadCache(String cubeDescName) {
+        CUBOID_CACHE.remove(cubeDescName);
+    }
+
+    @Override
+    public String toString() {
+        return "Cuboid [id=" + id + "]";
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + (int) (id ^ (id >>> 32));
+        return result;
+    }
+
+    /** Cuboids are equal when they are of the same class and have the same valid ID. */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        Cuboid other = (Cuboid) obj;
+        if (id != other.id)
+            return false;
+        return true;
+    }
+
+    /** Orders cuboids by their valid ID, ascending. */
+    @Override
+    public int compareTo(Cuboid o) {
+        if (this.id < o.id) {
+            return -1;
+        } else if (this.id > o.id) {
+            return 1;
+        } else {
+            return 0;
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidCLI.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidCLI.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidCLI.java
new file mode 100644
index 0000000..c6dc55a
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidCLI.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.cuboid;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.TreeSet;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeDescManager;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.RowKeyDesc;
+import org.apache.kylin.cube.model.RowKeyDesc.AggrGroupMask;
+import org.apache.kylin.cube.model.RowKeyDesc.HierarchyMask;
+
+/**
+ * @author yangli9
+ *
+ */
+public class CuboidCLI {
+
+ public static void main(String[] args) throws IOException {
+ CubeDescManager cubeDescMgr = CubeDescManager.getInstance(KylinConfig.getInstanceFromEnv());
+
+ if ("test".equals(args[0])) {
+ CubeDesc cubeDesc = cubeDescMgr.getCubeDesc(args[1]);
+ simulateCuboidGeneration(cubeDesc);
+ }
+ }
+
+ public static int simulateCuboidGeneration(CubeDesc cube) {
+ CuboidScheduler scheduler = new CuboidScheduler(cube);
+
+ long baseCuboid = Cuboid.getBaseCuboidId(cube);
+ Collection<Long> cuboidSet = new TreeSet<Long>();
+ cuboidSet.add(baseCuboid);
+ LinkedList<Long> cuboidQueue = new LinkedList<Long>();
+ cuboidQueue.push(baseCuboid);
+ while (!cuboidQueue.isEmpty()) {
+ long cuboid = cuboidQueue.pop();
+ Collection<Long> spnanningCuboids = scheduler.getSpanningCuboid(cuboid);
+ for (Long sc : spnanningCuboids) {
+ boolean notfound = cuboidSet.add(sc);
+ if (!notfound) {
+ throw new IllegalStateException("Find duplicate spanning cuboid " + sc + " from cuboid " + cuboid);
+ }
+ cuboidQueue.push(sc);
+ }
+ }
+
+ TreeSet<Long> enumCuboids = enumCalcCuboidCount(cube);
+ if (enumCuboids.equals(cuboidSet) == false) {
+ throw new IllegalStateException("Expected cuboid set " + enumCuboids + "; but actual cuboid set " + cuboidSet);
+ }
+
+ int mathCount = mathCalcCuboidCount(cube);
+ if (mathCount != enumCuboids.size()) {
+ throw new IllegalStateException("Math cuboid count " + mathCount + ", but actual cuboid count " + enumCuboids.size());
+ }
+
+ return mathCount;
+
+ }
+
+ public static TreeSet<Long> enumCalcCuboidCount(CubeDesc cube) {
+ long baseCuboid = Cuboid.getBaseCuboidId(cube);
+ TreeSet<Long> expectedCuboids = new TreeSet<Long>();
+ for (long cuboid = 0; cuboid <= baseCuboid; cuboid++) {
+ if (Cuboid.isValid(cube, cuboid)) {
+ expectedCuboids.add(cuboid);
+ }
+ }
+ return expectedCuboids;
+ }
+
+ public static int[] calculateAllLevelCount(CubeDesc cube) {
+ int levels = cube.getRowkey().getNCuboidBuildLevels();
+ int[] allLevelCounts = new int[levels + 1];
+
+ CuboidScheduler scheduler = new CuboidScheduler(cube);
+ LinkedList<Long> nextQueue = new LinkedList<Long>();
+ LinkedList<Long> currentQueue = new LinkedList<Long>();
+ long baseCuboid = Cuboid.getBaseCuboidId(cube);
+ currentQueue.push(baseCuboid);
+
+ for (int i = 0; i <= levels; i++) {
+ allLevelCounts[i] = currentQueue.size();
+ while (!currentQueue.isEmpty()) {
+ long cuboid = currentQueue.pop();
+ Collection<Long> spnanningCuboids = scheduler.getSpanningCuboid(cuboid);
+ nextQueue.addAll(spnanningCuboids);
+ }
+ currentQueue = nextQueue;
+ nextQueue = new LinkedList<Long>();
+ }
+
+ return allLevelCounts;
+ }
+
+ public static int mathCalcCuboidCount(CubeDesc cube) {
+ int result = 1; // 1 for base cuboid
+
+ RowKeyDesc rowkey = cube.getRowkey();
+ AggrGroupMask[] aggrGroupMasks = rowkey.getAggrGroupMasks();
+ for (int i = 0; i < aggrGroupMasks.length; i++) {
+ boolean hasTail = i < aggrGroupMasks.length - 1 || rowkey.getTailMask() > 0;
+ result += mathCalcCuboidCount_aggrGroup(rowkey, aggrGroupMasks[i], hasTail);
+ }
+
+ return result;
+ }
+
+ private static int mathCalcCuboidCount_aggrGroup(RowKeyDesc rowkey, AggrGroupMask aggrGroupMask, boolean hasTail) {
+ long groupMask = aggrGroupMask.groupMask;
+ int n = mathCalcCuboidCount_combination(rowkey, groupMask);
+ n -= 2; // exclude group all 1 and all 0
+
+ long nonUniqueMask = groupMask & (~aggrGroupMask.uniqueMask);
+ if (nonUniqueMask > 0) {
+ // exclude duplicates caused by non-unique columns
+ // FIXME this assumes non-unique masks consolidates in ONE following group which maybe not be true
+ n -= mathCalcCuboidCount_combination(rowkey, nonUniqueMask) - 1; // exclude all 0
+ }
+
+ if (hasTail) {
+ n *= 2; // tail being 1 and 0
+ n += 2; // +1 for group all 1 and tail 0; +1 for group all 0 and tail 1
+ }
+
+ return n;
+ }
+
+ private static int mathCalcCuboidCount_combination(RowKeyDesc rowkey, long colMask) {
+ if (colMask == 0) // no column selected
+ return 0;
+
+ int count = 1;
+
+ for (HierarchyMask hierMask : rowkey.getHierarchyMasks()) {
+ long hierBits = colMask & hierMask.fullMask;
+ if (hierBits != 0) {
+ count *= Long.bitCount(hierBits) + 1; // +1 is for all-zero case
+ colMask &= ~hierBits;
+ }
+ }
+
+ count *= Math.pow(2, Long.bitCount(colMask));
+
+ return count;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
new file mode 100644
index 0000000..bebfd08
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.cuboid;
+
+/**
+ */
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.RowKeyDesc;
+import org.apache.kylin.cube.model.RowKeyDesc.AggrGroupMask;
+
+/**
+ * Works out, for a cuboid of a cube, the child cuboids it spans, and memoizes the
+ * answer per cuboid id.
+ */
+public class CuboidScheduler {
+
+    private final CubeDesc cubeDef;
+    // number of row key columns
+    private final int size;
+    // largest cuboid id accepted by the range checks
+    private final long max;
+    // memoized results of getSpanningCuboid(), keyed by cuboid id
+    private final Map<Long, List<Long>> cache;
+
+    /**
+     * @param cube the cube descriptor whose cuboids are scheduled
+     */
+    public CuboidScheduler(CubeDesc cube) {
+        this.cubeDef = cube;
+        this.size = cube.getRowkey().getRowKeyColumns().length;
+        // 2^size - 1 in exact integer arithmetic; capped at Long.MAX_VALUE because a
+        // cuboid id is a non-negative long
+        this.max = size >= 63 ? Long.MAX_VALUE : (1L << size) - 1;
+        this.cache = new ConcurrentHashMap<Long, List<Long>>();
+    }
+
+    /**
+     * Counts the cuboids reachable from the base cuboid through
+     * {@link #getSpanningCuboid(long)}, the base cuboid included.
+     */
+    public int getCuboidCount() {
+        return getCuboidCount(Cuboid.getBaseCuboidId(cubeDef));
+    }
+
+    // counts the given cuboid plus, recursively, everything it spans
+    private int getCuboidCount(long cuboidId) {
+        int r = 1;
+        for (Long child : getSpanningCuboid(cuboidId)) {
+            r += getCuboidCount(child);
+        }
+        return r;
+    }
+
+    /**
+     * Returns the children of a cuboid: the cuboids it generates, minus those already
+     * generated by any of its smaller siblings. Results are cached per cuboid id and the
+     * cached list instance itself is returned.
+     *
+     * @param cuboid the parent cuboid id
+     * @return the child cuboid ids, possibly empty
+     * @throws IllegalArgumentException if {@code cuboid} is negative or greater than the
+     *             largest id for this cube
+     */
+    public List<Long> getSpanningCuboid(long cuboid) {
+        checkInScope(cuboid);
+
+        List<Long> result = cache.get(cuboid);
+        if (result != null) {
+            return result;
+        }
+
+        // smaller sibling's children
+        Collection<Long> allPrevOffspring = new HashSet<Long>();
+        for (Long sibling : findSmallerSibling(cuboid)) {
+            allPrevOffspring.addAll(generateChildren(sibling));
+        }
+
+        // my children is my generation excluding smaller sibling's generation
+        result = new ArrayList<Long>();
+        for (Long offspring : generateChildren(cuboid)) {
+            if (!allPrevOffspring.contains(offspring)) {
+                result.add(offspring);
+            }
+        }
+
+        cache.put(cuboid, result);
+        return result;
+    }
+
+    // shared range check, so both public entry points report the same message
+    private void checkInScope(long cuboid) {
+        if (cuboid > max || cuboid < 0) {
+            throw new IllegalArgumentException("Cuboid " + cuboid + " is out of scope 0-" + max);
+        }
+    }
+
+    // every valid cuboid obtained by clearing one group bit of the given cuboid, plus
+    // the zero-tail cuboids added by generateZeroTailBase()
+    private Collection<Long> generateChildren(long cuboid) {
+        Collection<Long> result = new HashSet<Long>();
+
+        // generate zero tail cuboid -- the one with all 1 in the first
+        // aggregation group and all 0 for the rest bits
+        generateZeroTailBase(cuboid, result);
+
+        RowKeyDesc rowkey = cubeDef.getRowkey();
+        long cuboidWithoutMandatory = cuboid & ~rowkey.getMandatoryColumnMask();
+        for (AggrGroupMask mask : rowkey.getAggrGroupMasks()) {
+            if (!belongTo(cuboidWithoutMandatory, mask))
+                continue;
+
+            for (long oneBit : mask.groupOneBitMasks) {
+                if ((cuboid & oneBit) == 0)
+                    continue; // bit already cleared, nothing to remove
+
+                long child = cuboid ^ oneBit;
+                if (Cuboid.isValid(cubeDef, child)) {
+                    result.add(child);
+                }
+            }
+
+            if ((cuboidWithoutMandatory & mask.uniqueMask) > 0)
+                break;
+        }
+
+        return result;
+    }
+
+    // adds "mandatory columns + full group" for each group that is fully set in the
+    // cuboid together with its leftover bits, unless that id is the cuboid itself
+    private void generateZeroTailBase(long cuboid, Collection<Long> result) {
+        RowKeyDesc rowkey = cubeDef.getRowkey();
+
+        long cuboidWithoutMandatory = cuboid & ~rowkey.getMandatoryColumnMask();
+
+        for (AggrGroupMask mask : rowkey.getAggrGroupMasks()) {
+            boolean groupFull = (cuboidWithoutMandatory & mask.groupMask) == mask.groupMask;
+            boolean leftoverFull = (cuboidWithoutMandatory & mask.leftoverMask) == mask.leftoverMask;
+            if (groupFull && leftoverFull) {
+                long zeroTail = rowkey.getMandatoryColumnMask() | mask.groupMask;
+                if (zeroTail > 0 && zeroTail != cuboid) {
+                    result.add(zeroTail);
+                }
+            }
+            if ((cuboidWithoutMandatory & mask.uniqueMask) > 0)
+                break;
+        }
+    }
+
+    /**
+     * Finds the valid cuboids numerically smaller than {@code cuboid} that set the same
+     * number of bits within the aggregation groups the cuboid touches and keep its
+     * remaining bits unchanged.
+     *
+     * @param cuboid the cuboid id
+     * @return the smaller siblings; empty when {@code cuboid} itself is not valid
+     */
+    public Collection<Long> findSmallerSibling(long cuboid) {
+        if (!Cuboid.isValid(cubeDef, cuboid)) {
+            return Collections.emptyList();
+        }
+
+        RowKeyDesc rowkey = cubeDef.getRowkey();
+
+        // do combination in all related groups
+        long groupAllBitMask = 0;
+        for (AggrGroupMask mask : rowkey.getAggrGroupMasks()) {
+            if ((mask.groupMask & cuboid) > 0) {
+                groupAllBitMask |= mask.groupMask;
+            }
+        }
+
+        long groupBitValue = cuboid & groupAllBitMask;
+        long leftBitValue = cuboid & ~groupAllBitMask;
+        long[] groupOneBits = bits(groupAllBitMask);
+
+        Collection<Long> siblings = new HashSet<Long>();
+        combination(cuboid, siblings, groupOneBits, 0, leftBitValue, Long.bitCount(groupBitValue));
+        return siblings;
+    }
+
+    // splits a mask into its single-bit masks, highest bit first
+    private long[] bits(long groupAllBitMask) {
+        long[] oneBits = new long[Long.bitCount(groupAllBitMask)];
+        long remaining = groupAllBitMask;
+        int next = 0;
+        while (remaining != 0) {
+            long bit = Long.highestOneBit(remaining);
+            oneBits[next++] = bit;
+            remaining ^= bit;
+        }
+        return oneBits;
+    }
+
+    // picks k more bits from bitMasks[offset..] on top of bitValue, pruning any partial
+    // value that is not below cuboid, and collects the valid completed values
+    private void combination(long cuboid, Collection<Long> siblings, long[] bitMasks, int offset, long bitValue, int k) {
+        if (k == 0) {
+            if (Cuboid.isValid(cubeDef, bitValue)) {
+                siblings.add(bitValue);
+            }
+            return;
+        }
+
+        for (int i = offset; i < bitMasks.length; i++) {
+            long newBitValue = bitValue | bitMasks[i];
+            if (newBitValue < cuboid) {
+                combination(cuboid, siblings, bitMasks, i + 1, newBitValue, k - 1);
+            }
+        }
+    }
+
+    // true when the cuboid has a bit in the group and its leftover bits are all clear or all set
+    private boolean belongTo(long cuboidWithoutMandatory, AggrGroupMask mask) {
+        long groupBits = cuboidWithoutMandatory & mask.groupMask;
+        long leftoverBits = cuboidWithoutMandatory & mask.leftoverMask;
+        return groupBits > 0 && (leftoverBits == 0 || leftoverBits == mask.leftoverMask);
+    }
+
+    /**
+     * Returns the number of bits set in the cuboid id.
+     *
+     * @param cuboid the cuboid id
+     * @throws IllegalArgumentException if {@code cuboid} is negative or greater than the
+     *             largest id for this cube
+     */
+    public int getCardinality(long cuboid) {
+        checkInScope(cuboid);
+
+        return Long.bitCount(cuboid);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
new file mode 100644
index 0000000..ddfbab3
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.kv;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.metadata.model.TblColRef;
+
+/**
+ * Base class of the row key encoders, plus a static factory that caches one encoder
+ * per segment and cuboid id.
+ *
+ * @author xjiang
+ */
+public abstract class AbstractRowKeyEncoder {
+
+    public static final byte DEFAULT_BLANK_BYTE = Dictionary.NULL;
+
+    protected static final Logger logger = LoggerFactory.getLogger(AbstractRowKeyEncoder.class);
+
+    // storage location identifier of a segment -> (cuboid id -> encoder)
+    private static final Map<String, Map<Long, AbstractRowKeyEncoder>> ENCODER_CACHE = new ConcurrentHashMap<String, Map<Long, AbstractRowKeyEncoder>>();
+
+    /**
+     * Returns the encoder for the given segment and cuboid, creating and caching a
+     * {@link RowKeyEncoder} on first use.
+     *
+     * @param cubeSeg the segment; its storage location identifier is the cache key
+     * @param cuboid the cuboid; its id is the key inside the segment's map
+     * @return the cached or newly created encoder
+     */
+    public static AbstractRowKeyEncoder createInstance(CubeSegment cubeSeg, Cuboid cuboid) {
+
+        // The storage location identifier is unique for every segment.
+        // Lookup and insertion must use this same key: the map used to be stored under
+        // the cube name, so lookups missed and the map was replaced on every call.
+        final String segmentKey = cubeSeg.getStorageLocationIdentifier();
+        Map<Long, AbstractRowKeyEncoder> cubeCache = ENCODER_CACHE.get(segmentKey);
+
+        if (cubeCache == null) {
+            // NOTE(review): this inner map is a plain HashMap and is not synchronized,
+            // and cached encoders are now actually shared between callers
+            cubeCache = new HashMap<Long, AbstractRowKeyEncoder>();
+            ENCODER_CACHE.put(segmentKey, cubeCache);
+        }
+
+        AbstractRowKeyEncoder encoder = cubeCache.get(cuboid.getId());
+        if (encoder == null) {
+            encoder = new RowKeyEncoder(cubeSeg, cuboid);
+            cubeCache.put(cuboid.getId(), encoder);
+        }
+        return encoder;
+    }
+
+    protected final Cuboid cuboid;
+    protected byte blankByte = DEFAULT_BLANK_BYTE;
+
+    protected AbstractRowKeyEncoder(Cuboid cuboid) {
+        this.cuboid = cuboid;
+    }
+
+    /**
+     * Replaces the blank byte, which defaults to {@link #DEFAULT_BLANK_BYTE}.
+     */
+    public void setBlankByte(byte blankByte) {
+        this.blankByte = blankByte;
+    }
+
+    /** Encodes a row key from column values given as strings keyed by column. */
+    abstract public byte[] encode(Map<TblColRef, String> valueMap);
+
+    /** Encodes a row key from column values given as byte arrays. */
+    abstract public byte[] encode(byte[][] values);
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyKeyEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyKeyEncoder.java
new file mode 100644
index 0000000..a17bb28
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyKeyEncoder.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.kv;
+
+import java.util.Arrays;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+
+/**
+ * A {@link RowKeyEncoder} whose default value is a buffer filled with
+ * {@code RowConstants.FUZZY_MASK_ZERO}.
+ *
+ * @author xjiang
+ */
+public class FuzzyKeyEncoder extends RowKeyEncoder {
+
+    public FuzzyKeyEncoder(CubeSegment seg, Cuboid cuboid) {
+        super(seg, cuboid);
+    }
+
+    /**
+     * @param length number of bytes to allocate
+     * @return a new array of {@code length} bytes, each set to {@code FUZZY_MASK_ZERO}
+     */
+    @Override
+    protected byte[] defaultValue(int length) {
+        final byte[] zeroed = new byte[length];
+        Arrays.fill(zeroed, RowConstants.FUZZY_MASK_ZERO);
+        return zeroed;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
new file mode 100644
index 0000000..5077287
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.kv;
+
+import java.util.Arrays;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.metadata.model.TblColRef;
+
+/**
+ * A {@link RowKeyEncoder} that writes fuzzy mask bytes ({@code FUZZY_MASK_ZERO} or
+ * {@code FUZZY_MASK_ONE}) in place of header and column values.
+ *
+ * @author xjiang
+ */
+public class FuzzyMaskEncoder extends RowKeyEncoder {
+
+    public FuzzyMaskEncoder(CubeSegment seg, Cuboid cuboid) {
+        super(seg, cuboid);
+    }
+
+    /**
+     * Fills the header with mask bytes: {@code FUZZY_MASK_ONE} up to where the cuboid id
+     * starts, {@code FUZZY_MASK_ZERO} over the cuboid id itself.
+     *
+     * @return the header length
+     */
+    @Override
+    protected int fillHeader(byte[] bytes, byte[][] values) {
+        final int headerEnd = this.headerLength;
+        // always fuzzy match cuboid ID to lock on the selected cuboid
+        final int cuboidStart = headerEnd - RowConstants.ROWKEY_CUBOIDID_LEN;
+        Arrays.fill(bytes, 0, cuboidStart, RowConstants.FUZZY_MASK_ONE);
+        Arrays.fill(bytes, cuboidStart, headerEnd, RowConstants.FUZZY_MASK_ZERO);
+        return headerEnd;
+    }
+
+    /**
+     * Fills the column's slot with {@code FUZZY_MASK_ONE} when no value is given and
+     * with {@code FUZZY_MASK_ZERO} otherwise.
+     */
+    @Override
+    protected void fillColumnValue(TblColRef column, int columnLen, byte[] value, int valueLen, byte[] outputValue, int outputValueOffset) {
+        final byte mask = (value == null) ? RowConstants.FUZZY_MASK_ONE : RowConstants.FUZZY_MASK_ZERO;
+        Arrays.fill(outputValue, outputValueOffset, outputValueOffset + columnLen, mask);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
new file mode 100644
index 0000000..95b4718
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.kv;
+
+/**
+ * Byte-level constants shared by the row key and row value code.
+ *
+ * @author xjiang
+ */
+public class RowConstants {
+
+    /** Row key fixed length place holder. */
+    public static final byte ROWKEY_PLACE_HOLDER_BYTE = 9;
+    /** Row key lower bound. */
+    public static final byte ROWKEY_LOWER_BYTE = 0;
+    /** Row key upper bound. */
+    public static final byte ROWKEY_UPPER_BYTE = (byte) 0xFF;
+    /** Row key cuboid id length. */
+    public static final int ROWKEY_CUBOIDID_LEN = 8;
+
+    /** Fuzzy mask bytes. */
+    public static final byte FUZZY_MASK_ZERO = 0;
+    public static final byte FUZZY_MASK_ONE = 1;
+
+    /** Row value delimiter, as a byte, a one-character string and a one-byte array. */
+    public static final byte ROWVALUE_DELIMITER_BYTE = 7;
+    public static final String ROWVALUE_DELIMITER_STRING = Character.toString((char) 7);
+    public static final byte[] ROWVALUE_DELIMITER_BYTES = new byte[] { 7 };
+
+    /** Row value buffer size: 1 MB. */
+    public static final int ROWVALUE_BUFFER_SIZE = 1 << 20;
+
+    /** Marker class: an empty array of byte arrays. */
+    public static final byte[][] BYTE_ARR_MARKER = new byte[0][];
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java
new file mode 100644
index 0000000..a2add5c
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.kv;
+
+import java.util.Arrays;
+
+import org.apache.kylin.dict.IDictionaryAware;
+import org.apache.kylin.common.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.metadata.model.TblColRef;
+
+/**
+ * Read/Write column values from/into bytes
+ *
+ * @author yangli9
+ */
+@SuppressWarnings("unchecked")
+public class RowKeyColumnIO {
+
+ private static final Logger logger = LoggerFactory.getLogger(RowKeyColumnIO.class);
+
+ private IDictionaryAware IDictionaryAwareness;
+
+ public RowKeyColumnIO(IDictionaryAware IDictionaryAwareness) {
+ this.IDictionaryAwareness = IDictionaryAwareness;
+ }
+
+ public int getColumnLength(TblColRef col) {
+ return IDictionaryAwareness.getColumnLength(col);
+ }
+
+ //TODO is type cast really necessary here?
+ public Dictionary<String> getDictionary(TblColRef col) {
+ return (Dictionary<String>) IDictionaryAwareness.getDictionary(col);
+ }
+
+ public void writeColumnWithoutDictionary(byte[] src, int srcOffset, int srcLength, byte[] dst, int dstOffset, int dstLength) {
+ if (srcLength >= dstLength) {
+ System.arraycopy(src, srcOffset, dst, dstOffset, dstLength);
+ } else {
+ System.arraycopy(src, srcOffset, dst, dstOffset, srcLength);
+ Arrays.fill(dst, dstOffset + srcLength, dstOffset + dstLength, RowConstants.ROWKEY_PLACE_HOLDER_BYTE);
+ }
+ }
+
+ public void writeColumnWithDictionary(Dictionary<String> dictionary, byte[] src, int srcOffset, int srcLength, byte[] dst, int dstOffset, int dstLength, int roundingFlag, int defaultValue) {
+ // dict value
+ try {
+ int id = dictionary.getIdFromValueBytes(src, srcOffset, srcLength, roundingFlag);
+ BytesUtil.writeUnsigned(id, dst, dstOffset, dictionary.getSizeOfId());
+ } catch (IllegalArgumentException ex) {
+ Arrays.fill(dst, dstOffset, dstOffset + dstLength, (byte) defaultValue);
+ logger.error("Can't translate value " + Bytes.toString(src, srcOffset, srcLength) + " to dictionary ID, roundingFlag " + roundingFlag + ". Using default value " + String.format("\\x%02X", defaultValue));
+ }
+ }
+
+
+
+ public void writeColumn(TblColRef column, byte[] value, int valueLen, byte defaultValue, byte[] output, int outputOffset) {
+ writeColumn(column, value, valueLen, 0, defaultValue, output, outputOffset);
+ }
+
+ public void writeColumn(TblColRef column, byte[] value, int valueLen, int roundingFlag, byte defaultValue, byte[] output, int outputOffset) {
+
+ final Dictionary<String> dict = getDictionary(column);
+ final int columnLen = getColumnLength(column);
+
+ // non-dict value
+ if (dict == null) {
+ byte[] valueBytes = padFixLen(columnLen, value, valueLen);
+ System.arraycopy(valueBytes, 0, output, outputOffset, columnLen);
+ return;
+ }
+
+ // dict value
+ try {
+ int id = dict.getIdFromValueBytes(value, 0, valueLen, roundingFlag);
+ BytesUtil.writeUnsigned(id, output, outputOffset, dict.getSizeOfId());
+ } catch (IllegalArgumentException ex) {
+ for (int i = outputOffset; i < outputOffset + columnLen; i++) {
+ output[i] = defaultValue;
+ }
+ logger.error("Can't translate value " + Bytes.toString(value, 0, valueLen) + " to dictionary ID, roundingFlag " + roundingFlag + ". Using default value " + String.format("\\x%02X", defaultValue));
+ }
+ }
+
+ private byte[] padFixLen(int length, byte[] valueBytes, int valLen) {
+ if (valLen == length) {
+ return valueBytes;
+ } else if (valLen < length) {
+ byte[] newValueBytes = new byte[length];
+ System.arraycopy(valueBytes, 0, newValueBytes, 0, valLen);
+ Arrays.fill(newValueBytes, valLen, length, RowConstants.ROWKEY_PLACE_HOLDER_BYTE);
+ return newValueBytes;
+ } else {
+ return Arrays.copyOf(valueBytes, length);
+ }
+ }
+
+ public String readColumnString(TblColRef col, byte[] bytes, int offset, int length) {
+ Dictionary<String> dict = getDictionary(col);
+ if (dict == null) {
+ if (isNull(bytes, offset, length)) {
+ return null;
+ }
+ bytes = removeFixLenPad(bytes, offset, length);
+ return Bytes.toString(bytes);
+ } else {
+ int id = BytesUtil.readUnsigned(bytes, offset, length);
+ try {
+ String value = dict.getValueFromId(id);
+ return value;
+ } catch (IllegalArgumentException e) {
+ logger.error("Can't get dictionary value for column " + col.getName() + " (id = " + id + ")");
+ return "";
+ }
+ }
+ }
+
+ public String readColumnString(TblColRef col, byte[] bytes, int bytesLen) {
+ return readColumnString(col, bytes, 0, bytesLen);
+ }
+
+ private boolean isNull(byte[] bytes, int offset, int length) {
+ // all 0xFF is NULL
+ if (length == 0) {
+ return false;
+ }
+ for (int i = 0; i < bytes.length; i++) {
+ if (bytes[i + offset] != AbstractRowKeyEncoder.DEFAULT_BLANK_BYTE) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private byte[] removeFixLenPad(byte[] bytes, int offset, int length) {
+ int padCount = 0;
+ for (int i = 0; i < length; i++) {
+ if (bytes[i + offset] == RowConstants.ROWKEY_PLACE_HOLDER_BYTE) {
+ padCount++;
+ }
+ }
+
+ int size = length - padCount;
+ byte[] stripBytes = new byte[size];
+ int index = 0;
+ for (int i = 0; i < length; i++) {
+ byte vb = bytes[i + offset];
+ if (vb != RowConstants.ROWKEY_PLACE_HOLDER_BYTE) {
+ stripBytes[index++] = vb;
+ }
+ }
+ return stripBytes;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnOrder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnOrder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnOrder.java
new file mode 100644
index 0000000..d7a48d9
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnOrder.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.kv;
+
+import java.util.Collection;
+import java.util.Comparator;
+
+import org.apache.kylin.metadata.model.DataType;
+
+/**
+ * Null-tolerant comparator over column values held as strings, with one shared
+ * instance for number-family data types and one for everything else.
+ *
+ * @author yangli9
+ */
+public abstract class RowKeyColumnOrder implements Comparator<String> {
+
+    public static final NumberOrder NUMBER_ORDER = new NumberOrder();
+    public static final StringOrder STRING_ORDER = new StringOrder();
+
+    /**
+     * Selects the ordering for a data type: {@link #NUMBER_ORDER} when
+     * {@code type.isNumberFamily()} is true, {@link #STRING_ORDER} otherwise.
+     */
+    public static RowKeyColumnOrder getInstance(DataType type) {
+        return type.isNumberFamily() ? NUMBER_ORDER : STRING_ORDER;
+    }
+
+    /**
+     * Returns the greatest value of the collection under this ordering, or {@code null}
+     * when the collection is empty.
+     */
+    public String max(Collection<String> values) {
+        String best = null;
+        for (String candidate : values) {
+            if (best != null && compare(best, candidate) >= 0) {
+                continue;
+            }
+            best = candidate;
+        }
+        return best;
+    }
+
+    /**
+     * Returns the smallest value of the collection under this ordering, or {@code null}
+     * when the collection is empty.
+     */
+    public String min(Collection<String> values) {
+        String best = null;
+        for (String candidate : values) {
+            if (best != null && compare(best, candidate) <= 0) {
+                continue;
+            }
+            best = candidate;
+        }
+        return best;
+    }
+
+    /**
+     * Returns the smaller of two values; a {@code null} argument yields the other one.
+     */
+    public String min(String v1, String v2) {
+        if (v1 == null) {
+            return v2;
+        }
+        if (v2 == null) {
+            return v1;
+        }
+        if (compare(v1, v2) <= 0) {
+            return v1;
+        }
+        return v2;
+    }
+
+    /**
+     * Returns the greater of two values; a {@code null} argument yields the other one.
+     */
+    public String max(String v1, String v2) {
+        if (v1 == null) {
+            return v2;
+        }
+        if (v2 == null) {
+            return v1;
+        }
+        if (compare(v1, v2) >= 0) {
+            return v1;
+        }
+        return v2;
+    }
+
+    /**
+     * Compares two values, ordering {@code null} before any non-null value and
+     * delegating to {@code compareNonNull} when both are present.
+     */
+    @Override
+    public int compare(String o1, String o2) {
+        // identity check: covers both-null and the same instance
+        if (o1 == o2) {
+            return 0;
+        }
+        if (o1 == null || o2 == null) {
+            return o1 == null ? -1 : 1;
+        }
+        return compareNonNull(o1, o2);
+    }
+
+    abstract int compareNonNull(String o1, String o2);
+
+    /** Natural {@link String#compareTo(String)} order. */
+    private static class StringOrder extends RowKeyColumnOrder {
+        @Override
+        public int compareNonNull(String o1, String o2) {
+            return o1.compareTo(o2);
+        }
+    }
+
+    /** Numeric order: both strings are parsed as doubles and compared. */
+    private static class NumberOrder extends RowKeyColumnOrder {
+        @Override
+        public int compareNonNull(String o1, String o2) {
+            return Double.compare(Double.parseDouble(o1), Double.parseDouble(o2));
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
new file mode 100644
index 0000000..1b896a0
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.kv;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.kylin.common.util.SplittedBytes;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.common.RowKeySplitter;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+/**
+ * Decodes an encoded row key of a cube segment into its cuboid id and the
+ * string values of that cuboid's columns.
+ * <p>
+ * The decoder is stateful: each {@link #decode(byte[])} call overwrites the
+ * cuboid and the value list left by the previous call, and
+ * {@link #getValues()} hands out that same live list.
+ *
+ * @author xjiang
+ */
+public class RowKeyDecoder {
+
+    private final CubeDesc cubeDesc;
+    private final RowKeyColumnIO colIO;
+    private final RowKeySplitter rowKeySplitter;
+    // reused across decode() calls; cleared at the start of each one
+    private final List<String> values;
+
+    // cuboid of the most recently decoded key, or the one injected via setCuboid(); null until then
+    private Cuboid cuboid;
+
+    /**
+     * Creates a decoder for row keys of the given segment.
+     *
+     * @param cubeSegment segment whose cube descriptor and column IO are used for decoding
+     */
+    public RowKeyDecoder(CubeSegment cubeSegment) {
+        this.cubeDesc = cubeSegment.getCubeDesc();
+        // NOTE(review): 65 and 255 look like the splitter's buffer count and per-buffer size - confirm against RowKeySplitter
+        this.rowKeySplitter = new RowKeySplitter(cubeSegment, 65, 255);
+        this.colIO = new RowKeyColumnIO(cubeSegment);
+        this.values = new ArrayList<String>();
+    }
+
+    /**
+     * Splits a row key, resolves its cuboid and reads one string value per
+     * cuboid column. The values are afterwards available from
+     * {@link #getValues()} in cuboid column order.
+     *
+     * @param bytes the encoded row key; the whole array is decoded
+     * @return the cuboid id reported by the row key splitter
+     * @throws IOException if reading a column value fails
+     */
+    public long decode(byte[] bytes) throws IOException {
+        this.values.clear();
+
+        long cuboidId = rowKeySplitter.split(bytes, bytes.length);
+        initCuboid(cuboidId);
+
+        SplittedBytes[] splits = rowKeySplitter.getSplitBuffers();
+        List<TblColRef> columns = this.cuboid.getColumns();
+
+        int offset = 1; // skip cuboid id part
+        for (TblColRef col : columns) {
+            collectValue(col, splits[offset].value, splits[offset].length);
+            offset++;
+        }
+
+        return cuboidId;
+    }
+
+    /** Looks up the cuboid for the given id unless the current cuboid already has that id. */
+    private void initCuboid(long cuboidID) {
+        if (this.cuboid != null && this.cuboid.getId() == cuboidID) {
+            return;
+        }
+        this.cuboid = Cuboid.findById(cubeDesc, cuboidID);
+    }
+
+    /** Reads one column value as a string and appends it to the value list. */
+    private void collectValue(TblColRef col, byte[] valueBytes, int length) throws IOException {
+        String strValue = colIO.readColumnString(col, valueBytes, length);
+        values.add(strValue);
+    }
+
+    /** @return the splitter this decoder uses to cut row keys into parts */
+    public RowKeySplitter getRowKeySplitter() {
+        return rowKeySplitter;
+    }
+
+    /**
+     * Replaces the current cuboid.
+     *
+     * @param cuboid cuboid that {@link #getColumns()} and {@link #toString()} should report
+     */
+    public void setCuboid(Cuboid cuboid) {
+        this.cuboid = cuboid;
+    }
+
+    /**
+     * @return the columns of the current cuboid
+     * @throws NullPointerException if no cuboid is available yet, i.e. neither
+     *         {@link #decode(byte[])} nor {@link #setCuboid(Cuboid)} has been called
+     */
+    public List<TblColRef> getColumns() {
+        return cuboid.getColumns();
+    }
+
+    /** @return the live list of values from the latest decode; the next decode clears it */
+    public List<String> getValues() {
+        return values;
+    }
+
+    /**
+     * Renders the cuboid id followed by the decoded values, comma separated.
+     * When no cuboid is available yet the id is rendered as {@code null}
+     * instead of failing, so the decoder can be logged at any time.
+     */
+    @Override
+    public String toString() {
+        StringBuilder buf = new StringBuilder();
+        if (cuboid == null) {
+            buf.append("null");
+        } else {
+            buf.append(cuboid.getId());
+        }
+        for (String value : values) {
+            buf.append(",");
+            buf.append(value);
+        }
+        return buf.toString();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
new file mode 100644
index 0000000..5b6cd91
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.kv;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kylin.common.util.Bytes;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.metadata.model.TblColRef;
+
+/**
+ * Row key encoder that writes the cuboid id as a header and then one slot per
+ * cuboid column, each slot as long as {@link RowKeyColumnIO#getColumnLength}
+ * reports for that column.
+ *
+ * @author George Song (ysong1)
+ */
+public class RowKeyEncoder extends AbstractRowKeyEncoder {
+
+    // total length of an encoded key: header plus all column slots
+    private int bytesLength;
+    protected int headerLength;
+    private RowKeyColumnIO colIO;
+
+    /**
+     * @param cubeSeg segment that supplies the column IO
+     * @param cuboid  cuboid whose columns make up the row key
+     */
+    protected RowKeyEncoder(CubeSegment cubeSeg, Cuboid cuboid) {
+        super(cuboid);
+        colIO = new RowKeyColumnIO(cubeSeg);
+        headerLength = RowConstants.ROWKEY_CUBOIDID_LEN; // header
+        int total = headerLength;
+        for (TblColRef column : cuboid.getColumns()) {
+            total += colIO.getColumnLength(column);
+        }
+        bytesLength = total;
+    }
+
+    /** @return the column IO used to size and write column values */
+    public RowKeyColumnIO getColumnIO() {
+        return colIO;
+    }
+
+    /**
+     * Computes where a column's slot starts inside the encoded key.
+     *
+     * @param col column to locate
+     * @return byte offset of the column's slot, counted from the start of the key
+     * @throws IllegalArgumentException if the cuboid does not contain the column
+     */
+    public int getColumnOffset(TblColRef col) {
+        int position = RowConstants.ROWKEY_CUBOIDID_LEN;
+
+        for (TblColRef current : cuboid.getColumns()) {
+            if (col.equals(current)) {
+                return position;
+            }
+            position += colIO.getColumnLength(current);
+        }
+
+        throw new IllegalArgumentException("Column " + col + " not found on cuboid " + cuboid);
+    }
+
+    /** @return the slot length the column IO reports for the column */
+    public int getColumnLength(TblColRef col) {
+        return colIO.getColumnLength(col);
+    }
+
+    /** @return length in bytes of every key this encoder produces */
+    public int getRowKeyLength() {
+        return bytesLength;
+    }
+
+    /** @return length in bytes of the key header */
+    public int getHeaderLength() {
+        return headerLength;
+    }
+
+    /**
+     * Encodes a key from string values looked up per cuboid column; a column
+     * missing from the map is encoded like a null value.
+     */
+    @Override
+    public byte[] encode(Map<TblColRef, String> valueMap) {
+        List<byte[]> encoded = new ArrayList<byte[]>();
+        for (TblColRef column : cuboid.getColumns()) {
+            encoded.add(valueStringToBytes(valueMap.get(column)));
+        }
+        return encode(encoded.toArray(RowConstants.BYTE_ARR_MARKER));
+    }
+
+    /** Converts a string value to bytes via {@code Bytes.toBytes}; null stays null. */
+    public byte[] valueStringToBytes(String value) {
+        return (value == null) ? null : Bytes.toBytes(value);
+    }
+
+    /**
+     * Encodes a key from raw column values given in cuboid column order.
+     *
+     * @param values one entry per cuboid column; a null entry gets the default fill
+     * @return a new array of {@link #getRowKeyLength()} bytes
+     */
+    @Override
+    public byte[] encode(byte[][] values) {
+        byte[] rowKey = new byte[this.bytesLength];
+        int position = fillHeader(rowKey, values);
+
+        int index = 0;
+        for (TblColRef column : cuboid.getColumns()) {
+            int slotLength = colIO.getColumnLength(column);
+            byte[] value = values[index++];
+            int valueLength = (value == null) ? 0 : value.length;
+            fillColumnValue(column, slotLength, value, valueLength, rowKey, position);
+            position += slotLength;
+        }
+        return rowKey;
+    }
+
+    /**
+     * Copies the cuboid id bytes to the start of the key.
+     *
+     * @return the offset just behind the header
+     * @throws IllegalStateException if the bytes written differ from {@code headerLength}
+     */
+    protected int fillHeader(byte[] bytes, byte[][] values) {
+        final int written = RowConstants.ROWKEY_CUBOIDID_LEN;
+        System.arraycopy(cuboid.getBytes(), 0, bytes, 0, written);
+        if (this.headerLength != written) {
+            throw new IllegalStateException("Expected header length is " + headerLength + ". But the offset is " + written);
+        }
+        return written;
+    }
+
+    /**
+     * Writes one column slot: a non-null value goes through the column IO, a
+     * null value is replaced by the bytes from {@link #defaultValue(int)}.
+     */
+    protected void fillColumnValue(TblColRef column, int columnLen, byte[] value, int valueLen, byte[] outputValue, int outputValueOffset) {
+        if (value != null) {
+            colIO.writeColumn(column, value, valueLen, this.blankByte, outputValue, outputValueOffset);
+            return;
+        }
+
+        // special null value case
+        byte[] filler = defaultValue(columnLen);
+        System.arraycopy(filler, 0, outputValue, outputValueOffset, columnLen);
+    }
+
+    /** @return a new array of the given length with every byte set to {@code blankByte} */
+    protected byte[] defaultValue(int length) {
+        byte[] filler = new byte[length];
+        Arrays.fill(filler, this.blankByte);
+        return filler;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-cube/src/main/java/org/apache/kylin/cube/kv/RowValueDecoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowValueDecoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowValueDecoder.java
new file mode 100644
index 0000000..2ab12b9
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowValueDecoder.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.kv;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.Collection;
+
+/**
+ * Decodes the measures stored in one HBase column, identified by column
+ * family and qualifier, into an object array that is reused by every decode
+ * call.
+ *
+ * @author xjiang
+ */
+public class RowValueDecoder implements Cloneable {
+
+    private final HBaseColumnDesc hbaseColumn;
+    private final byte[] hbaseColumnFamily;
+    private final byte[] hbaseColumnQualifier;
+
+    private final MeasureCodec codec;
+    private final BitSet projectionIndex;
+    private final MeasureDesc[] measures;
+    // one slot per measure; overwritten by each decode
+    private Object[] values;
+
+    /**
+     * @param hbaseColumn column descriptor that supplies family name, qualifier and measures
+     */
+    public RowValueDecoder(HBaseColumnDesc hbaseColumn) {
+        this.hbaseColumn = hbaseColumn;
+        this.hbaseColumnFamily = Bytes.toBytes(hbaseColumn.getColumnFamilyName());
+        this.hbaseColumnQualifier = Bytes.toBytes(hbaseColumn.getQualifier());
+        this.projectionIndex = new BitSet();
+        this.measures = hbaseColumn.getMeasures();
+        this.codec = new MeasureCodec(measures);
+        this.values = new Object[measures.length];
+    }
+
+    /** Decodes this decoder's column from an HBase row and converts the values to Java objects. */
+    public void decode(Result hbaseRow) {
+        decode(hbaseRow, true);
+    }
+
+    /**
+     * Decodes this decoder's column from an HBase row.
+     *
+     * @param hbaseRow            row to read the family/qualifier cell from
+     * @param convertToJavaObject whether the numeric Writable wrappers are unwrapped afterwards
+     */
+    public void decode(Result hbaseRow, boolean convertToJavaObject) {
+        ByteBuffer cell = hbaseRow.getValueAsByteBuffer(hbaseColumnFamily, hbaseColumnQualifier);
+        decode(cell, convertToJavaObject);
+    }
+
+    /** Decodes measures from raw bytes and converts the values to Java objects. */
+    public void decode(byte[] bytes) {
+        decode(bytes, true);
+    }
+
+    /**
+     * Decodes measures from raw bytes.
+     *
+     * @param bytes               encoded measure values
+     * @param convertToJavaObject whether the numeric Writable wrappers are unwrapped afterwards
+     */
+    public void decode(byte[] bytes, boolean convertToJavaObject) {
+        decode(ByteBuffer.wrap(bytes), convertToJavaObject);
+    }
+
+    /** Runs the codec into the value array, then optionally unwraps each entry in place. */
+    private void decode(ByteBuffer buffer, boolean convertToJavaObject) {
+        codec.decode(buffer, values);
+        if (!convertToJavaObject) {
+            return;
+        }
+        for (int i = 0; i < values.length; i++) {
+            values[i] = unwrapWritable(values[i]);
+        }
+    }
+
+    /**
+     * Turns a Long/Int/Double/Float Writable into the matching boxed Java
+     * value; any other object is returned as is.
+     */
+    private static Object unwrapWritable(Object o) {
+        if (o instanceof LongWritable) {
+            return ((LongWritable) o).get();
+        }
+        if (o instanceof IntWritable) {
+            return ((IntWritable) o).get();
+        }
+        if (o instanceof DoubleWritable) {
+            return ((DoubleWritable) o).get();
+        }
+        if (o instanceof FloatWritable) {
+            return ((FloatWritable) o).get();
+        }
+        return o;
+    }
+
+    /** Marks the measure at the given index as projected. */
+    public void setIndex(int bitIndex) {
+        projectionIndex.set(bitIndex);
+    }
+
+    /** @return the column descriptor this decoder was built from */
+    public HBaseColumnDesc getHBaseColumn() {
+        return hbaseColumn;
+    }
+
+    /** @return the live bit set of projected measure indexes */
+    public BitSet getProjectionIndex() {
+        return projectionIndex;
+    }
+
+    /** @return the live value array filled by the latest decode */
+    public Object[] getValues() {
+        return values;
+    }
+
+    /** @return the measures of the underlying column */
+    public MeasureDesc[] getMeasures() {
+        return measures;
+    }
+
+    /**
+     * @return true if any projected measure uses a function that is a count
+     *         distinct but not a holistic count distinct
+     */
+    public boolean hasMemHungryCountDistinct() {
+        int bit = projectionIndex.nextSetBit(0);
+        while (bit >= 0) {
+            FunctionDesc function = measures[bit].getFunction();
+            if (function.isCountDistinct() && !function.isHolisticCountDistinct()) {
+                return true;
+            }
+            bit = projectionIndex.nextSetBit(bit + 1);
+        }
+        return false;
+    }
+
+    /**
+     * @param rowValueDecoders decoders to inspect
+     * @return true if at least one decoder reports {@link #hasMemHungryCountDistinct()}
+     */
+    public static boolean hasMemHungryCountDistinct(Collection<RowValueDecoder> rowValueDecoders) {
+        for (RowValueDecoder candidate : rowValueDecoders) {
+            if (candidate.hasMemHungryCountDistinct()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+}