You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/14 01:00:47 UTC
[12/50] [abbrv] incubator-kylin git commit: refactor
refactor
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/af6372de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/af6372de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/af6372de
Branch: refs/heads/streaming-localdict
Commit: af6372deabad2f56e281c775184e9b209832361d
Parents: 1142b68
Author: qianhao.zhou <qi...@ebay.com>
Authored: Tue Mar 10 18:25:33 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Tue Mar 10 18:25:33 2015 +0800
----------------------------------------------------------------------
.../apache/kylin/invertedindex/index/Slice.java | 6 ++++--
.../invertedindex/model/IIKeyValueCodec.java | 17 ++++++++--------
.../kylin/invertedindex/model/KeyValuePair.java | 14 ++++++++-----
.../apache/kylin/invertedindex/tools/IICLI.java | 21 ++++++++++----------
.../invertedindex/InvertedIndexLocalTest.java | 10 +++++-----
.../endpoint/HbaseServerKVIterator.java | 21 ++++++++++----------
.../coprocessor/endpoint/LocalDictionary.java | 10 +++++-----
.../invertedindex/IIStreamBuilder.java | 7 ++++---
8 files changed, 56 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/af6372de/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/Slice.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/Slice.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/Slice.java
index 56e25f0..4ac3f01 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/Slice.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/Slice.java
@@ -18,8 +18,10 @@
package org.apache.kylin.invertedindex.index;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import it.uniroma3.mat.extendedset.intset.ConciseSet;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -56,9 +58,9 @@ public class Slice implements Iterable<RawTableRecord>, Comparable<Slice> {
}
}
- public List<Dictionary<?>> getLocalDictionaries() {
+ public Map<Integer, Dictionary<?>> getLocalDictionaries() {
// TODO localdict
- return null;
+ return Collections.emptyMap();
}
public int getRecordCount() {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/af6372de/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
index 070b672..81f4d56 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
@@ -69,7 +69,7 @@ public class IIKeyValueCodec {
ImmutableBytesWritable key = encodeKey(slice.getShard(),
slice.getTimestamp(), col, -1);
ImmutableBytesWritable value = container.toBytes();
- return new KeyValuePair(col, key, value);
+ return new KeyValuePair(key, value);
}
private List<KeyValuePair> collectKeyValues(Slice slice,
@@ -80,7 +80,7 @@ public class IIKeyValueCodec {
for (int v = 0; v < values.size(); v++) {
ImmutableBytesWritable key = encodeKey(slice.getShard(),
slice.getTimestamp(), col, v);
- list.add(new KeyValuePair(col, key, values.get(v)));
+ list.add(new KeyValuePair(key, values.get(v)));
}
return list;
}
@@ -114,14 +114,14 @@ public class IIKeyValueCodec {
}
public Iterable<Slice> decodeKeyValue(
- Iterable<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> kvs) {
+ Iterable<KeyValuePair> kvs) {
return new Decoder(infoDigest, kvs);
}
private static class Decoder implements Iterable<Slice> {
TableRecordInfoDigest info;
- Iterator<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> iterator;
+ Iterator<KeyValuePair> iterator;
Slice next = null;
short curShard = Short.MIN_VALUE;
@@ -135,7 +135,7 @@ public class IIKeyValueCodec {
List<ImmutableBytesWritable> bitMapValues = Lists.newArrayList();
Decoder(TableRecordInfoDigest info,
- Iterable<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> kvs) {
+ Iterable<KeyValuePair> kvs) {
this.info = info;
this.iterator = kvs.iterator();
}
@@ -147,10 +147,9 @@ public class IIKeyValueCodec {
// NOTE the input keys are ordered
while (next == null && iterator.hasNext()) {
- Pair<ImmutableBytesWritable, ImmutableBytesWritable> kv = iterator
- .next();
- ImmutableBytesWritable k = kv.getFirst();
- ImmutableBytesWritable v = kv.getSecond();
+ KeyValuePair kv = iterator.next();
+ ImmutableBytesWritable k = kv.getKey();
+ ImmutableBytesWritable v = kv.getValue();
decodeKey(k);
if (curShard != lastShard
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/af6372de/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/KeyValuePair.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/KeyValuePair.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/KeyValuePair.java
index 445389f..7e43b2a 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/KeyValuePair.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/KeyValuePair.java
@@ -41,18 +41,22 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
*/
public final class KeyValuePair {
- private int columnIndex;
private ImmutableBytesWritable key;
private ImmutableBytesWritable value;
+ private ImmutableBytesWritable dictionary;
- public KeyValuePair(int columnIndex, ImmutableBytesWritable key, ImmutableBytesWritable value) {
- this.columnIndex = columnIndex;
+ public KeyValuePair(ImmutableBytesWritable key, ImmutableBytesWritable value) {
+ this(key, value, null);
+ }
+
+ public KeyValuePair(ImmutableBytesWritable key, ImmutableBytesWritable value, ImmutableBytesWritable dictionary) {
this.key = key;
this.value = value;
+ this.dictionary = dictionary;
}
- public int getColumnIndex() {
- return columnIndex;
+ public ImmutableBytesWritable getDictionary() {
+ return dictionary;
}
public ImmutableBytesWritable getKey() {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/af6372de/invertedindex/src/main/java/org/apache/kylin/invertedindex/tools/IICLI.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/tools/IICLI.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/tools/IICLI.java
index 53a5487..eb65ca8 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/tools/IICLI.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/tools/IICLI.java
@@ -18,14 +18,10 @@
package org.apache.kylin.invertedindex.tools;
-import java.io.IOException;
-import java.util.Iterator;
-
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Reader;
import org.apache.kylin.common.KylinConfig;
@@ -37,6 +33,10 @@ import org.apache.kylin.invertedindex.index.Slice;
import org.apache.kylin.invertedindex.index.TableRecord;
import org.apache.kylin.invertedindex.index.TableRecordInfo;
import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
+import org.apache.kylin.invertedindex.model.KeyValuePair;
+
+import java.io.IOException;
+import java.util.Iterator;
/**
* @author yangli9
@@ -65,18 +65,17 @@ public class IICLI {
System.out.println("Total " + count + " records");
}
- public static Iterable<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> readSequenceKVs(
+ public static Iterable<KeyValuePair> readSequenceKVs(
Configuration hconf, String path) throws IOException {
final Reader reader = new Reader(hconf,
SequenceFile.Reader.file(new Path(path)));
- return new Iterable<Pair<ImmutableBytesWritable, ImmutableBytesWritable>>() {
+ return new Iterable<KeyValuePair>() {
@Override
- public Iterator<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> iterator() {
- return new Iterator<Pair<ImmutableBytesWritable, ImmutableBytesWritable>>() {
+ public Iterator<KeyValuePair> iterator() {
+ return new Iterator<KeyValuePair>() {
ImmutableBytesWritable k = new ImmutableBytesWritable();
ImmutableBytesWritable v = new ImmutableBytesWritable();
- Pair<ImmutableBytesWritable, ImmutableBytesWritable> pair = new Pair<ImmutableBytesWritable, ImmutableBytesWritable>(
- k, v);
+ KeyValuePair pair = new KeyValuePair(k, v);
@Override
public boolean hasNext() {
@@ -94,7 +93,7 @@ public class IICLI {
}
@Override
- public Pair<ImmutableBytesWritable, ImmutableBytesWritable> next() {
+ public KeyValuePair next() {
return pair;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/af6372de/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java
index 4b417e8..f040820 100644
--- a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java
+++ b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java
@@ -30,6 +30,7 @@ import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.kylin.invertedindex.model.KeyValuePair;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -136,8 +137,7 @@ public class InvertedIndexLocalTest extends LocalFileMetadataTestCase {
System.out.println(slices.size() + " slices");
IIKeyValueCodec codec = new IIKeyValueCodec(info.getDigest());
- List<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> kvs = encodeKVs(
- codec, slices);
+ List<KeyValuePair> kvs = encodeKVs(codec, slices);
System.out.println(kvs.size() + " KV pairs");
List<Slice> slicesCopy = decodeKVs(codec, kvs);
@@ -198,10 +198,10 @@ public class InvertedIndexLocalTest extends LocalFileMetadataTestCase {
return slices;
}
- private List<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> encodeKVs(
+ private List<KeyValuePair> encodeKVs(
IIKeyValueCodec codec, List<Slice> slices) {
- List<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> kvs = Lists
+ List<KeyValuePair> kvs = Lists
.newArrayList();
for (Slice slice : slices) {
kvs.addAll(codec.encodeKeyValue(slice));
@@ -210,7 +210,7 @@ public class InvertedIndexLocalTest extends LocalFileMetadataTestCase {
}
private List<Slice> decodeKVs(IIKeyValueCodec codec,
- List<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> kvs) {
+ List<KeyValuePair> kvs) {
List<Slice> slices = Lists.newArrayList();
for (Slice slice : codec.decodeKeyValue(kvs)) {
slices.add(slice);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/af6372de/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/HbaseServerKVIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/HbaseServerKVIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/HbaseServerKVIterator.java
index df26e41..6bcae0d 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/HbaseServerKVIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/HbaseServerKVIterator.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.kylin.invertedindex.model.KeyValuePair;
import java.io.Closeable;
import java.io.IOException;
@@ -33,28 +34,28 @@ import java.util.List;
/**
* Created by honma on 11/10/14.
*/
-public class HbaseServerKVIterator implements Iterable<Pair<ImmutableBytesWritable, ImmutableBytesWritable>>, Closeable {
+public class HbaseServerKVIterator implements Iterable<KeyValuePair>, Closeable {
- private RegionScanner innerScaner;
+ private RegionScanner innerScanner;
List<Cell> results = new ArrayList<Cell>();
- public HbaseServerKVIterator(RegionScanner innerScaner) {
- this.innerScaner = innerScaner;
+ public HbaseServerKVIterator(RegionScanner innerScanner) {
+ this.innerScanner = innerScanner;
}
@Override
public void close() throws IOException {
- IOUtils.closeQuietly(this.innerScaner);
+ IOUtils.closeQuietly(this.innerScanner);
}
@Override
- public Iterator<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> iterator() {
- return new Iterator<Pair<ImmutableBytesWritable, ImmutableBytesWritable>>() {
+ public Iterator<KeyValuePair> iterator() {
+ return new Iterator<KeyValuePair>() {
ImmutableBytesWritable key = new ImmutableBytesWritable();
ImmutableBytesWritable value = new ImmutableBytesWritable();
- Pair<ImmutableBytesWritable, ImmutableBytesWritable> pair = new Pair<>(key, value);
+ KeyValuePair pair = new KeyValuePair(key, value);
private boolean hasMore = true;
@@ -64,10 +65,10 @@ public class HbaseServerKVIterator implements Iterable<Pair<ImmutableBytesWritab
}
@Override
- public Pair<ImmutableBytesWritable, ImmutableBytesWritable> next() {
+ public KeyValuePair next() {
if (hasNext()) {
try {
- hasMore = innerScaner.nextRaw(results);
+ hasMore = innerScanner.nextRaw(results);
} catch (IOException e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/af6372de/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/LocalDictionary.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/LocalDictionary.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/LocalDictionary.java
index 28371a3..af6cf38 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/LocalDictionary.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/LocalDictionary.java
@@ -6,7 +6,7 @@ import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.storage.hbase.coprocessor.CoprocessorRowType;
-import java.util.List;
+import java.util.Map;
/**
* Created by Hongbin Ma(Binmahone) on 3/3/15.
@@ -14,11 +14,11 @@ import java.util.List;
public class LocalDictionary implements IDictionaryAware {
private CoprocessorRowType type;
- private List<Dictionary<?>> dicts;
+ private Map<Integer, Dictionary<?>> colDictMap;
private TableRecordInfoDigest recordInfo;
- public LocalDictionary(List<Dictionary<?>> dicts, CoprocessorRowType type, TableRecordInfoDigest recordInfo) {
- this.dicts = dicts;
+ public LocalDictionary(Map<Integer, Dictionary<?>> colDictMap, CoprocessorRowType type, TableRecordInfoDigest recordInfo) {
+ this.colDictMap = colDictMap;
this.type = type;
this.recordInfo = recordInfo;
}
@@ -30,6 +30,6 @@ public class LocalDictionary implements IDictionaryAware {
@Override
public Dictionary<?> getDictionary(TblColRef col) {
- return this.dicts.get(type.getColIndexByTblColRef(col));
+ return this.colDictMap.get(type.getColIndexByTblColRef(col));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/af6372de/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
index 6339fe4..36536bb 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
@@ -50,6 +50,7 @@ import org.apache.kylin.invertedindex.index.TableRecord;
import org.apache.kylin.invertedindex.index.TableRecordInfo;
import org.apache.kylin.invertedindex.model.IIDesc;
import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
+import org.apache.kylin.invertedindex.model.KeyValuePair;
import org.apache.kylin.metadata.measure.fixedlen.FixedLenMeasureCodec;
import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.model.TblColRef;
@@ -164,9 +165,9 @@ public class IIStreamBuilder extends StreamBuilder {
private void loadToHBase(HTableInterface hTable, Slice slice, IIKeyValueCodec codec) throws IOException {
try {
List<Put> data = Lists.newArrayList();
- for (Pair<ImmutableBytesWritable, ImmutableBytesWritable> pair : codec.encodeKeyValue(slice)) {
- final byte[] key = pair.getFirst().get();
- final byte[] value = pair.getSecond().get();
+ for (KeyValuePair pair : codec.encodeKeyValue(slice)) {
+ final byte[] key = pair.getKey().get();
+ final byte[] value = pair.getValue().get();
Put put = new Put(key);
put.add(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES, value);
//dictionary