You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/14 01:00:47 UTC

[12/50] [abbrv] incubator-kylin git commit: refactor

refactor


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/af6372de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/af6372de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/af6372de

Branch: refs/heads/streaming-localdict
Commit: af6372deabad2f56e281c775184e9b209832361d
Parents: 1142b68
Author: qianhao.zhou <qi...@ebay.com>
Authored: Tue Mar 10 18:25:33 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Tue Mar 10 18:25:33 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/invertedindex/index/Slice.java |  6 ++++--
 .../invertedindex/model/IIKeyValueCodec.java    | 17 ++++++++--------
 .../kylin/invertedindex/model/KeyValuePair.java | 14 ++++++++-----
 .../apache/kylin/invertedindex/tools/IICLI.java | 21 ++++++++++----------
 .../invertedindex/InvertedIndexLocalTest.java   | 10 +++++-----
 .../endpoint/HbaseServerKVIterator.java         | 21 ++++++++++----------
 .../coprocessor/endpoint/LocalDictionary.java   | 10 +++++-----
 .../invertedindex/IIStreamBuilder.java          |  7 ++++---
 8 files changed, 56 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/af6372de/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/Slice.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/Slice.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/Slice.java
index 56e25f0..4ac3f01 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/Slice.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/Slice.java
@@ -18,8 +18,10 @@
 
 package org.apache.kylin.invertedindex.index;
 
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 import it.uniroma3.mat.extendedset.intset.ConciseSet;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -56,9 +58,9 @@ public class Slice implements Iterable<RawTableRecord>, Comparable<Slice> {
         }
     }
 
