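You are viewing a plain-text version of this content. The canonical version is the original message in the commits@kylin.apache.org archive (the link to it was not preserved in this copy).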
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/14 01:01:18 UTC

[43/50] [abbrv] incubator-kylin git commit: fix

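fix: build each decoded slice's TableRecordInfoDigest from its local dictionaries (IIKeyValueCodec); TableRecordInfo(IISegment) no longer loads segment dictionaries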


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/5880b79b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/5880b79b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/5880b79b

Branch: refs/heads/streaming-localdict
Commit: 5880b79bd99d9be52e22022de0b75d0fd4bda8d4
Parents: 73c53b4
Author: qianhao.zhou <qi...@ebay.com>
Authored: Fri Mar 13 16:51:33 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Fri Mar 13 16:51:33 2015 +0800

----------------------------------------------------------------------
 .../invertedindex/index/TableRecordInfo.java    | 65 ++++++++++----------
 .../index/TableRecordInfoDigest.java            | 14 ++++-
 .../invertedindex/model/IIKeyValueCodec.java    | 30 ++++++---
 .../storage/hbase/InvertedIndexHBaseTest.java   | 26 +++-----
 4 files changed, 77 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5880b79b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
index c178742..3f8b72a 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
@@ -18,20 +18,19 @@
 
 package org.apache.kylin.invertedindex.index;
 
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
 import com.google.common.collect.Maps;
 import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.dict.DictionaryManager;
 import org.apache.kylin.invertedindex.IISegment;
 import org.apache.kylin.invertedindex.model.IIDesc;
 import org.apache.kylin.metadata.measure.fixedlen.FixedLenMeasureCodec;
 import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.DataType;
 import org.apache.kylin.metadata.model.TblColRef;
 
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
 /**
  * @author yangli9
  *         <p/>
@@ -46,31 +45,7 @@ public class TableRecordInfo {
     final Map<Integer, Dictionary<?>> dictionaryMap;
 
     public TableRecordInfo(IISegment iiSegment) {
-
-        this.desc = iiSegment.getIIInstance().getDescriptor();
-        this.dictionaryMap = Maps.newHashMap();
-        Map<TblColRef, FixedLenMeasureCodec<?>> measureCodecMap = Maps.newHashMap();
-
-        DictionaryManager dictMgr = DictionaryManager.getInstance(desc.getConfig());
-        int index = 0;
-        for (TblColRef tblColRef : desc.listAllColumns()) {
-            ColumnDesc col = tblColRef.getColumn();
-            if (desc.isMetricsCol(index)) {
-                measureCodecMap.put(tblColRef, FixedLenMeasureCodec.get(col.getType()));
-            } else {
-                String dictPath = iiSegment.getDictResPath(tblColRef);
-                if (dictPath != null) {
-                    try {
-                        dictionaryMap.put(index, dictMgr.getDictionary(dictPath));
-                    } catch (IOException e) {
-                        throw new RuntimeException("dictionary " + dictPath + " does not exist ", e);
-                    }
-                }
-            }
-            index++;
-        }
-
-        digest = createDigest(dictionaryMap, measureCodecMap);
+        this(iiSegment.getIIDesc(), Collections.<Integer, Dictionary<?>>emptyMap()); // NOTE(review): no segment dictionaries are loaded any more (empty map) — confirm callers size dimension columns from local dictionaries later
     }
 
     public TableRecordInfo(IIDesc desc, Map<Integer, Dictionary<?>> dictionaryMap) {
@@ -127,6 +102,34 @@ public class TableRecordInfo {
         return new TableRecordInfoDigest(nColumns, byteFormLen, offsets, dictMaxIds, lengths, isMetric, dataTypes);
     }
 
+    public static TableRecordInfoDigest createDigest(int nColumns, boolean[] isMetric, String[] dataTypes, Map<Integer, Dictionary<?>> dictionaryMap) {
+        int[] dictMaxIds = new int[nColumns]; // stays 0 for metric columns and for dimensions without a dictionary
+        int[] lengths = new int[nColumns]; // byte width per column: codec length (metric) or dictionary id size (dimension)
+        for (int i = 0; i < nColumns; ++i) {
+            if (isMetric[i]) { // metric: the fixed-length codec for dataTypes[i] decides the width; dataTypes is read only for metrics
+                final FixedLenMeasureCodec<?> fixedLenMeasureCodec = FixedLenMeasureCodec.get(DataType.getInstance(dataTypes[i]));
+                lengths[i] = fixedLenMeasureCodec.getLength();
+            } else {
+                final Dictionary<?> dictionary = dictionaryMap.get(i);
+                if (dictionary != null) { // NOTE(review): a missing dictionary silently leaves this column 0 bytes wide — confirm callers always supply one
+                    lengths[i] = dictionary.getSizeOfId();
+                    dictMaxIds[i] = dictionary.getMaxId();
+                }
+            }
+        }
+        // offsets: column i starts at the sum of the widths of columns 0..i-1
+        int pos = 0;
+        int[] offsets = new int[nColumns];
+        for (int i = 0; i < nColumns; i++) {
+            offsets[i] = pos;
+            pos += lengths[i];
+        }
+
+        int byteFormLen = pos; // total width of one record in bytes
+
+        return new TableRecordInfoDigest(nColumns, byteFormLen, offsets, dictMaxIds, lengths, isMetric, dataTypes);
+    }
+
     public TableRecord createTableRecord() {
         return new TableRecord(digest.createTableRecordBytes(), this);
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5880b79b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfoDigest.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfoDigest.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfoDigest.java
index ff6b192..cf519d7 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfoDigest.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfoDigest.java
@@ -89,7 +89,19 @@ public class TableRecordInfoDigest {
 		return ret;
 	}
 
-	public RawTableRecord createTableRecordBytes() {
+    public boolean[] getIsMetric() {
+        return isMetric; // NOTE(review): exposes the internal array, not a copy — callers must not modify it
+    }
+
+    public int[] getLengths() {
+        return lengths; // NOTE(review): exposes the internal array, not a copy — callers must not modify it
+    }
+
+    public String[] getMetricDataTypes() {
+        return metricDataTypes; // NOTE(review): internal array (not a copy), indexed by column; createDigest reads it only at metric positions
+    }
+
+    public RawTableRecord createTableRecordBytes() { // NOTE(review): only the indentation of this line changed (tab -> spaces); the surrounding file indents with tabs
 		return new RawTableRecord(this);
  	}
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5880b79b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
index ad1b25c..e3284fc 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
@@ -115,16 +115,16 @@ public class IIKeyValueCodec implements KeyValueCodec {
     @Override
 	public Iterable<Slice> decodeKeyValue(Iterable<IIRow> kvs) {
         return new IIRowDecoder(digest, kvs.iterator());
-//		return new Decoder(kvs, digest);
+//		return new Decoder(kvs, digest); // NOTE(review): legacy decoder path; IIRowDecoder is used instead — consider deleting
 	}
 
     private static class IIRowDecoder implements Iterable<Slice> {
 
-        private final TableRecordInfoDigest digest;
+        private final TableRecordInfoDigest incompleteDigest;
         private final Iterator<IIRow> iterator;
 
         private IIRowDecoder(TableRecordInfoDigest digest, Iterator<IIRow> iterator) {
-            this.digest = digest;
+            this.incompleteDigest = digest;
             this.iterator = iterator;
         }
 
@@ -139,14 +139,15 @@ public class IIKeyValueCodec implements KeyValueCodec {
                 @Override
                 public Slice next() {
                     int columns = 0;
-                    ColumnValueContainer[] valueContainers = new ColumnValueContainer[digest.getColumnCount()];
+                    ColumnValueContainer[] valueContainers = new ColumnValueContainer[incompleteDigest.getColumnCount()];
                     Map<Integer, Dictionary<?>> localDictionaries = Maps.newHashMap();
                     boolean firstTime = true;
                     short curShard = 0;
                     long curTimestamp = 0;
                     short lastShard = 0;
                     long lastTimestamp = 0;
-                    while (iterator().hasNext() && columns < digest.getColumnCount()) {
+
+                    while (iterator().hasNext() && columns < incompleteDigest.getColumnCount()) {
                         final IIRow row = iterator.next();
                         final ImmutableBytesWritable key = row.getKey();
                         int i = key.getOffset();
@@ -161,16 +162,25 @@ public class IIKeyValueCodec implements KeyValueCodec {
                         }
 
                         int curCol = BytesUtil.readUnsigned(key.get(), i, COLNO_LEN);
-                        CompressedValueContainer c = new CompressedValueContainer(digest, curCol, 0);
-                        c.fromBytes(row.getValue());
-                        valueContainers[curCol] = c;
-                        localDictionaries.put(curCol, deserialize(row.getDictionary()));
+                        final Dictionary<?> dictionary = deserialize(row.getDictionary());
+                        if (incompleteDigest.isMetrics(curCol)) {
+                            CompressedValueContainer c = new CompressedValueContainer(incompleteDigest, curCol, 0);
+                            c.fromBytes(row.getValue());
+                            valueContainers[curCol] = c;
+                        } else {
+                            CompressedValueContainer c = new CompressedValueContainer(dictionary.getSizeOfId(), dictionary.getMaxId() - dictionary.getMinId() + 1, 0);
+                            c.fromBytes(row.getValue());
+                            valueContainers[curCol] = c;
+                        }
+                        localDictionaries.put(curCol, dictionary);
                         columns++;
                         lastShard = curShard;
                         lastTimestamp = curTimestamp;
                         firstTime = false;
                     }
-                    Preconditions.checkArgument(columns == digest.getColumnCount(), "column count is " + columns + " should be equals to digest.getColumnCount() " + digest.getColumnCount());
+                    Preconditions.checkArgument(columns == incompleteDigest.getColumnCount(), "column count is " + columns + " should be equals to incompleteDigest.getColumnCount() " + incompleteDigest.getColumnCount());
+
+                    TableRecordInfoDigest digest = TableRecordInfo.createDigest(columns, incompleteDigest.getIsMetric(), incompleteDigest.getMetricDataTypes(), localDictionaries);
                     Slice slice = new Slice(digest, curShard, curTimestamp, valueContainers);
                     slice.setLocalDictionaries(localDictionaries);
                     return slice;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5880b79b/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java b/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java
index 9e37447..bcf5b10 100644
--- a/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java
@@ -18,23 +18,12 @@
 
 package org.apache.kylin.storage.hbase;
 
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.*;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.metadata.measure.fixedlen.FixedLenMeasureCodec;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
 import com.google.common.collect.Lists;
-
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.HBaseMetadataTestCase;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.invertedindex.IIInstance;
@@ -46,6 +35,11 @@ import org.apache.kylin.invertedindex.index.TableRecord;
 import org.apache.kylin.invertedindex.index.TableRecordInfo;
 import org.apache.kylin.invertedindex.model.IIDesc;
 import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
 
 /**
  * @author yangli9
@@ -69,7 +63,7 @@ public class InvertedIndexHBaseTest extends HBaseMetadataTestCase {
         Configuration hconf = HadoopUtil.newHBaseConfiguration(hbaseUrl);
         hconn = HConnectionManager.createConnection(hconf);
 
-        this.info = new TableRecordInfo(seg.getIIDesc(), null);
+        this.info = new TableRecordInfo(seg);
     }
 
     @After