You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/14 01:01:14 UTC

[39/50] [abbrv] incubator-kylin git commit: refactor

refactor


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/54b40664
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/54b40664
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/54b40664

Branch: refs/heads/streaming-localdict
Commit: 54b4066415a1dd48d2ceba25911d3b4754f7999b
Parents: f6e7323
Author: qianhao.zhou <qi...@ebay.com>
Authored: Fri Mar 13 13:47:43 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Fri Mar 13 13:47:43 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/invertedindex/index/Slice.java | 32 +++++----
 .../invertedindex/model/IIKeyValueCodec.java    | 75 +++++++++++++++++++-
 .../invertedindex/InvertedIndexLocalTest.java   |  5 +-
 3 files changed, 95 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/54b40664/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/Slice.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/Slice.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/Slice.java
index 39a04cf..9e494f2 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/Slice.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/Slice.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.base.Objects;
 import it.uniroma3.mat.extendedset.intset.ConciseSet;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.kylin.dict.Dictionary;
@@ -35,12 +36,12 @@ import org.apache.kylin.dict.Dictionary;
  */
 public class Slice implements Iterable<RawTableRecord>, Comparable<Slice> {
 
-    TableRecordInfoDigest info;
-    int nColumns;
+    final TableRecordInfoDigest info;
+    final int nColumns;
 
-    short shard;
-    long timestamp;
-    int nRecords;
+    final short shard;
+    final long timestamp;
+    final int nRecords;
     ColumnValueContainer[] containers;
     private Map<Integer, Dictionary<?>> localDictionaries;
 
@@ -180,22 +181,25 @@ public class Slice implements Iterable<RawTableRecord>, Comparable<Slice> {
      */
     @Override
     public boolean equals(Object obj) {
-        if (this == obj)
+        if (this == obj) {
             return true;
-        if (obj == null)
+        }
+        if (obj == null) {
             return false;
-        if (getClass() != obj.getClass())
+        }
+        if (getClass() != obj.getClass()) {
             return false;
+        }
         Slice other = (Slice) obj;
-        if (info == null) {
-            if (other.info != null)
-                return false;
-        } else if (!info.equals(other.info))
+        if (shard != other.shard) {
             return false;
-        if (shard != other.shard)
+        }
+        if (timestamp != other.timestamp) {
             return false;
-        if (timestamp != other.timestamp)
+        }
+        if (!Objects.equal(info, other.info)) {
             return false;
+        }
         return true;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/54b40664/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
index 2f69421..ad1b25c 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.invertedindex.model;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -113,9 +114,79 @@ public class IIKeyValueCodec implements KeyValueCodec {
 
     @Override
 	public Iterable<Slice> decodeKeyValue(Iterable<IIRow> kvs) {
-		return new Decoder(kvs, digest);
+        return new IIRowDecoder(digest, kvs.iterator());
+//		return new Decoder(kvs, digest);
 	}
 
+    private static class IIRowDecoder implements Iterable<Slice> {
+        // Assembles Slices from consecutive IIRows (one row per column). Every iterator() shares the single wrapped row iterator.
+        private final TableRecordInfoDigest digest;
+        private final Iterator<IIRow> iterator;
+
+        private IIRowDecoder(TableRecordInfoDigest digest, Iterator<IIRow> iterator) {
+            this.digest = digest;
+            this.iterator = iterator;
+        }
+
+        @Override
+        public Iterator<Slice> iterator() {
+            return new Iterator<Slice>() {
+                @Override
+                public boolean hasNext() {
+                    return iterator.hasNext();
+                }
+                // Reads up to digest.getColumnCount() rows and builds one Slice; fails the precondition if fewer rows remain.
+                @Override
+                public Slice next() {
+                    int columns = 0;
+                    ColumnValueContainer[] valueContainers = new ColumnValueContainer[digest.getColumnCount()];
+                    Map<Integer, Dictionary<?>> localDictionaries = Maps.newHashMap();
+                    boolean firstTime = true;
+                    short curShard = 0;
+                    long curTimestamp = 0;
+                    short lastShard = 0;
+                    long lastTimestamp = 0;
+                    while (iterator.hasNext() && columns < digest.getColumnCount()) {
+                        final IIRow row = iterator.next();
+                        final ImmutableBytesWritable key = row.getKey();
+                        int i = key.getOffset();
+                        curShard = (short) BytesUtil.readUnsigned(key.get(), i, SHARD_LEN);
+                        i += SHARD_LEN;
+                        curTimestamp = BytesUtil.readLong(key.get(), i, TIMEPART_LEN);
+                        i += TIMEPART_LEN;
+                        // every row after the first must carry the same shard and timestamp as the previous row
+                        if (!firstTime) {
+                            Preconditions.checkArgument(curShard == lastShard, "shard should be equals in one slice, curShard is" + curShard + " lastShard is " + lastShard);
+                            Preconditions.checkArgument(curTimestamp == lastTimestamp, "timestamp should be equals in one slice, curTimestamp is" + curTimestamp + " lastTimestamp is " + lastTimestamp);
+                        }
+                        // the column number is read from the key right after the shard and timestamp parts
+                        int curCol = BytesUtil.readUnsigned(key.get(), i, COLNO_LEN);
+                        CompressedValueContainer c = new CompressedValueContainer(digest, curCol, 0);
+                        c.fromBytes(row.getValue());
+                        valueContainers[curCol] = c;
+                        localDictionaries.put(curCol, deserialize(row.getDictionary()));
+                        columns++;
+                        lastShard = curShard;
+                        lastTimestamp = curTimestamp;
+                        firstTime = false;
+                    }
+                    Preconditions.checkArgument(columns == digest.getColumnCount(), "column count is " + columns + " should be equals to digest.getColumnCount() " + digest.getColumnCount());
+                    Slice slice = new Slice(digest, curShard, curTimestamp, valueContainers);
+                    slice.setLocalDictionaries(localDictionaries);
+                    return slice;
+                }
+
+
+                @Override
+                public void remove() {
+                    throw new UnsupportedOperationException();
+                }
+            };
+        }
+
+    }
+
+
 	private static class Decoder implements Iterable<Slice> {
 
         private final TableRecordInfoDigest digest;
@@ -148,7 +219,7 @@ public class IIKeyValueCodec implements KeyValueCodec {
 				ImmutableBytesWritable v = kv.getValue();
 				decodeKey(k);
                 final Dictionary<?> dictionary = deserialize(kv.getDictionary());
-                final CompressedValueContainer c = new CompressedValueContainer(dictionary.getSizeOfId(), (dictionary.getMaxId() - dictionary.getMinId() + 1), 0);
+                final CompressedValueContainer c = new CompressedValueContainer(digest, curCol, 0);
                 c.fromBytes(kv.getValue());
                 addContainer(curCol, c);
                 localDictionaries.put(curCol, dictionary);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/54b40664/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java
index c49a8dd..86c736d 100644
--- a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java
+++ b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java
@@ -131,7 +131,10 @@ public class InvertedIndexLocalTest extends LocalFileMetadataTestCase {
 		System.out.println(kvs.size() + " KV pairs");
 
 		List<Slice> slicesCopy = decodeKVs(codec, kvs);
-		assertEquals(slices, slicesCopy);
+        assertEquals(slices.size(), slicesCopy.size());
+        for (int i = 0; i < slices.size(); i++) {
+            assertEquals(slices.get(i), slicesCopy.get(i));
+        }
 
 		List<TableRecord> recordsCopy = iterateRecords(slicesCopy);
 		assertEquals(new HashSet<TableRecord>(records),