You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/03 11:07:58 UTC

[02/26] incubator-kylin git commit: KYLIN-609 Add Hybrid as a federation of Cube and Inverted-index realization

KYLIN-609 Add Hybrid as a federation of Cube and Inverted-index realization

Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/1c0719e2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/1c0719e2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/1c0719e2

Branch: refs/heads/streaming
Commit: 1c0719e2310208e8cce2438f93ac6b1b595705c9
Parents: a536576
Author: Shao Feng, Shi <sh...@ebay.com>
Authored: Fri Feb 27 14:24:59 2015 +0800
Committer: Shao Feng, Shi <sh...@ebay.com>
Committed: Fri Feb 27 14:24:59 2015 +0800

----------------------------------------------------------------------
 .../kylin/storage/hybrid/HybridInstance.java    | 22 ++++++
 .../storage/hybrid/HybridStorageEngine.java     | 79 +++++++++++++++++++-
 .../storage/hybrid/HybridTupleIterator.java     | 38 ++++++++++
 3 files changed, 135 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1c0719e2/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
index b550fc0..ebba9f5 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
@@ -5,6 +5,8 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.project.RealizationEntry;
@@ -43,6 +45,18 @@ public class HybridInstance extends RootPersistentEntity implements IRealization
         historyRealizationInstance = registry.getRealization(historyRealization.getType(), historyRealization.getRealization());
         realTimeRealizationInstance = registry.getRealization(realTimeRealization.getType(), realTimeRealization.getRealization());
 
+        if (historyRealizationInstance == null) {
+            throw new IllegalArgumentException("Didn't find realization '" + historyRealization.getType() + "'" + " with name '" + historyRealization.getRealization() + "' in '" + name + "'");
+        }
+
+        if (realTimeRealizationInstance == null) {
+            throw new IllegalArgumentException("Didn't find realization '" + realTimeRealization.getType() + "'" + " with name '" + realTimeRealization.getRealization() + "' in '" + name + "'");
+        }
+
+
+        if (realTimeRealizationInstance.getDateRangeEnd() < historyRealizationInstance.getDateRangeEnd()) {
+            throw new IllegalStateException("The real time realization's dateRangeEnd should be greater than history realization's dateRangeEnd.");
+        }
     }
 
     @Override
@@ -140,4 +154,12 @@ public class HybridInstance extends RootPersistentEntity implements IRealization
     public long getDateRangeEnd() {
         return realTimeRealizationInstance.getDateRangeEnd();
     }
