You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/28 01:05:10 UTC
[33/50] incubator-kylin git commit: KYLIN-653 adding streaming build test cases
KYLIN-653 adding streaming build test cases
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/4df05317
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/4df05317
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/4df05317
Branch: refs/heads/streaming-localdict
Commit: 4df05317e1a754d1b1e422fdf5df580b2fa3366d
Parents: bbbcae8
Author: honma <ho...@ebay.com>
Authored: Fri Mar 27 09:49:52 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Fri Mar 27 15:16:20 2015 +0800
----------------------------------------------------------------------
.../kylin/invertedindex/index/TableRecord.java | 5 +-
.../invertedindex/index/TableRecordInfo.java | 10 +--
.../kylin/invertedindex/model/IIDesc.java | 1 +
.../model/IIKeyValueCodecWithState.java | 6 +-
.../kylin/job/hadoop/cube/BaseCuboidMapper.java | 20 +++--
.../cube/FactDistinctIIColumnsMapper.java | 9 +-
.../kylin/job/BuildCubeWithEngineTest.java | 1 -
.../invertedindex/IIStreamBuilder.java | 33 ++++---
.../IIKeyValueCodecWithStateTest.java | 91 ++++++++++++++++++++
.../invertedindex/ToyIIStreamBuilder.java | 35 ++++++++
10 files changed, 177 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4df05317/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java
index ce1b7e0..78cea3d 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java
@@ -18,15 +18,12 @@
package org.apache.kylin.invertedindex.index;
-import com.google.common.collect.Lists;
-import org.apache.kylin.dict.DateStrDictionary;
import org.apache.commons.lang.ObjectUtils;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
+import org.apache.kylin.dict.DateStrDictionary;
import org.apache.kylin.dict.Dictionary;
-import java.util.List;
-
/**
* @author yangli9, honma
* <p/>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4df05317/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
index 3136ebb..9a08e64 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
@@ -18,19 +18,17 @@
package org.apache.kylin.invertedindex.index;
-import com.google.common.collect.Maps;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.invertedindex.IISegment;
import org.apache.kylin.invertedindex.model.IIDesc;
import org.apache.kylin.metadata.measure.fixedlen.FixedLenMeasureCodec;
-import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.model.DataType;
import org.apache.kylin.metadata.model.TblColRef;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
/**
* @author yangli9
* <p/>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4df05317/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
index cda3c4d..17edb86 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
@@ -319,6 +319,7 @@ public class IIDesc extends RootPersistentEntity {
return sliceSize;
}
+
public String getSignature() {
return signature;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4df05317/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodecWithState.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodecWithState.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodecWithState.java
index a8e149a..e838283 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodecWithState.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodecWithState.java
@@ -26,6 +26,7 @@ public class IIKeyValueCodecWithState extends IIKeyValueCodec {
protected static class IIRowDecoderWithState extends IIRowDecoder {
final ArrayList<IIRow> buffer = Lists.newArrayList();
+ private Iterator<Slice> superIterator = null;
private IIRowDecoderWithState(TableRecordInfoDigest digest, Iterator<IIRow> iiRowIterator) {
super(digest, iiRowIterator);
@@ -33,7 +34,10 @@ public class IIKeyValueCodecWithState extends IIKeyValueCodec {
}
private Iterator<Slice> getSuperIterator() {
- return super.iterator();
+ if (superIterator == null) {
+ superIterator = super.iterator();
+ }
+ return superIterator;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4df05317/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java
index 41b21a7..a023c0c 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java
@@ -210,20 +210,24 @@ public class BaseCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Text, Text, Text
try {
bytesSplitter.split(value.getBytes(), value.getLength(), byteRowDelimiter);
- intermediateTableDesc.sanityCheck(bytesSplitter);
+ outputKV(context);
- byte[] rowKey = buildKey(bytesSplitter.getSplitBuffers());
- outputKey.set(rowKey, 0, rowKey.length);
-
- buildValue(bytesSplitter.getSplitBuffers());
- outputValue.set(valueBuf.array(), 0, valueBuf.position());
-
- context.write(outputKey, outputValue);
} catch (Exception ex) {
handleErrorRecord(bytesSplitter, ex);
}
}
+ private void outputKV(Context context) throws IOException, InterruptedException {
+ intermediateTableDesc.sanityCheck(bytesSplitter);
+
+ byte[] rowKey = buildKey(bytesSplitter.getSplitBuffers());
+ outputKey.set(rowKey, 0, rowKey.length);
+
+ buildValue(bytesSplitter.getSplitBuffers());
+ outputValue.set(valueBuf.array(), 0, valueBuf.position());
+ context.write(outputKey, outputValue);
+ }
+
private void handleErrorRecord(BytesSplitter bytesSplitter, Exception ex) throws IOException {
System.err.println("Insane record: " + bytesSplitter);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4df05317/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctIIColumnsMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctIIColumnsMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctIIColumnsMapper.java
index 75e127e..705e272 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctIIColumnsMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctIIColumnsMapper.java
@@ -20,6 +20,7 @@ package org.apache.kylin.job.hadoop.cube;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
@@ -50,7 +51,7 @@ public class FactDistinctIIColumnsMapper extends FactDistinctColumnsMapperBase<I
private IIJoinedFlatTableDesc intermediateTableDesc;
private ArrayList<IIRow> buffer = Lists.newArrayList();
- private Iterable<Slice> slices;
+ private Iterator<Slice> slices;
private String iiName;
private IIInstance ii;
@@ -72,7 +73,7 @@ public class FactDistinctIIColumnsMapper extends FactDistinctColumnsMapperBase<I
intermediateTableDesc = new IIJoinedFlatTableDesc(iiDesc);
TableRecordInfo info = new TableRecordInfo(iiDesc);
KeyValueCodec codec = new IIKeyValueCodecWithState(info.getDigest());
- slices = codec.decodeKeyValue(buffer);
+ slices = codec.decodeKeyValue(buffer).iterator();
baseCuboidCol2FlattenTableCol = new int[factDictCols.size()];
for (int i = 0; i < factDictCols.size(); ++i) {
@@ -98,9 +99,9 @@ public class FactDistinctIIColumnsMapper extends FactDistinctColumnsMapperBase<I
}
buffer.add(iiRow);
- if (slices.iterator().hasNext()) {
+ if (slices.hasNext()) {
byte[] vBytesBuffer = null;
- Slice slice = slices.iterator().next();
+ Slice slice = slices.next();
for (RawTableRecord record : slice) {
for (int i = 0; i < factDictCols.size(); ++i) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4df05317/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java b/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
index ce70f2c..a33dab5 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
@@ -107,7 +107,6 @@ public class BuildCubeWithEngineTest {
jobService.deleteJob(jobId);
}
}
-
}
@After
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4df05317/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
index f9adefe..0cf3c77 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
@@ -82,7 +82,11 @@ public class IIStreamBuilder extends StreamBuilder {
super(queue, desc.getSliceSize());
this.desc = desc;
try {
- this.hTable = HConnectionManager.createConnection(HBaseConfiguration.create()).getTable(hTableName);
+ if (hTableName != null) {
+ this.hTable = HConnectionManager.createConnection(HBaseConfiguration.create()).getTable(hTableName);
+ } else {
+ this.hTable = null;
+ }
} catch (IOException e) {
logger.error("cannot open htable name:" + hTableName, e);
throw new RuntimeException("cannot open htable name:" + hTableName, e);
@@ -105,12 +109,18 @@ public class IIStreamBuilder extends StreamBuilder {
TableRecordInfo tableRecordInfo = new TableRecordInfo(desc, dictionaryMap);
final Slice slice = buildSlice(table, sliceBuilder, tableRecordInfo, dictionaryMap);
logger.info("slice info, shard:" + slice.getShard() + " timestamp:" + slice.getTimestamp() + " record count:" + slice.getRecordCount());
- loadToHBase(hTable, slice, new IIKeyValueCodec(tableRecordInfo.getDigest()));
+
+ outputSlice(slice, tableRecordInfo);
submitOffset();
+
stopwatch.stop();
logger.info("stream build finished, size:" + streamsToBuild.size() + " elapsed time:" + stopwatch.elapsedTime(TimeUnit.MILLISECONDS) + TimeUnit.MILLISECONDS);
}
+ protected void outputSlice(Slice slice, TableRecordInfo tableRecordInfo) throws IOException {
+ loadToHBase(hTable, slice, new IIKeyValueCodec(tableRecordInfo.getDigest()));
+ }
+
private Map<Integer, Dictionary<?>> buildDictionary(List<List<String>> table, IIDesc desc) {
HashMultimap<TblColRef, String> valueMap = HashMultimap.create();
final List<TblColRef> allColumns = desc.listAllColumns();
@@ -122,15 +132,19 @@ public class IIStreamBuilder extends StreamBuilder {
}
}
}
+
Map<Integer, Dictionary<?>> result = Maps.newHashMap();
for (TblColRef tblColRef : valueMap.keySet()) {
- result.put(desc.findColumn(tblColRef), DictionaryGenerator.buildDictionaryFromValueList(tblColRef.getType(), Collections2.transform(valueMap.get(tblColRef), new Function<String, byte[]>() {
- @Nullable
- @Override
- public byte[] apply(String input) {
- return input.getBytes();
- }
- })));
+ result.put(desc.findColumn(tblColRef), //
+ DictionaryGenerator.buildDictionaryFromValueList(//
+ tblColRef.getType(), //
+ Collections2.transform(valueMap.get(tblColRef), new Function<String, byte[]>() {
+ @Nullable
+ @Override
+ public byte[] apply(String input) {
+ return input.getBytes();
+ }
+ })));
}
return result;
}
@@ -178,7 +192,6 @@ public class IIStreamBuilder extends StreamBuilder {
}
}
-
private void submitOffset() {
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4df05317/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/IIKeyValueCodecWithStateTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/IIKeyValueCodecWithStateTest.java b/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/IIKeyValueCodecWithStateTest.java
new file mode 100644
index 0000000..25e250c
--- /dev/null
+++ b/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/IIKeyValueCodecWithStateTest.java
@@ -0,0 +1,91 @@
+package org.apache.kylin.streaming.invertedindex;
+
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.annotation.Nullable;
+
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.index.Slice;
+import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.invertedindex.model.IIKeyValueCodecWithState;
+import org.apache.kylin.invertedindex.model.IIRow;
+import org.apache.kylin.invertedindex.model.KeyValueCodec;
+import org.apache.kylin.streaming.Stream;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+
+/**
+ * Created by Hongbin Ma(Binmahone) on 3/26/15.
+ */
+public class IIKeyValueCodecWithStateTest extends LocalFileMetadataTestCase {
+
+ IIInstance ii;
+ IIDesc iiDesc;
+ List<IIRow> iiRowList = Lists.newArrayList();
+
+ final String[] inputs = new String[] { //
+ "FP-non GTC,0,15,145970,0,28,Toys,2008-10-08 07:18:40,USER_Y,Toys & Hobbies,Models & Kits,Automotive,0,Ebay,USER_S,15,Professional-Other,2012-08-16,2012-08-11,0,2012-08-16,145970,10000329,26.8551,0", //
+ "ABIN,0,-99,43479,0,21,Photo,2012-09-11 20:26:04,USER_Y,Cameras & Photo,Film Photography,Other,0,Ebay,USER_S,-99,Not Applicable,2012-08-16,2012-08-11,2012-08-16,43479,10000807,26.2474,0", //
+ "ABIN,0,16,80053,0,12,Computers,2012-06-19 21:15:09,USER_Y,Computers/Tablets & Networking,MonitorProjectors & Accs,Monitors,0,Ebay,USER_S,16,Consumer-Other,2012-08-16,2012-08-11,0,2012-08-16,80053,10000261,94.2273,0" };
+
+ @Before
+ public void setUp() throws Exception {
+ this.createTestMetadata();
+ this.ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_inner_join");
+ this.iiDesc = ii.getDescriptor();
+
+ Collection<?> streams = Collections2.transform(Arrays.asList(inputs), new Function<String, Stream>() {
+ @Nullable
+ @Override
+ public Stream apply(String input) {
+ return new Stream(0, input.getBytes());
+ }
+ });
+ LinkedBlockingQueue q = new LinkedBlockingQueue();
+ q.addAll(streams);
+ q.put(new Stream(-1, null));//a stop sign for builder
+
+ ToyIIStreamBuilder builder = new ToyIIStreamBuilder(q, iiDesc, 0, iiRowList);
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ Future<?> future = executorService.submit(builder);
+ future.get();
+ }
+
+ @Test
+ public void basicTest() {
+ ArrayList<IIRow> buffer = Lists.newArrayList();
+ TableRecordInfo info = new TableRecordInfo(iiDesc);
+ TableRecordInfoDigest digest = info.getDigest();
+ int columnCount = digest.getColumnCount();
+ KeyValueCodec codec = new IIKeyValueCodecWithState(digest);
+ Iterator<Slice> slices = codec.decodeKeyValue(buffer).iterator();
+
+ Assert.assertTrue(!slices.hasNext());
+ Assert.assertEquals(iiRowList.size(), digest.getColumnCount());
+
+ for (int i = 0; i < digest.getColumnCount(); ++i) {
+ buffer.add(iiRowList.get(i));
+
+ if (i != digest.getColumnCount() - 1) {
+ Assert.assertTrue(!slices.hasNext());
+ } else {
+ Assert.assertTrue(slices.hasNext());
+ }
+ }
+
+ Slice newSlice = slices.next();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4df05317/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/ToyIIStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/ToyIIStreamBuilder.java b/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/ToyIIStreamBuilder.java
new file mode 100644
index 0000000..161b6f6
--- /dev/null
+++ b/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/ToyIIStreamBuilder.java
@@ -0,0 +1,35 @@
+package org.apache.kylin.streaming.invertedindex;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.kylin.invertedindex.index.Slice;
+import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
+import org.apache.kylin.invertedindex.model.IIRow;
+import org.apache.kylin.streaming.Stream;
+
+/**
+ * Created by Hongbin Ma(Binmahone) on 3/26/15.
+ *
+ * A IIStreamBuilder that can hold all the built slices in form of IIRow
+ * This is only for test use
+ */
+public class ToyIIStreamBuilder extends IIStreamBuilder {
+ private List<IIRow> result;
+
+ public ToyIIStreamBuilder(BlockingQueue<Stream> queue, IIDesc desc, int partitionId, List<IIRow> result) {
+ super(queue, null, desc, partitionId);
+ this.result = result;
+ }
+
+ protected void outputSlice(Slice slice, TableRecordInfo tableRecordInfo) throws IOException {
+ IIKeyValueCodec codec = new IIKeyValueCodec(tableRecordInfo.getDigest());
+ for (IIRow iiRow : codec.encodeKeyValue(slice)) {
+ result.add(iiRow);
+ }
+ }
+
+}