You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/28 01:05:10 UTC

[33/50] incubator-kylin git commit: KYLIN-653 adding streaming build test cases

KYLIN-653 adding streaming build test cases


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/4df05317
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/4df05317
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/4df05317

Branch: refs/heads/streaming-localdict
Commit: 4df05317e1a754d1b1e422fdf5df580b2fa3366d
Parents: bbbcae8
Author: honma <ho...@ebay.com>
Authored: Fri Mar 27 09:49:52 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Fri Mar 27 15:16:20 2015 +0800

----------------------------------------------------------------------
 .../kylin/invertedindex/index/TableRecord.java  |  5 +-
 .../invertedindex/index/TableRecordInfo.java    | 10 +--
 .../kylin/invertedindex/model/IIDesc.java       |  1 +
 .../model/IIKeyValueCodecWithState.java         |  6 +-
 .../kylin/job/hadoop/cube/BaseCuboidMapper.java | 20 +++--
 .../cube/FactDistinctIIColumnsMapper.java       |  9 +-
 .../kylin/job/BuildCubeWithEngineTest.java      |  1 -
 .../invertedindex/IIStreamBuilder.java          | 33 ++++---
 .../IIKeyValueCodecWithStateTest.java           | 91 ++++++++++++++++++++
 .../invertedindex/ToyIIStreamBuilder.java       | 35 ++++++++
 10 files changed, 177 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4df05317/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java
index ce1b7e0..78cea3d 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java
@@ -18,15 +18,12 @@
 
 package org.apache.kylin.invertedindex.index;
 
-import com.google.common.collect.Lists;
-import org.apache.kylin.dict.DateStrDictionary;
 import org.apache.commons.lang.ObjectUtils;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.kylin.dict.DateStrDictionary;
 import org.apache.kylin.dict.Dictionary;
 
