You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/14 01:01:14 UTC
[39/50] [abbrv] incubator-kylin git commit: refactor
refactor
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/54b40664
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/54b40664
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/54b40664
Branch: refs/heads/streaming-localdict
Commit: 54b4066415a1dd48d2ceba25911d3b4754f7999b
Parents: f6e7323
Author: qianhao.zhou <qi...@ebay.com>
Authored: Fri Mar 13 13:47:43 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Fri Mar 13 13:47:43 2015 +0800
----------------------------------------------------------------------
.../apache/kylin/invertedindex/index/Slice.java | 32 +++++----
.../invertedindex/model/IIKeyValueCodec.java | 75 +++++++++++++++++++-
.../invertedindex/InvertedIndexLocalTest.java | 5 +-
3 files changed, 95 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/54b40664/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/Slice.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/Slice.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/Slice.java
index 39a04cf..9e494f2 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/Slice.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/Slice.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import com.google.common.base.Objects;
import it.uniroma3.mat.extendedset.intset.ConciseSet;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.kylin.dict.Dictionary;
@@ -35,12 +36,12 @@ import org.apache.kylin.dict.Dictionary;
*/
public class Slice implements Iterable<RawTableRecord>, Comparable<Slice> {
- TableRecordInfoDigest info;
- int nColumns;
+ final TableRecordInfoDigest info;
+ final int nColumns;
- short shard;
- long timestamp;
- int nRecords;
+ final short shard;
+ final long timestamp;
+ final int nRecords;
ColumnValueContainer[] containers;
private Map<Integer, Dictionary<?>> localDictionaries;
@@ -180,22 +181,25 @@ public class Slice implements Iterable<RawTableRecord>, Comparable<Slice> {
*/
@Override
public boolean equals(Object obj) {
- if (this == obj)
+ if (this == obj) {
return true;
- if (obj == null)
+ }
+ if (obj == null) {
return false;
- if (getClass() != obj.getClass())
+ }
+ if (getClass() != obj.getClass()) {
return false;
+ }
Slice other = (Slice) obj;
- if (info == null) {
- if (other.info != null)
- return false;
- } else if (!info.equals(other.info))
+ if (shard != other.shard) {
return false;
- if (shard != other.shard)
+ }
+ if (timestamp != other.timestamp) {
return false;
- if (timestamp != other.timestamp)
+ }
+ if (!Objects.equal(info, other.info)) {
return false;
+ }
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/54b40664/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
index 2f69421..ad1b25c 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
@@ -18,6 +18,7 @@
package org.apache.kylin.invertedindex.model;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -113,9 +114,79 @@ public class IIKeyValueCodec implements KeyValueCodec {
@Override
public Iterable<Slice> decodeKeyValue(Iterable<IIRow> kvs) {
- return new Decoder(kvs, digest);
+ return new IIRowDecoder(digest, kvs.iterator());
+// return new Decoder(kvs, digest);
}
+ private static class IIRowDecoder implements Iterable<Slice> {
+
+ private final TableRecordInfoDigest digest;
+ private final Iterator<IIRow> iterator;
+
+ private IIRowDecoder(TableRecordInfoDigest digest, Iterator<IIRow> iterator) {
+ this.digest = digest;
+ this.iterator = iterator;
+ }
+
+ @Override
+ public Iterator<Slice> iterator() {
+ return new Iterator<Slice>() {
+ @Override
+ public boolean hasNext() {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public Slice next() {
+ int columns = 0;
+ ColumnValueContainer[] valueContainers = new ColumnValueContainer[digest.getColumnCount()];
+ Map<Integer, Dictionary<?>> localDictionaries = Maps.newHashMap();
+ boolean firstTime = true;
+ short curShard = 0;
+ long curTimestamp = 0;
+ short lastShard = 0;
+ long lastTimestamp = 0;
+ while (iterator().hasNext() && columns < digest.getColumnCount()) {
+ final IIRow row = iterator.next();
+ final ImmutableBytesWritable key = row.getKey();
+ int i = key.getOffset();
+ curShard = (short) BytesUtil.readUnsigned(key.get(), i, SHARD_LEN);
+ i += SHARD_LEN;
+ curTimestamp = BytesUtil.readLong(key.get(), i, TIMEPART_LEN);
+ i += TIMEPART_LEN;
+
+ if (!firstTime) {
+ Preconditions.checkArgument(curShard == lastShard, "shard should be equals in one slice, curShard is" + curShard + " lastShard is " + lastShard);
+ Preconditions.checkArgument(curTimestamp == lastTimestamp, "timestamp should be equals in one slice, curTimestamp is" + curTimestamp + " lastTimestamp is " + lastTimestamp);
+ }
+
+ int curCol = BytesUtil.readUnsigned(key.get(), i, COLNO_LEN);
+ CompressedValueContainer c = new CompressedValueContainer(digest, curCol, 0);
+ c.fromBytes(row.getValue());
+ valueContainers[curCol] = c;
+ localDictionaries.put(curCol, deserialize(row.getDictionary()));
+ columns++;
+ lastShard = curShard;
+ lastTimestamp = curTimestamp;
+ firstTime = false;
+ }
+ Preconditions.checkArgument(columns == digest.getColumnCount(), "column count is " + columns + " should be equals to digest.getColumnCount() " + digest.getColumnCount());
+ Slice slice = new Slice(digest, curShard, curTimestamp, valueContainers);
+ slice.setLocalDictionaries(localDictionaries);
+ return slice;
+ }
+
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ }
+
+
private static class Decoder implements Iterable<Slice> {
private final TableRecordInfoDigest digest;
@@ -148,7 +219,7 @@ public class IIKeyValueCodec implements KeyValueCodec {
ImmutableBytesWritable v = kv.getValue();
decodeKey(k);
final Dictionary<?> dictionary = deserialize(kv.getDictionary());
- final CompressedValueContainer c = new CompressedValueContainer(dictionary.getSizeOfId(), (dictionary.getMaxId() - dictionary.getMinId() + 1), 0);
+ final CompressedValueContainer c = new CompressedValueContainer(digest, curCol, 0);
c.fromBytes(kv.getValue());
addContainer(curCol, c);
localDictionaries.put(curCol, dictionary);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/54b40664/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java
index c49a8dd..86c736d 100644
--- a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java
+++ b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java
@@ -131,7 +131,10 @@ public class InvertedIndexLocalTest extends LocalFileMetadataTestCase {
System.out.println(kvs.size() + " KV pairs");
List<Slice> slicesCopy = decodeKVs(codec, kvs);
- assertEquals(slices, slicesCopy);
+ assertEquals(slices.size(), slicesCopy.size());
+ for (int i = 0; i < slices.size(); i++) {
+ assertEquals(slices.get(i), slicesCopy.get(i));
+ }
List<TableRecord> recordsCopy = iterateRecords(slicesCopy);
assertEquals(new HashSet<TableRecord>(records),