-    public List<Dictionary<?>> getLocalDictionaries() {
+    public Map<Integer, Dictionary<?>> getLocalDictionaries() {
         // TODO localdict
-        return null;
+        return Collections.emptyMap();
     }
 
     public int getRecordCount() {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/af6372de/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
index 070b672..81f4d56 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
@@ -69,7 +69,7 @@ public class IIKeyValueCodec {
 		ImmutableBytesWritable key = encodeKey(slice.getShard(),
 				slice.getTimestamp(), col, -1);
 		ImmutableBytesWritable value = container.toBytes();
-        return new KeyValuePair(col, key, value);
+        return new KeyValuePair(key, value);
 	}
 
 	private List<KeyValuePair> collectKeyValues(Slice slice,
@@ -80,7 +80,7 @@ public class IIKeyValueCodec {
 		for (int v = 0; v < values.size(); v++) {
 			ImmutableBytesWritable key = encodeKey(slice.getShard(),
 					slice.getTimestamp(), col, v);
-            list.add(new KeyValuePair(col, key, values.get(v)));
+            list.add(new KeyValuePair(key, values.get(v)));
 		}
         return list;
 	}
@@ -114,14 +114,14 @@ public class IIKeyValueCodec {
 	}
 
 	public Iterable<Slice> decodeKeyValue(
-			Iterable<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> kvs) {
+			Iterable<KeyValuePair> kvs) {
 		return new Decoder(infoDigest, kvs);
 	}
 
 	private static class Decoder implements Iterable<Slice> {
 
 		TableRecordInfoDigest info;
-		Iterator<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> iterator;
+		Iterator<KeyValuePair> iterator;
 
 		Slice next = null;
 		short curShard = Short.MIN_VALUE;
@@ -135,7 +135,7 @@ public class IIKeyValueCodec {
 		List<ImmutableBytesWritable> bitMapValues = Lists.newArrayList();
 
 		Decoder(TableRecordInfoDigest info,
-				Iterable<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> kvs) {
+				Iterable<KeyValuePair> kvs) {
 			this.info = info;
 			this.iterator = kvs.iterator();
 		}
@@ -147,10 +147,9 @@ public class IIKeyValueCodec {
 
 			// NOTE the input keys are ordered
 			while (next == null && iterator.hasNext()) {
-				Pair<ImmutableBytesWritable, ImmutableBytesWritable> kv = iterator
-						.next();
-				ImmutableBytesWritable k = kv.getFirst();
-				ImmutableBytesWritable v = kv.getSecond();
+                KeyValuePair kv = iterator.next();
+				ImmutableBytesWritable k = kv.getKey();
+				ImmutableBytesWritable v = kv.getValue();
 				decodeKey(k);
 
 				if (curShard != lastShard

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/af6372de/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/KeyValuePair.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/KeyValuePair.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/KeyValuePair.java
index 445389f..7e43b2a 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/KeyValuePair.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/KeyValuePair.java
@@ -41,18 +41,22 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  */
 public final class KeyValuePair {
 
-    private int columnIndex;
     private ImmutableBytesWritable key;
     private ImmutableBytesWritable value;
+    private ImmutableBytesWritable dictionary;
 
-    public KeyValuePair(int columnIndex, ImmutableBytesWritable key, ImmutableBytesWritable value) {
-        this.columnIndex = columnIndex;
+    public KeyValuePair(ImmutableBytesWritable key, ImmutableBytesWritable value) {
+        this(key, value, null);
+    }
+
+    public KeyValuePair(ImmutableBytesWritable key, ImmutableBytesWritable value, ImmutableBytesWritable dictionary) {
         this.key = key;
         this.value = value;
+        this.dictionary = dictionary;
     }
 
-    public int getColumnIndex() {
-        return columnIndex;
+    public ImmutableBytesWritable getDictionary() {
+        return dictionary;
     }
 
     public ImmutableBytesWritable getKey() {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/af6372de/invertedindex/src/main/java/org/apache/kylin/invertedindex/tools/IICLI.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/tools/IICLI.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/tools/IICLI.java
index 53a5487..eb65ca8 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/tools/IICLI.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/tools/IICLI.java
@@ -18,14 +18,10 @@
 
 package org.apache.kylin.invertedindex.tools;
 
-import java.io.IOException;
-import java.util.Iterator;
-
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.Reader;
 import org.apache.kylin.common.KylinConfig;
@@ -37,6 +33,10 @@ import org.apache.kylin.invertedindex.index.Slice;
 import org.apache.kylin.invertedindex.index.TableRecord;
 import org.apache.kylin.invertedindex.index.TableRecordInfo;
 import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
+import org.apache.kylin.invertedindex.model.KeyValuePair;
+
+import java.io.IOException;
+import java.util.Iterator;
 
 /**
  * @author yangli9
@@ -65,18 +65,17 @@ public class IICLI {
 		System.out.println("Total " + count + " records");
 	}
 
-	public static Iterable<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> readSequenceKVs(
+	public static Iterable<KeyValuePair> readSequenceKVs(
 			Configuration hconf, String path) throws IOException {
 		final Reader reader = new Reader(hconf,
 				SequenceFile.Reader.file(new Path(path)));
-		return new Iterable<Pair<ImmutableBytesWritable, ImmutableBytesWritable>>() {
+		return new Iterable<KeyValuePair>() {
 			@Override
-			public Iterator<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> iterator() {
-				return new Iterator<Pair<ImmutableBytesWritable, ImmutableBytesWritable>>() {
+			public Iterator<KeyValuePair> iterator() {
+				return new Iterator<KeyValuePair>() {
 					ImmutableBytesWritable k = new ImmutableBytesWritable();
 					ImmutableBytesWritable v = new ImmutableBytesWritable();
-					Pair<ImmutableBytesWritable, ImmutableBytesWritable> pair = new Pair<ImmutableBytesWritable, ImmutableBytesWritable>(
-							k, v);
+                    KeyValuePair pair = new KeyValuePair(k, v);
 
 					@Override
 					public boolean hasNext() {
@@ -94,7 +93,7 @@ public class IICLI {
 					}
 
 					@Override
-					public Pair<ImmutableBytesWritable, ImmutableBytesWritable> next() {
+					public KeyValuePair next() {
 						return pair;
 					}
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/af6372de/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java
index 4b417e8..f040820 100644
--- a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java
+++ b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java
@@ -30,6 +30,7 @@ import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.kylin.invertedindex.model.KeyValuePair;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -136,8 +137,7 @@ public class InvertedIndexLocalTest extends LocalFileMetadataTestCase {
 		System.out.println(slices.size() + " slices");
 
 		IIKeyValueCodec codec = new IIKeyValueCodec(info.getDigest());
-		List<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> kvs = encodeKVs(
-				codec, slices);
+		List<KeyValuePair> kvs = encodeKVs(codec, slices);
 		System.out.println(kvs.size() + " KV pairs");
 
 		List<Slice> slicesCopy = decodeKVs(codec, kvs);
@@ -198,10 +198,10 @@ public class InvertedIndexLocalTest extends LocalFileMetadataTestCase {
 		return slices;
 	}
 
-	private List<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> encodeKVs(
+	private List<KeyValuePair> encodeKVs(
 			IIKeyValueCodec codec, List<Slice> slices) {
 
-		List<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> kvs = Lists
+		List<KeyValuePair> kvs = Lists
 				.newArrayList();
 		for (Slice slice : slices) {
 			kvs.addAll(codec.encodeKeyValue(slice));
@@ -210,7 +210,7 @@ public class InvertedIndexLocalTest extends LocalFileMetadataTestCase {
 	}
 
 	private List<Slice> decodeKVs(IIKeyValueCodec codec,
-			List<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> kvs) {
+			List<KeyValuePair> kvs) {
 		List<Slice> slices = Lists.newArrayList();
 		for (Slice slice : codec.decodeKeyValue(kvs)) {
 			slices.add(slice);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/af6372de/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/HbaseServerKVIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/HbaseServerKVIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/HbaseServerKVIterator.java
index df26e41..6bcae0d 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/HbaseServerKVIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/HbaseServerKVIterator.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.kylin.invertedindex.model.KeyValuePair;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -33,28 +34,28 @@ import java.util.List;
 /**
  * Created by honma on 11/10/14.
  */
-public class HbaseServerKVIterator implements Iterable<Pair<ImmutableBytesWritable, ImmutableBytesWritable>>, Closeable {
+public class HbaseServerKVIterator implements Iterable<KeyValuePair>, Closeable {
 
-    private RegionScanner innerScaner;
+    private RegionScanner innerScanner;
 
     List<Cell> results = new ArrayList<Cell>();
 
-    public HbaseServerKVIterator(RegionScanner innerScaner) {
-        this.innerScaner = innerScaner;
+    public HbaseServerKVIterator(RegionScanner innerScanner) {
+        this.innerScanner = innerScanner;
     }
 
     @Override
     public void close() throws IOException {
-        IOUtils.closeQuietly(this.innerScaner);
+        IOUtils.closeQuietly(this.innerScanner);
     }
 
     @Override
-    public Iterator<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> iterator() {
-        return new Iterator<Pair<ImmutableBytesWritable, ImmutableBytesWritable>>() {
+    public Iterator<KeyValuePair> iterator() {
+        return new Iterator<KeyValuePair>() {
 
             ImmutableBytesWritable key = new ImmutableBytesWritable();
             ImmutableBytesWritable value = new ImmutableBytesWritable();
-            Pair<ImmutableBytesWritable, ImmutableBytesWritable> pair = new Pair<>(key, value);
+            KeyValuePair pair = new KeyValuePair(key, value);
 
             private boolean hasMore = true;
 
@@ -64,10 +65,10 @@ public class HbaseServerKVIterator implements Iterable<Pair<ImmutableBytesWritab
             }
 
             @Override
-            public Pair<ImmutableBytesWritable, ImmutableBytesWritable> next() {
+            public KeyValuePair next() {
                 if (hasNext()) {
                     try {
-                        hasMore = innerScaner.nextRaw(results);
+                        hasMore = innerScanner.nextRaw(results);
                     } catch (IOException e) {
                         throw new RuntimeException(e);
                     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/af6372de/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/LocalDictionary.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/LocalDictionary.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/LocalDictionary.java
index 28371a3..af6cf38 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/LocalDictionary.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/LocalDictionary.java
@@ -6,7 +6,7 @@ import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.storage.hbase.coprocessor.CoprocessorRowType;
 
-import java.util.List;
+import java.util.Map;
 
 /**
  * Created by Hongbin Ma(Binmahone) on 3/3/15.
@@ -14,11 +14,11 @@ import java.util.List;
 public class LocalDictionary implements IDictionaryAware {
 
     private CoprocessorRowType type;
-    private List<Dictionary<?>> dicts;
+    private Map<Integer, Dictionary<?>> colDictMap;
     private TableRecordInfoDigest recordInfo;
 
-    public LocalDictionary(List<Dictionary<?>> dicts, CoprocessorRowType type, TableRecordInfoDigest recordInfo) {
-        this.dicts = dicts;
+    public LocalDictionary(Map<Integer, Dictionary<?>> colDictMap, CoprocessorRowType type, TableRecordInfoDigest recordInfo) {
+        this.colDictMap = colDictMap;
         this.type = type;
         this.recordInfo = recordInfo;
     }
@@ -30,6 +30,6 @@ public class LocalDictionary implements IDictionaryAware {
 
     @Override
     public Dictionary<?> getDictionary(TblColRef col) {
-        return this.dicts.get(type.getColIndexByTblColRef(col));
+        return this.colDictMap.get(type.getColIndexByTblColRef(col));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/af6372de/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
index 6339fe4..36536bb 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
@@ -50,6 +50,7 @@ import org.apache.kylin.invertedindex.index.TableRecord;
 import org.apache.kylin.invertedindex.index.TableRecordInfo;
 import org.apache.kylin.invertedindex.model.IIDesc;
 import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
+import org.apache.kylin.invertedindex.model.KeyValuePair;
 import org.apache.kylin.metadata.measure.fixedlen.FixedLenMeasureCodec;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -164,9 +165,9 @@ public class IIStreamBuilder extends StreamBuilder {
     private void loadToHBase(HTableInterface hTable, Slice slice, IIKeyValueCodec codec) throws IOException {
         try {
             List<Put> data = Lists.newArrayList();
-            for (Pair<ImmutableBytesWritable, ImmutableBytesWritable> pair : codec.encodeKeyValue(slice)) {
-                final byte[] key = pair.getFirst().get();
-                final byte[] value = pair.getSecond().get();
+            for (KeyValuePair pair : codec.encodeKeyValue(slice)) {
+                final byte[] key = pair.getKey().get();
+                final byte[] value = pair.getValue().get();
                 Put put = new Put(key);
                 put.add(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES, value);
                 //dictionary