-import java.util.List;
-
 /**
  * @author yangli9, honma
  *         <p/>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4df05317/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
index 3136ebb..9a08e64 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
@@ -18,19 +18,17 @@
 
 package org.apache.kylin.invertedindex.index;
 
-import com.google.common.collect.Maps;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.invertedindex.IISegment;
 import org.apache.kylin.invertedindex.model.IIDesc;
 import org.apache.kylin.metadata.measure.fixedlen.FixedLenMeasureCodec;
-import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.DataType;
 import org.apache.kylin.metadata.model.TblColRef;
 
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
 /**
  * @author yangli9
  *         <p/>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4df05317/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
index cda3c4d..17edb86 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
@@ -319,6 +319,7 @@ public class IIDesc extends RootPersistentEntity {
         return sliceSize;
     }
 
+
     public String getSignature() {
         return signature;
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4df05317/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodecWithState.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodecWithState.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodecWithState.java
index a8e149a..e838283 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodecWithState.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodecWithState.java
@@ -26,6 +26,7 @@ public class IIKeyValueCodecWithState extends IIKeyValueCodec {
     protected static class IIRowDecoderWithState extends IIRowDecoder {
 
         final ArrayList<IIRow> buffer = Lists.newArrayList();
+        private Iterator<Slice> superIterator = null;
 
         private IIRowDecoderWithState(TableRecordInfoDigest digest, Iterator<IIRow> iiRowIterator) {
             super(digest, iiRowIterator);
@@ -33,7 +34,10 @@ public class IIKeyValueCodecWithState extends IIKeyValueCodec {
         }
 
         private Iterator<Slice> getSuperIterator() {
-            return super.iterator();
+            if (superIterator == null) {
+                superIterator = super.iterator();
+            }
+            return superIterator;
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4df05317/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java
index 41b21a7..a023c0c 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java
@@ -210,20 +210,24 @@ public class BaseCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Text, Text, Text
 
         try {
             bytesSplitter.split(value.getBytes(), value.getLength(), byteRowDelimiter);
-            intermediateTableDesc.sanityCheck(bytesSplitter);
+            outputKV(context);
 
-            byte[] rowKey = buildKey(bytesSplitter.getSplitBuffers());
-            outputKey.set(rowKey, 0, rowKey.length);
-
-            buildValue(bytesSplitter.getSplitBuffers());
-            outputValue.set(valueBuf.array(), 0, valueBuf.position());
-
-            context.write(outputKey, outputValue);
         } catch (Exception ex) {
             handleErrorRecord(bytesSplitter, ex);
         }
     }
 
+    private void outputKV(Context context) throws IOException, InterruptedException {
+        intermediateTableDesc.sanityCheck(bytesSplitter);
+
+        byte[] rowKey = buildKey(bytesSplitter.getSplitBuffers());
+        outputKey.set(rowKey, 0, rowKey.length);
+
+        buildValue(bytesSplitter.getSplitBuffers());
+        outputValue.set(valueBuf.array(), 0, valueBuf.position());
+        context.write(outputKey, outputValue);
+    }
+
     private void handleErrorRecord(BytesSplitter bytesSplitter, Exception ex) throws IOException {
 
         System.err.println("Insane record: " + bytesSplitter);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4df05317/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctIIColumnsMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctIIColumnsMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctIIColumnsMapper.java
index 75e127e..705e272 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctIIColumnsMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctIIColumnsMapper.java
@@ -20,6 +20,7 @@ package org.apache.kylin.job.hadoop.cube;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -50,7 +51,7 @@ public class FactDistinctIIColumnsMapper extends FactDistinctColumnsMapperBase<I
 
     private IIJoinedFlatTableDesc intermediateTableDesc;
     private ArrayList<IIRow> buffer = Lists.newArrayList();
-    private Iterable<Slice> slices;
+    private Iterator<Slice> slices;
 
     private String iiName;
     private IIInstance ii;
@@ -72,7 +73,7 @@ public class FactDistinctIIColumnsMapper extends FactDistinctColumnsMapperBase<I
         intermediateTableDesc = new IIJoinedFlatTableDesc(iiDesc);
         TableRecordInfo info = new TableRecordInfo(iiDesc);
         KeyValueCodec codec = new IIKeyValueCodecWithState(info.getDigest());
-        slices = codec.decodeKeyValue(buffer);
+        slices = codec.decodeKeyValue(buffer).iterator();
 
         baseCuboidCol2FlattenTableCol = new int[factDictCols.size()];
         for (int i = 0; i < factDictCols.size(); ++i) {
@@ -98,9 +99,9 @@ public class FactDistinctIIColumnsMapper extends FactDistinctColumnsMapperBase<I
         }
         buffer.add(iiRow);
 
-        if (slices.iterator().hasNext()) {
+        if (slices.hasNext()) {
             byte[] vBytesBuffer = null;
-            Slice slice = slices.iterator().next();
+            Slice slice = slices.next();
 
             for (RawTableRecord record : slice) {
                 for (int i = 0; i < factDictCols.size(); ++i) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4df05317/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java b/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
index ce70f2c..a33dab5 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
@@ -107,7 +107,6 @@ public class BuildCubeWithEngineTest {
                 jobService.deleteJob(jobId);
             }
         }
-
     }
 
     @After

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4df05317/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
index f9adefe..0cf3c77 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
@@ -82,7 +82,11 @@ public class IIStreamBuilder extends StreamBuilder {
         super(queue, desc.getSliceSize());
         this.desc = desc;
         try {
-            this.hTable = HConnectionManager.createConnection(HBaseConfiguration.create()).getTable(hTableName);
+            if (hTableName != null) {
+                this.hTable = HConnectionManager.createConnection(HBaseConfiguration.create()).getTable(hTableName);
+            } else {
+                this.hTable = null;
+            }
         } catch (IOException e) {
             logger.error("cannot open htable name:" + hTableName, e);
             throw new RuntimeException("cannot open htable name:" + hTableName, e);
@@ -105,12 +109,18 @@ public class IIStreamBuilder extends StreamBuilder {
         TableRecordInfo tableRecordInfo = new TableRecordInfo(desc, dictionaryMap);
         final Slice slice = buildSlice(table, sliceBuilder, tableRecordInfo, dictionaryMap);
         logger.info("slice info, shard:" + slice.getShard() + " timestamp:" + slice.getTimestamp() + " record count:" + slice.getRecordCount());
-        loadToHBase(hTable, slice, new IIKeyValueCodec(tableRecordInfo.getDigest()));
+
+        outputSlice(slice, tableRecordInfo);
         submitOffset();
+
         stopwatch.stop();
         logger.info("stream build finished, size:" + streamsToBuild.size() + " elapsed time:" + stopwatch.elapsedTime(TimeUnit.MILLISECONDS) + TimeUnit.MILLISECONDS);
     }
 
+    protected void outputSlice(Slice slice, TableRecordInfo tableRecordInfo) throws IOException {
+        loadToHBase(hTable, slice, new IIKeyValueCodec(tableRecordInfo.getDigest()));
+    }
+
     private Map<Integer, Dictionary<?>> buildDictionary(List<List<String>> table, IIDesc desc) {
         HashMultimap<TblColRef, String> valueMap = HashMultimap.create();
         final List<TblColRef> allColumns = desc.listAllColumns();
@@ -122,15 +132,19 @@ public class IIStreamBuilder extends StreamBuilder {
                 }
             }
         }
+
         Map<Integer, Dictionary<?>> result = Maps.newHashMap();
         for (TblColRef tblColRef : valueMap.keySet()) {
-            result.put(desc.findColumn(tblColRef), DictionaryGenerator.buildDictionaryFromValueList(tblColRef.getType(), Collections2.transform(valueMap.get(tblColRef), new Function<String, byte[]>() {
-                @Nullable
-                @Override
-                public byte[] apply(String input) {
-                    return input.getBytes();
-                }
-            })));
+            result.put(desc.findColumn(tblColRef), //
+                    DictionaryGenerator.buildDictionaryFromValueList(//
+                            tblColRef.getType(), //
+                            Collections2.transform(valueMap.get(tblColRef), new Function<String, byte[]>() {
+                                @Nullable
+                                @Override
+                                public byte[] apply(String input) {
+                                    return input.getBytes();
+                                }
+                            })));
         }
         return result;
     }
@@ -178,7 +192,6 @@ public class IIStreamBuilder extends StreamBuilder {
         }
     }
 
-
     private void submitOffset() {
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4df05317/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/IIKeyValueCodecWithStateTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/IIKeyValueCodecWithStateTest.java b/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/IIKeyValueCodecWithStateTest.java
new file mode 100644
index 0000000..25e250c
--- /dev/null
+++ b/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/IIKeyValueCodecWithStateTest.java
@@ -0,0 +1,91 @@
+package org.apache.kylin.streaming.invertedindex;
+
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.annotation.Nullable;
+
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.index.Slice;
+import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.invertedindex.model.IIKeyValueCodecWithState;
+import org.apache.kylin.invertedindex.model.IIRow;
+import org.apache.kylin.invertedindex.model.KeyValueCodec;
+import org.apache.kylin.streaming.Stream;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+
+/**
+ * Created by Hongbin Ma(Binmahone) on 3/26/15.
+ */
+public class IIKeyValueCodecWithStateTest extends LocalFileMetadataTestCase {
+
+    IIInstance ii;
+    IIDesc iiDesc;
+    List<IIRow> iiRowList = Lists.newArrayList();
+
+    final String[] inputs = new String[] { //
+    "FP-non GTC,0,15,145970,0,28,Toys,2008-10-08 07:18:40,USER_Y,Toys & Hobbies,Models & Kits,Automotive,0,Ebay,USER_S,15,Professional-Other,2012-08-16,2012-08-11,0,2012-08-16,145970,10000329,26.8551,0", //
+            "ABIN,0,-99,43479,0,21,Photo,2012-09-11 20:26:04,USER_Y,Cameras & Photo,Film Photography,Other,0,Ebay,USER_S,-99,Not Applicable,2012-08-16,2012-08-11,2012-08-16,43479,10000807,26.2474,0", //
+            "ABIN,0,16,80053,0,12,Computers,2012-06-19 21:15:09,USER_Y,Computers/Tablets & Networking,MonitorProjectors & Accs,Monitors,0,Ebay,USER_S,16,Consumer-Other,2012-08-16,2012-08-11,0,2012-08-16,80053,10000261,94.2273,0" };
+
+    @Before
+    public void setUp() throws Exception {
+        this.createTestMetadata();
+        this.ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_inner_join");
+        this.iiDesc = ii.getDescriptor();
+
+        Collection<?> streams = Collections2.transform(Arrays.asList(inputs), new Function<String, Stream>() {
+            @Nullable
+            @Override
+            public Stream apply(String input) {
+                return new Stream(0, input.getBytes());
+            }
+        });
+        LinkedBlockingQueue q = new LinkedBlockingQueue();
+        q.addAll(streams);
+        q.put(new Stream(-1, null));//a stop sign for builder
+
+        ToyIIStreamBuilder builder = new ToyIIStreamBuilder(q, iiDesc, 0, iiRowList);
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        Future<?> future = executorService.submit(builder);
+        future.get();
+    }
+
+    @Test
+    public void basicTest() {
+        ArrayList<IIRow> buffer = Lists.newArrayList();
+        TableRecordInfo info = new TableRecordInfo(iiDesc);
+        TableRecordInfoDigest digest = info.getDigest();
+        int columnCount = digest.getColumnCount();
+        KeyValueCodec codec = new IIKeyValueCodecWithState(digest);
+        Iterator<Slice> slices = codec.decodeKeyValue(buffer).iterator();
+
+        Assert.assertTrue(!slices.hasNext());
+        Assert.assertEquals(iiRowList.size(), digest.getColumnCount());
+
+        for (int i = 0; i < digest.getColumnCount(); ++i) {
+            buffer.add(iiRowList.get(i));
+
+            if (i != digest.getColumnCount() - 1) {
+                Assert.assertTrue(!slices.hasNext());
+            } else {
+                Assert.assertTrue(slices.hasNext());
+            }
+        }
+
+        Slice newSlice = slices.next();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4df05317/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/ToyIIStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/ToyIIStreamBuilder.java b/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/ToyIIStreamBuilder.java
new file mode 100644
index 0000000..161b6f6
--- /dev/null
+++ b/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/ToyIIStreamBuilder.java
@@ -0,0 +1,35 @@
+package org.apache.kylin.streaming.invertedindex;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.kylin.invertedindex.index.Slice;
+import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
+import org.apache.kylin.invertedindex.model.IIRow;
+import org.apache.kylin.streaming.Stream;
+
+/**
+ * Created by Hongbin Ma(Binmahone) on 3/26/15.
+ *
+ * An IIStreamBuilder that holds all the built slices in the form of IIRows.
+ * This is only for test use.
+ */
+public class ToyIIStreamBuilder extends IIStreamBuilder {
+    private List<IIRow> result;
+
+    public ToyIIStreamBuilder(BlockingQueue<Stream> queue, IIDesc desc, int partitionId, List<IIRow> result) {
+        super(queue, null, desc, partitionId);
+        this.result = result;
+    }
+
+    protected void outputSlice(Slice slice, TableRecordInfo tableRecordInfo) throws IOException {
+        IIKeyValueCodec codec = new IIKeyValueCodec(tableRecordInfo.getDigest());
+        for (IIRow iiRow : codec.encodeKeyValue(slice)) {
+            result.add(iiRow);
+        }
+    }
+
+}