You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/14 01:00:56 UTC

[21/50] [abbrv] incubator-kylin git commit: stream building

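stream building: persist each column's local dictionary in a separate HBase
qualifier (HBASE_DICTIONARY_BYTES) alongside the II key/value data, read it back
in the coprocessor endpoint so decoded Slices carry their local dictionaries, and
make StreamBuilder.build() throw on failure instead of returning a boolean.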


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/66902d01
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/66902d01
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/66902d01

Branch: refs/heads/streaming-localdict
Commit: 66902d017785ef5395405850fbd858ca612b722d
Parents: 2be4e32
Author: qianhao.zhou <qi...@ebay.com>
Authored: Wed Mar 11 15:40:59 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Wed Mar 11 15:40:59 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/invertedindex/index/Slice.java |  8 +++--
 .../invertedindex/index/TableRecordInfo.java    | 17 ---------
 .../invertedindex/model/IIKeyValueCodec.java    | 15 ++++----
 .../kylin/invertedindex/model/KeyValuePair.java | 23 +++++++++----
 .../endpoint/HbaseServerKVIterator.java         | 36 +++++++++++++++++---
 .../hbase/coprocessor/endpoint/IIEndpoint.java  |  3 +-
 .../apache/kylin/streaming/StreamBuilder.java   | 28 ++++++++-------
 .../invertedindex/IIStreamBuilder.java          | 31 ++++++++++-------
 8 files changed, 100 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/66902d01/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/Slice.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/Slice.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/Slice.java
index 4ac3f01..39a04cf 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/Slice.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/Slice.java
@@ -42,6 +42,7 @@ public class Slice implements Iterable<RawTableRecord>, Comparable<Slice> {
     long timestamp;
     int nRecords;
     ColumnValueContainer[] containers;
+    private Map<Integer, Dictionary<?>> localDictionaries;
 
     public Slice(TableRecordInfoDigest digest, short shard, long timestamp, ColumnValueContainer[] containers) {
         this.info = digest;
@@ -59,8 +60,11 @@ public class Slice implements Iterable<RawTableRecord>, Comparable<Slice> {
     }
 
     public Map<Integer, Dictionary<?>> getLocalDictionaries() {
-        // TODO localdict
-        return Collections.emptyMap();
+        return localDictionaries;
+    }
+
+    public void setLocalDictionaries(Map<Integer, Dictionary<?>> localDictionaries) {
+        this.localDictionaries = localDictionaries;
     }
 
     public int getRecordCount() {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/66902d01/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
index 68feb03..5535fcc 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
@@ -77,23 +77,6 @@ public class TableRecordInfo {
         this.digest = createDigest(dictionaryMap, measureCodecMap);
     }
 
-    private List<Integer> getLocalDictColumnList() {
-        //TODO localdict
-        return Collections.emptyList();
-    }
-
-//    public void updateDictionary(List<Dictionary<?>> dicts) {
-//        List<Integer> columns = getLocalDictColumnList();
-//
-//        if (columns.size() != dicts.size()) {
-//            throw new RuntimeException("columns size not equal to dicts size");
-//        }
-//
-//        for (Integer index : columns) {
-//            this.dictionaries[index] = dicts.get(index);
-//        }
-//    }
-
     public TableRecordInfoDigest getDigest() {
         return digest;
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/66902d01/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
index 81f4d56..8ea4e70 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
@@ -18,17 +18,16 @@
 
 package org.apache.kylin.invertedindex.model;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
+import java.util.*;
 
+import com.google.common.collect.Maps;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Pair;
 
 import com.google.common.collect.Lists;
 
 import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.invertedindex.index.*;
 
 /**
@@ -69,7 +68,7 @@ public class IIKeyValueCodec {
 		ImmutableBytesWritable key = encodeKey(slice.getShard(),
 				slice.getTimestamp(), col, -1);
 		ImmutableBytesWritable value = container.toBytes();
-        return new KeyValuePair(key, value);
+        return new KeyValuePair(col, key, value);
 	}
 
 	private List<KeyValuePair> collectKeyValues(Slice slice,
@@ -80,7 +79,7 @@ public class IIKeyValueCodec {
 		for (int v = 0; v < values.size(); v++) {
 			ImmutableBytesWritable key = encodeKey(slice.getShard(),
 					slice.getTimestamp(), col, v);
-            list.add(new KeyValuePair(key, values.get(v)));
+            list.add(new KeyValuePair(col, key, values.get(v)));
 		}
         return list;
 	}
@@ -133,6 +132,7 @@ public class IIKeyValueCodec {
 		int lastCol = -1;
 		ColumnValueContainer[] containers = null;
 		List<ImmutableBytesWritable> bitMapValues = Lists.newArrayList();
+        Map<Integer, Dictionary<?>> localDictionaries = Maps.newHashMap();
 
 		Decoder(TableRecordInfoDigest info,
 				Iterable<KeyValuePair> kvs) {
@@ -151,6 +151,7 @@ public class IIKeyValueCodec {
 				ImmutableBytesWritable k = kv.getKey();
 				ImmutableBytesWritable v = kv.getValue();
 				decodeKey(k);
+                localDictionaries.put(curCol, kv.getDictionary());
 
 				if (curShard != lastShard
 						|| curSliceTimestamp != lastSliceTimestamp) {
@@ -216,11 +217,13 @@ public class IIKeyValueCodec {
 			if (containers != null) {
 				next = new Slice(info, lastShard, lastSliceTimestamp,
 						containers);
+                next.setLocalDictionaries(Maps.newHashMap(localDictionaries));
 			}
 			lastSliceTimestamp = Long.MIN_VALUE;
 			lastCol = -1;
 			containers = null;
 			bitMapValues.clear();
+            localDictionaries.clear();
 		}
 
 		private void addBitMapContainer(int col) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/66902d01/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/KeyValuePair.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/KeyValuePair.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/KeyValuePair.java
index 7e43b2a..48c45f6 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/KeyValuePair.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/KeyValuePair.java
@@ -35,27 +35,36 @@
 package org.apache.kylin.invertedindex.model;
 
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.kylin.dict.Dictionary;
 
 /**
  * Created by qianzhou on 3/10/15.
  */
 public final class KeyValuePair {
 
-    private ImmutableBytesWritable key;
-    private ImmutableBytesWritable value;
-    private ImmutableBytesWritable dictionary;
+    private final ImmutableBytesWritable key;
+    private final ImmutableBytesWritable value;
+    private final int id;
+    private Dictionary<?> dictionary;
 
     public KeyValuePair(ImmutableBytesWritable key, ImmutableBytesWritable value) {
-        this(key, value, null);
+        this(-1, key, value);
     }
-
-    public KeyValuePair(ImmutableBytesWritable key, ImmutableBytesWritable value, ImmutableBytesWritable dictionary) {
+    public KeyValuePair(int id, ImmutableBytesWritable key, ImmutableBytesWritable value) {
+        this.id = id;
         this.key = key;
         this.value = value;
+    }
+
+    public void setDictionary(Dictionary<?> dictionary) {
         this.dictionary = dictionary;
     }
 
-    public ImmutableBytesWritable getDictionary() {
+    public int getId() {
+        return this.id;
+    }
+
+    public Dictionary<?> getDictionary() {
         return dictionary;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/66902d01/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/HbaseServerKVIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/HbaseServerKVIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/HbaseServerKVIterator.java
index 6bcae0d..47aa7d9 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/HbaseServerKVIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/HbaseServerKVIterator.java
@@ -22,10 +22,17 @@ import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.hadoop.hbase.util.Pair;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.invertedindex.model.IIDesc;
 import org.apache.kylin.invertedindex.model.KeyValuePair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayInputStream;
 import java.io.Closeable;
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -37,6 +44,7 @@ import java.util.List;
 public class HbaseServerKVIterator implements Iterable<KeyValuePair>, Closeable {
 
     private RegionScanner innerScanner;
+    private Logger logger = LoggerFactory.getLogger(HbaseServerKVIterator.class);
 
     List<Cell> results = new ArrayList<Cell>();
 
@@ -55,6 +63,7 @@ public class HbaseServerKVIterator implements Iterable<KeyValuePair>, Closeable
 
             ImmutableBytesWritable key = new ImmutableBytesWritable();
             ImmutableBytesWritable value = new ImmutableBytesWritable();
+            ImmutableBytesWritable dict = new ImmutableBytesWritable();
             KeyValuePair pair = new KeyValuePair(key, value);
 
             private boolean hasMore = true;
@@ -76,9 +85,28 @@ public class HbaseServerKVIterator implements Iterable<KeyValuePair>, Closeable
                     if (results.size() < 1)
                         throw new IllegalStateException("Hbase row contains less than 1 cell");
 
-                    Cell c = results.get(0);
-                    key.set(c.getRowArray(), c.getRowOffset(), c.getRowLength());
-                    value.set(c.getValueArray(), c.getValueOffset(), c.getValueLength());
+                    Dictionary<?> dictionary = null;
+                    boolean hasDictData = false;
+                    for (Cell c : results) {
+                        if (BytesUtil.compareBytes(IIDesc.HBASE_QUALIFIER_BYTES, 0, c.getQualifierArray(), c.getQualifierOffset(), IIDesc.HBASE_QUALIFIER_BYTES.length) == 0) {
+                            key.set(c.getRowArray(), c.getRowOffset(), c.getRowLength());
+                            value.set(c.getValueArray(), c.getValueOffset(), c.getValueLength());
+                        } else if (BytesUtil.compareBytes(IIDesc.HBASE_DICTIONARY_BYTES, 0, c.getQualifierArray(), c.getQualifierOffset(), IIDesc.HBASE_DICTIONARY_BYTES.length) == 0) {
+                            dict.set(c.getValueArray(), c.getValueOffset(), c.getValueLength());
+                            hasDictData = true;
+                        }
+                    }
+                    if (hasDictData) {
+                        try {
+                            final DataInputStream in = new DataInputStream(new ByteArrayInputStream(dict.get(), dict.getOffset(), dict.getLength()));
+                            String type = in.readUTF();
+                            dictionary = (Dictionary<?>) ClassUtil.forName(type, Dictionary.class).newInstance();
+                            dictionary.readFields(in);
+                        } catch (Exception e) {
+                            logger.error("error create dictionary", e);
+                        }
+                    }
+                    pair.setDictionary(dictionary);
 
                     results.clear();
                     return pair;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/66902d01/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
index b72a376..fbe3e8b 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
@@ -63,7 +63,8 @@ public class IIEndpoint extends IIProtos.RowsService implements Coprocessor, Cop
 
     private Scan buildScan() {
         Scan scan = new Scan();
-        scan.addColumn(Bytes.toBytes(IIDesc.HBASE_FAMILY), Bytes.toBytes(IIDesc.HBASE_QUALIFIER));
+        scan.addColumn(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES);
+        scan.addColumn(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_DICTIONARY_BYTES);
 
         return scan;
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/66902d01/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
index a86a9b1..39c3966 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
@@ -60,7 +60,7 @@ public abstract class StreamBuilder implements Runnable {
         this.sliceSize = sliceSize;
     }
 
-    protected abstract boolean build(List<Stream> streamsToBuild);
+    protected abstract void build(List<Stream> streamsToBuild) throws Exception;
 
     private void clearCounter() {
         lastBuildTime = System.currentTimeMillis();
@@ -72,7 +72,13 @@ public abstract class StreamBuilder implements Runnable {
             List<Stream> streamToBuild = Lists.newArrayList();
             clearCounter();
             while (true) {
-                final Stream stream = streamQueue.poll(50, TimeUnit.MILLISECONDS);
+                Stream stream;
+                try {
+                    stream = streamQueue.poll(50, TimeUnit.MILLISECONDS);
+                } catch (InterruptedException e) {
+                    logger.warn("stream queue interrupted", e);
+                    continue;
+                }
                 if (stream == null) {
                     continue;
                 } else {
@@ -86,21 +92,19 @@ public abstract class StreamBuilder implements Runnable {
                 }
                 streamToBuild.add(stream);
                 if (streamToBuild.size() >= this.sliceSize) {
-                    if (build(streamToBuild)) {
-                        clearCounter();
-                        streamToBuild.clear();
-                    }
+                    build(streamToBuild);
+                    clearCounter();
+                    streamToBuild.clear();
                 } else if ((System.currentTimeMillis() - lastBuildTime) > BATCH_BUILD_INTERVAL_THRESHOLD) {
-                    if (build(streamToBuild)) {
-                        clearCounter();
-                        streamToBuild.clear();
-                    }
+                    build(streamToBuild);
+                    clearCounter();
+                    streamToBuild.clear();
                 } else {
                     continue;
                 }
             }
-        } catch (InterruptedException e) {
-            logger.error("StreamBuilder has been interrupted", e);
+        }  catch (Exception e) {
+            logger.error("build stream error, stop building", e);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/66902d01/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
index 36536bb..a4c7167 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
@@ -60,6 +60,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -90,7 +92,7 @@ public class IIStreamBuilder extends StreamBuilder {
     }
 
     @Override
-    protected boolean build(List<Stream> streamsToBuild) {
+    protected void build(List<Stream> streamsToBuild) throws IOException {
         List<List<String>> table = Lists.transform(streamsToBuild, new Function<Stream, List<String>>() {
             @Nullable
             @Override
@@ -110,14 +112,8 @@ public class IIStreamBuilder extends StreamBuilder {
         TableRecordInfo tableRecordInfo = new TableRecordInfo(desc, dictionaryMap, measureCodecMap);
         SliceBuilder sliceBuilder = new SliceBuilder(tableRecordInfo, (short) partitionId);
         final Slice slice = buildSlice(table, sliceBuilder, tableRecordInfo);
-//        try {
-//            loadToHBase(hTable, slice, new IIKeyValueCodec(tableRecordInfo.getDigest()));
-//            submitOffset();
-//        } catch (IOException e) {
-//            logger.error("error load to hbase, build failed", e);
-//            return false;
-//        }
-        return true;
+        loadToHBase(hTable, slice, new IIKeyValueCodec(tableRecordInfo.getDigest()), desc.listAllColumns(), dictionaryMap);
+        submitOffset();
     }
 
     private Map<TblColRef, Dictionary<?>> buildDictionary(List<List<String>> table, IIDesc desc) {
@@ -162,7 +158,7 @@ public class IIStreamBuilder extends StreamBuilder {
         return sliceBuilder.close();
     }
 
-    private void loadToHBase(HTableInterface hTable, Slice slice, IIKeyValueCodec codec) throws IOException {
+    private void loadToHBase(HTableInterface hTable, Slice slice, IIKeyValueCodec codec, List<TblColRef> allColumns, Map<TblColRef, Dictionary<?>> localDictionaries) throws IOException {
         try {
             List<Put> data = Lists.newArrayList();
             for (KeyValuePair pair : codec.encodeKeyValue(slice)) {
@@ -170,9 +166,20 @@ public class IIStreamBuilder extends StreamBuilder {
                 final byte[] value = pair.getValue().get();
                 Put put = new Put(key);
                 put.add(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES, value);
+
                 //dictionary
-//                put.add(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES, value);
-                data.add(put);
+                final int id = pair.getId();
+                if (id >= 0 && id < allColumns.size()) {
+                    final Dictionary<?> dictionary = localDictionaries.get(allColumns.get(id));
+                    if (dictionary != null) {
+                        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                        DataOutputStream out = new DataOutputStream(baos);
+                        out.writeUTF(dictionary.getClass().getName());
+                        dictionary.write(out);
+                        put.add(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_DICTIONARY_BYTES, baos.toByteArray());
+                    }
+                    data.add(put);
+                }
             }
             hTable.put(data);
         } finally {