+
+    public String getModelName() {
+        if(historyRealizationInstance instanceof CubeInstance) {
+            return ((CubeInstance)historyRealizationInstance).getDescriptor().getModelName();
+        }
+
+        return ((IIInstance)historyRealizationInstance).getDescriptor().getModelName();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1c0719e2/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java
index 566a4f5..8a50b7e 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageEngine.java
@@ -1,13 +1,24 @@
 package org.apache.kylin.storage.hybrid;
 
-import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.commons.lang.time.FastDateFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.filter.*;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.metadata.realization.SQLDigest;
 import org.apache.kylin.metadata.tuple.ITupleIterator;
 import org.apache.kylin.storage.IStorageEngine;
 import org.apache.kylin.storage.StorageContext;
+import org.apache.kylin.storage.StorageEngineFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collection;
+
 /**
  * Created by shaoshi on 2/13/15.
  */
@@ -27,10 +38,70 @@ public class HybridStorageEngine implements IStorageEngine {
     @Override
     public ITupleIterator search(StorageContext context, SQLDigest sqlDigest) {
 
-        long conditionBoundry = hybridInstance.getHistoryRealizationInstance().getDateRangeEnd();
+        long boundary = hybridInstance.getHistoryRealizationInstance().getDateRangeEnd();
+        FastDateFormat format = FastDateFormat.getInstance("yyyy-MM-dd");
+        String boundaryDate = format.format(boundary);
+
+        Collection<TblColRef> filterCols = sqlDigest.filterColumns;
+
+        String modelName = hybridInstance.getModelName();
+
+        MetadataManager metaMgr = getMetadataManager();
+
+        DataModelDesc modelDesc = metaMgr.getDataModelDesc(modelName);
+
+        String partitionColFull = modelDesc.getPartitionDesc().getPartitionDateColumn();
+
+        String partitionTable = partitionColFull.substring(0, partitionColFull.lastIndexOf("."));
+        String partitionCol = partitionColFull.substring(partitionColFull.lastIndexOf(".") + 1);
+
+
+        TableDesc factTbl = metaMgr.getTableDesc(partitionTable);
+        ColumnDesc columnDesc = factTbl.findColumnByName(partitionCol);
+        TblColRef partitionColRef = new TblColRef(columnDesc);
+
+        // search the historic realization
+
+        ITupleIterator iterator1 = searchRealization(hybridInstance.getHistoryRealizationInstance(), context, sqlDigest);
+
+
+        // now search the realtime realization, need add the boundary condition
 
-        TupleFilter filter = sqlDigest.filter;
+        CompareTupleFilter compareTupleFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.GTE);
+
+        ColumnTupleFilter columnTupleFilter = new ColumnTupleFilter(partitionColRef);
+        ConstantTupleFilter constantTupleFilter = new ConstantTupleFilter(boundaryDate);
+        compareTupleFilter.addChild(columnTupleFilter);
+        compareTupleFilter.addChild(constantTupleFilter);
+
+        LogicalTupleFilter logicalTupleFilter = new LogicalTupleFilter(TupleFilter.FilterOperatorEnum.AND);
+
+        logicalTupleFilter.addChild(sqlDigest.filter);
+        logicalTupleFilter.addChild(compareTupleFilter);
+
+        sqlDigest.filter = logicalTupleFilter;
+
+        if (!sqlDigest.filterColumns.contains(partitionColRef)) {
+            sqlDigest.filterColumns.add(partitionColRef);
+        }
+
+        if (!sqlDigest.allColumns.contains(partitionColRef)) {
+            sqlDigest.allColumns.add(partitionColRef);
+        }
+
+        ITupleIterator iterator2 = searchRealization(hybridInstance.getRealTimeRealizationInstance(), context, sqlDigest);
+
+
+        return new HybridTupleIterator(new ITupleIterator[]{iterator1, iterator2});
+    }
+
+    private ITupleIterator searchRealization(IRealization realization, StorageContext context, SQLDigest sqlDigest) {
+
+        IStorageEngine storageEngine = StorageEngineFactory.getStorageEngine(realization);
+        return storageEngine.search(context, sqlDigest);
+    }
 
-        return null;
+    private MetadataManager getMetadataManager() {
+        return MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1c0719e2/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridTupleIterator.java
new file mode 100644
index 0000000..daf3057
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/hybrid/HybridTupleIterator.java
@@ -0,0 +1,38 @@
+package org.apache.kylin.storage.hybrid;
+
+import org.apache.kylin.metadata.tuple.ITuple;
+import org.apache.kylin.metadata.tuple.ITupleIterator;
+
+/**
+ * Created by shaoshi on 2/27/15.
+ */
+public class HybridTupleIterator implements ITupleIterator {
+
+    private ITupleIterator[] iterators;
+
+    private int currentIndex;
+
+    public HybridTupleIterator(ITupleIterator[] iterators) {
+        this.iterators = iterators;
+        currentIndex = 0;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return iterators[currentIndex].hasNext() || (currentIndex + 1 < iterators.length && iterators[currentIndex + 1].hasNext());
+    }
+
+    @Override
+    public ITuple next() {
+        if (!iterators[currentIndex].hasNext() && currentIndex + 1 < iterators.length) {
+            currentIndex++;
+        }
+
+        return iterators[currentIndex].next();
+    }
+
+    @Override
+    public void close() {
+
+    }
+}