You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/28 01:05:09 UTC
[32/50] incubator-kylin git commit: KYLIN-653 fact distinct mapper for II test passed
KYLIN-653 fact distinct mapper for II test passed
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/0f8b7a46
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/0f8b7a46
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/0f8b7a46
Branch: refs/heads/streaming-localdict
Commit: 0f8b7a4689cde3e4844132efba8665cf0362bf60
Parents: fc5ab52
Author: honma <ho...@ebay.com>
Authored: Fri Mar 27 14:52:46 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Fri Mar 27 15:16:20 2015 +0800
----------------------------------------------------------------------
.../org/apache/kylin/common/util/BasicTest.java | 2 -
.../test_kylin_cube_with_slr_desc.json | 2 +-
.../apache/kylin/invertedindex/model/IIRow.java | 10 ++
.../cube/FactDistinctIIColumnsMapper.java | 15 +-
.../job/hadoop/invertedindex/II2CubeTest.java | 146 +++++++++++++++++++
.../invertedindex/ToyIIStreamBuilder.java | 36 +++++
streaming/pom.xml | 7 +
.../kylin/streaming/cube/CubeStreamBuilder.java | 20 +--
.../IIKeyValueCodecWithStateTest.java | 103 -------------
.../invertedindex/ToyIIStreamBuilder.java | 35 -----
10 files changed, 211 insertions(+), 165 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0f8b7a46/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index 0b92bf9..068ebbf 100644
--- a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -23,9 +23,7 @@ import java.nio.ByteBuffer;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.*;
-import java.util.concurrent.*;
-import com.google.common.collect.Lists;
import org.apache.commons.configuration.ConfigurationException;
import org.junit.Ignore;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0f8b7a46/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_desc.json b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_desc.json
index c4d55f4..5a1049c 100644
--- a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_desc.json
+++ b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_desc.json
@@ -135,7 +135,7 @@
}, {
"column" : "lstg_format_name",
"length" : 12,
- "dictionary" : null,
+ "dictionary" : "true",
"mandatory" : false
}, {
"column" : "lstg_site_id",
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0f8b7a46/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIRow.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIRow.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIRow.java
index f3d398a..273d1e6 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIRow.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIRow.java
@@ -34,10 +34,14 @@
package org.apache.kylin.invertedindex.model;
+import com.google.common.collect.Lists;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.kylin.common.util.BytesUtil;
+import java.util.List;
+
/**
* Created by qianzhou on 3/10/15.
*/
@@ -77,4 +81,19 @@ public final class IIRow {
this.getDictionary().set(c.getValueArray(), c.getValueOffset(), c.getValueLength());
}
}
+
+    /**
+     * Converts this row into HBase cells under its key in {@link IIDesc#HBASE_FAMILY_BYTES}: the value
+     * cell (qualifier {@link IIDesc#HBASE_QUALIFIER_BYTES}) followed by the dictionary cell (qualifier
+     * {@link IIDesc#HBASE_DICTIONARY_BYTES}).
+     *
+     * @return a new list holding the two cells, value cell first
+     */
+    public List<Cell> makeCells() {
+        // KeyValue copies its inputs into its own buffer, so a single key copy can back both cells
+        byte[] key = this.getKey().copyBytes();
+        Cell valueCell = new KeyValue(key, IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES, this.getValue().copyBytes());
+        Cell dictionaryCell = new KeyValue(key, IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_DICTIONARY_BYTES, this.getDictionary().copyBytes());
+        return Lists.newArrayList(valueCell, dictionaryCell);
+    }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0f8b7a46/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctIIColumnsMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctIIColumnsMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctIIColumnsMapper.java
index 6a236fd..75709f6 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctIIColumnsMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctIIColumnsMapper.java
@@ -51,14 +51,9 @@ import com.google.common.collect.Lists;
*/
public class FactDistinctIIColumnsMapper extends FactDistinctColumnsMapperBase<ImmutableBytesWritable, Result> {
- private IIJoinedFlatTableDesc intermediateTableDesc;
private Queue<IIRow> buffer = Lists.newLinkedList();
private Iterator<Slice> slices;
- private String iiName;
- private IIInstance ii;
- private IIDesc iiDesc;
-
private int[] baseCuboidCol2FlattenTableCol;
@Override
@@ -68,11 +63,11 @@ public class FactDistinctIIColumnsMapper extends FactDistinctColumnsMapperBase<I
Configuration conf = context.getConfiguration();
KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
- iiName = conf.get(BatchConstants.CFG_II_NAME);
- ii = IIManager.getInstance(config).getII(iiName);
- iiDesc = ii.getDescriptor();
+ String iiName = conf.get(BatchConstants.CFG_II_NAME);
+ IIInstance ii = IIManager.getInstance(config).getII(iiName);
+ IIDesc iiDesc = ii.getDescriptor();
- intermediateTableDesc = new IIJoinedFlatTableDesc(iiDesc);
+ IIJoinedFlatTableDesc intermediateTableDesc = new IIJoinedFlatTableDesc(iiDesc);
TableRecordInfo info = new TableRecordInfo(iiDesc);
KeyValueCodec codec = new IIKeyValueCodecWithState(info.getDigest());
slices = codec.decodeKeyValue(new FIFOIterable<IIRow>(buffer)).iterator();
@@ -116,7 +111,7 @@ public class FactDistinctIIColumnsMapper extends FactDistinctColumnsMapperBase<I
vBytesBuffer = new byte[dictionary.getSizeOfValue() * 2];
}
- int vid = record.getValueID(baseCuboidIndex);
+ int vid = record.getValueID(indexInRecord);
if (vid == dictionary.nullId()) {
continue;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0f8b7a46/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/II2CubeTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/II2CubeTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/II2CubeTest.java
new file mode 100644
index 0000000..6832dcf
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/II2CubeTest.java
@@ -0,0 +1,170 @@
+package org.apache.kylin.job.hadoop.invertedindex;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
+import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mrunit.mapreduce.MapDriver;
+import org.apache.hadoop.mrunit.types.Pair;
+import org.apache.kylin.common.util.FIFOIterable;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.index.Slice;
+import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.invertedindex.model.IIKeyValueCodecWithState;
+import org.apache.kylin.invertedindex.model.IIRow;
+import org.apache.kylin.invertedindex.model.KeyValueCodec;
+import org.apache.kylin.job.constant.BatchConstants;
+import org.apache.kylin.job.hadoop.cube.FactDistinctIIColumnsMapper;
+import org.apache.kylin.streaming.Stream;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ * Created by Hongbin Ma(Binmahone) on 3/26/15.
+ *
+ * Builds II rows from a few sample records with {@link ToyIIStreamBuilder}, then checks that
+ * {@link IIKeyValueCodecWithState} reassembles them into a slice and that
+ * {@link FactDistinctIIColumnsMapper} emits the expected distinct values for them.
+ */
+public class II2CubeTest extends LocalFileMetadataTestCase {
+
+    /** How long setUp waits for the stream builder before failing instead of hanging the run. */
+    private static final long BUILD_TIMEOUT_SECONDS = 60;
+
+    private final String iiName = "test_kylin_ii_inner_join";
+    private final String cubeName = "test_kylin_cube_with_slr_empty";
+
+    private IIDesc iiDesc;
+
+    /** IIRows produced from {@link #inputs} by the stream builder; rebuilt before every test. */
+    private List<IIRow> iiRows;
+
+    private final String[] inputs = new String[] { //
+            "FP-non GTC,0,15,145970,0,28,Toys,2008-10-08 07:18:40,USER_Y,Toys & Hobbies,Models & Kits,Automotive,0,Ebay,USER_S,15,Professional-Other,2012-08-16,2012-08-11,0,2012-08-16,145970,10000329,26.8551,0", //
+            "ABIN,0,-99,43479,0,21,Photo,2012-09-11 20:26:04,USER_Y,Cameras & Photo,Film Photography,Other,0,Ebay,USER_S,-99,Not Applicable,2012-08-16,2012-08-11,0,2012-08-16,43479,10000807,26.2474,0", //
+            "ABIN,0,16,80053,0,12,Computers,2012-06-19 21:15:09,USER_Y,Computers/Tablets & Networking,MonitorProjectors & Accs,Monitors,0,Ebay,USER_S,16,Consumer-Other,2012-08-16,2012-08-11,0,2012-08-16,80053,10000261,94.2273,0" };
+
+    @Before
+    public void setUp() throws Exception {
+        this.createTestMetadata();
+        IIInstance ii = IIManager.getInstance(getTestConfig()).getII(iiName);
+        this.iiDesc = ii.getDescriptor();
+
+        Collection<Stream> streams = Collections2.transform(Arrays.asList(inputs), new Function<String, Stream>() {
+            @Override
+            public Stream apply(String input) {
+                return new Stream(0, input.getBytes());
+            }
+        });
+        LinkedBlockingQueue<Stream> q = new LinkedBlockingQueue<Stream>();
+        q.addAll(streams);
+        q.put(new Stream(-1, null));//a stop sign for builder
+
+        iiRows = Lists.newArrayList();
+        ToyIIStreamBuilder builder = new ToyIIStreamBuilder(q, iiDesc, 0, iiRows);
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        try {
+            Future<?> future = executorService.submit(builder);
+            // bounded wait: a builder that never reaches the stop sign fails setUp instead of hanging the run
+            future.get(BUILD_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        } finally {
+            // release the (non-daemon) worker thread even if the build failed or timed out
+            executorService.shutdownNow();
+        }
+    }
+
+    @After
+    public void after() throws Exception {
+        cleanupTestMetadata();
+    }
+
+    /**
+     * Simulates stream building into slices and encoding each slice into IIRows, then feeds those
+     * rows one at a time to a stateful codec and checks that a slice becomes available only once
+     * the last column's row has arrived.
+     */
+    @Test
+    public void basicTest() {
+        Queue<IIRow> buffer = Lists.newLinkedList();
+        FIFOIterable<IIRow> bufferIterable = new FIFOIterable<IIRow>(buffer);
+        TableRecordInfo info = new TableRecordInfo(iiDesc);
+        TableRecordInfoDigest digest = info.getDigest();
+        KeyValueCodec codec = new IIKeyValueCodecWithState(digest);
+        Iterator<Slice> slices = codec.decodeKeyValue(bufferIterable).iterator();
+
+        Assert.assertFalse(slices.hasNext());
+        Assert.assertEquals(digest.getColumnCount(), iiRows.size());
+
+        for (int i = 0; i < digest.getColumnCount(); ++i) {
+            buffer.add(iiRows.get(i));
+
+            if (i != digest.getColumnCount() - 1) {
+                Assert.assertFalse(slices.hasNext());
+            } else {
+                Assert.assertTrue(slices.hasNext());
+            }
+        }
+
+        Slice newSlice = slices.next();
+        // presumably dictionary 0 covers the first input field, which has two distinct values in inputs
+        Assert.assertEquals(2, newSlice.getLocalDictionaries().get(0).getSize());
+    }
+
+    /**
+     * Runs the II rows through {@link FactDistinctIIColumnsMapper} and checks that every emitted pair
+     * has key 6 and that, together, they cover exactly the distinct first-field values of {@link #inputs}.
+     */
+    @Test
+    public void factDistinctIIColumnsMapperTest() throws IOException {
+        MapDriver<ImmutableBytesWritable, Result, ShortWritable, Text> mapDriver = MapDriver.newMapDriver(new FactDistinctIIColumnsMapper());
+
+        mapDriver.getConfiguration().set(BatchConstants.CFG_II_NAME, iiName);
+        mapDriver.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+        // presumably needed so MRUnit can serialize the HBase Result input values
+        mapDriver.getConfiguration().setStrings("io.serializations", mapDriver.getConfiguration().get("io.serializations"), MutationSerialization.class.getName(), ResultSerialization.class.getName());
+        mapDriver.addAll(Lists.newArrayList(Collections2.transform(iiRows, new Function<IIRow, Pair<ImmutableBytesWritable, Result>>() {
+            @Override
+            public Pair<ImmutableBytesWritable, Result> apply(IIRow input) {
+                return new Pair<ImmutableBytesWritable, Result>(new ImmutableBytesWritable(new byte[] { 1 }), Result.create(input.makeCells()));
+            }
+        })));
+
+        List<Pair<ShortWritable, Text>> result = mapDriver.run();
+        // guards against a vacuous pass: the loop below checks nothing if the mapper emits nothing
+        Assert.assertFalse("mapper emitted no distinct values", result.isEmpty());
+
+        Set<String> lstgNames = Sets.newHashSet("FP-non GTC", "ABIN");
+        Set<String> emittedNames = Sets.newHashSet();
+        for (Pair<ShortWritable, Text> pair : result) {
+            Assert.assertEquals(6, pair.getFirst().get());
+            emittedNames.add(pair.getSecond().toString());
+        }
+        Assert.assertEquals(lstgNames, emittedNames);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0f8b7a46/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/ToyIIStreamBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/ToyIIStreamBuilder.java b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/ToyIIStreamBuilder.java
new file mode 100644
index 0000000..3e2a892
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/ToyIIStreamBuilder.java
@@ -0,0 +1,50 @@
+package org.apache.kylin.job.hadoop.invertedindex;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.kylin.invertedindex.index.Slice;
+import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
+import org.apache.kylin.invertedindex.model.IIRow;
+import org.apache.kylin.streaming.Stream;
+import org.apache.kylin.streaming.invertedindex.IIStreamBuilder;
+
+/**
+ * Created by Hongbin Ma(Binmahone) on 3/26/15.
+ *
+ * An IIStreamBuilder that collects every built slice, encoded as IIRows, into a caller-supplied
+ * list instead of writing it out. This is only for test use.
+ */
+public class ToyIIStreamBuilder extends IIStreamBuilder {
+    /** Receives every encoded row; the caller reads it after the builder has finished. */
+    private final List<IIRow> result;
+
+    /**
+     * @param queue       input streams handed to the underlying IIStreamBuilder
+     * @param desc        descriptor of the inverted index being built
+     * @param partitionId partition id forwarded to IIStreamBuilder
+     * @param result      list that every IIRow produced by {@link #outputSlice} is appended to
+     */
+    public ToyIIStreamBuilder(BlockingQueue<Stream> queue, IIDesc desc, int partitionId, List<IIRow> result) {
+        // second argument is null: presumably the HBase output table, unused since outputSlice is overridden - TODO confirm
+        super(queue, null, desc, partitionId);
+        this.result = result;
+    }
+
+    /**
+     * Encodes the slice with {@link IIKeyValueCodec} and appends the resulting rows to {@code result}.
+     * <p>
+     * Marked {@code @Override} so the compiler verifies this still overrides the parent method now
+     * that this class lives in a different package from IIStreamBuilder.
+     */
+    @Override
+    protected void outputSlice(Slice slice, TableRecordInfo tableRecordInfo) throws IOException {
+        IIKeyValueCodec codec = new IIKeyValueCodec(tableRecordInfo.getDigest());
+        for (IIRow iiRow : codec.encodeKeyValue(slice)) {
+            result.add(iiRow);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0f8b7a46/streaming/pom.xml
----------------------------------------------------------------------
diff --git a/streaming/pom.xml b/streaming/pom.xml
index 650c9ac..0c084d5 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -16,6 +16,13 @@
<dependency>
+ <groupId>org.apache.mrunit</groupId>
+ <artifactId>mrunit</artifactId>
+ <classifier>hadoop2</classifier>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.kylin</groupId>
<artifactId>kylin-invertedindex</artifactId>
<version>${project.parent.version}</version>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0f8b7a46/streaming/src/main/java/org/apache/kylin/streaming/cube/CubeStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/cube/CubeStreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/cube/CubeStreamBuilder.java
index 9554797..5c2efdc 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/cube/CubeStreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/cube/CubeStreamBuilder.java
@@ -156,7 +156,6 @@ public class CubeStreamBuilder extends StreamBuilder {
logger.info("Totally " + generatedCuboids.size() + " cuboids be calculated, takes " + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
}
-
private void calculateCuboid(GridTable parentCuboid, long parentCuboidId, long cuboidId, Map<Long, GridTable> result) throws IOException {
GridTable thisCuboid;
@@ -220,7 +219,6 @@ public class CubeStreamBuilder extends StreamBuilder {
return gridTable;
}
-
private GridTable aggregateCuboid(GridTable parentCuboid, long parentCuboidId, long cuboidId) throws IOException {
//logger.info("Calculating cuboid " + cuboidId + " from parent " + parentCuboidId);
Pair<BitSet, BitSet> columnBitSets = getDimensionAndMetricColumBitSet(parentCuboidId);
@@ -281,14 +279,12 @@ public class CubeStreamBuilder extends StreamBuilder {
}
private Pair<BitSet, BitSet> getDimensionAndMetricColumBitSet(long cuboidId) {
- BitSet bitSet = BitSet.valueOf(new long[]{cuboidId});
+ BitSet bitSet = BitSet.valueOf(new long[] { cuboidId });
BitSet dimension = new BitSet();
dimension.set(0, bitSet.cardinality());
BitSet metrics = new BitSet();
metrics.set(bitSet.cardinality(), bitSet.cardinality() + this.measureNumber);
- return new Pair<BitSet, BitSet>(
- dimension, metrics
- );
+ return new Pair<BitSet, BitSet>(dimension, metrics);
}
private Object[] buildKey(List<String> row, DataTypeSerializer[] serializers) {
@@ -302,7 +298,6 @@ public class CubeStreamBuilder extends StreamBuilder {
return key;
}
-
private Object[] buildValue(List<String> row) {
Object[] values = new Object[desc.getMeasures().size()];
@@ -340,11 +335,10 @@ public class CubeStreamBuilder extends StreamBuilder {
return values;
}
-
private GTInfo newGTInfo(long cuboidID) {
Pair<BitSet, BitSet> dimensionMetricsBitSet = getDimensionAndMetricColumBitSet(cuboidID);
GTInfo.Builder builder = infoBuilder(cuboidID);
- builder.enableColumnBlock(new BitSet[]{dimensionMetricsBitSet.getFirst(), dimensionMetricsBitSet.getSecond()});
+ builder.enableColumnBlock(new BitSet[] { dimensionMetricsBitSet.getFirst(), dimensionMetricsBitSet.getSecond() });
builder.setPrimaryKey(dimensionMetricsBitSet.getFirst());
GTInfo info = builder.build();
return info;
@@ -374,7 +368,6 @@ public class CubeStreamBuilder extends StreamBuilder {
return builder;
}
-
private void buildDictionary(List<List<String>> table, CubeDesc desc, Map<TblColRef, Dictionary> dictionaryMap) {
SetMultimap<TblColRef, String> valueMap = HashMultimap.create();
@@ -399,9 +392,9 @@ public class CubeStreamBuilder extends StreamBuilder {
}));
logger.info("Building dictionary for " + col);
-// DictionaryInfo dictInfo = new DictionaryInfo(col.getTable(), col.getName(), 0, col.getDatatype(), null, "");
-// dictInfo.setDictionaryObject(dict);
-// dictInfo.setDictionaryClass(dict.getClass().getName());
+ // DictionaryInfo dictInfo = new DictionaryInfo(col.getTable(), col.getName(), 0, col.getDatatype(), null, "");
+ // dictInfo.setDictionaryObject(dict);
+ // dictInfo.setDictionaryClass(dict.getClass().getName());
dictionaryMap.put(col, dict);
}
}
@@ -413,5 +406,4 @@ public class CubeStreamBuilder extends StreamBuilder {
return getStreamParser().parse(stream, Lists.newArrayList(desc.listAllColumns()));
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0f8b7a46/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/IIKeyValueCodecWithStateTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/IIKeyValueCodecWithStateTest.java b/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/IIKeyValueCodecWithStateTest.java
deleted file mode 100644
index 5ade5f1..0000000
--- a/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/IIKeyValueCodecWithStateTest.java
+++ /dev/null
@@ -1,103 +0,0 @@
-package org.apache.kylin.streaming.invertedindex;
-
-import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import javax.annotation.Nullable;
-
-import org.apache.kylin.common.util.FIFOIterable;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.index.Slice;
-import org.apache.kylin.invertedindex.index.TableRecordInfo;
-import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
-import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.invertedindex.model.IIKeyValueCodecWithState;
-import org.apache.kylin.invertedindex.model.IIRow;
-import org.apache.kylin.invertedindex.model.KeyValueCodec;
-import org.apache.kylin.streaming.Stream;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.Lists;
-
-/**
- * Created by Hongbin Ma(Binmahone) on 3/26/15.
- */
-public class IIKeyValueCodecWithStateTest extends LocalFileMetadataTestCase {
-
- IIInstance ii;
- IIDesc iiDesc;
- List<IIRow> iiRowList = Lists.newArrayList();
-
- final String[] inputs = new String[] { //
- "FP-non GTC,0,15,145970,0,28,Toys,2008-10-08 07:18:40,USER_Y,Toys & Hobbies,Models & Kits,Automotive,0,Ebay,USER_S,15,Professional-Other,2012-08-16,2012-08-11,0,2012-08-16,145970,10000329,26.8551,0", //
- "ABIN,0,-99,43479,0,21,Photo,2012-09-11 20:26:04,USER_Y,Cameras & Photo,Film Photography,Other,0,Ebay,USER_S,-99,Not Applicable,2012-08-16,2012-08-11,0,2012-08-16,43479,10000807,26.2474,0", //
- "ABIN,0,16,80053,0,12,Computers,2012-06-19 21:15:09,USER_Y,Computers/Tablets & Networking,MonitorProjectors & Accs,Monitors,0,Ebay,USER_S,16,Consumer-Other,2012-08-16,2012-08-11,0,2012-08-16,80053,10000261,94.2273,0" };
-
- @Before
- public void setUp() throws Exception {
- this.createTestMetadata();
- this.ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_inner_join");
- this.iiDesc = ii.getDescriptor();
-
- Collection<?> streams = Collections2.transform(Arrays.asList(inputs), new Function<String, Stream>() {
- @Nullable
- @Override
- public Stream apply(String input) {
- return new Stream(0, input.getBytes());
- }
- });
- LinkedBlockingQueue q = new LinkedBlockingQueue();
- q.addAll(streams);
- q.put(new Stream(-1, null));//a stop sign for builder
-
- ToyIIStreamBuilder builder = new ToyIIStreamBuilder(q, iiDesc, 0, iiRowList);
- ExecutorService executorService = Executors.newSingleThreadExecutor();
- Future<?> future = executorService.submit(builder);
- future.get();
- }
-
- @After
- public void after() throws Exception {
- cleanupTestMetadata();
- }
-
- /**
- * simulate stream building into slices, and encode the slice into IIRows.
- * Then reconstruct the IIRows to slice.
- */
- @Test
- public void basicTest() {
- Queue<IIRow> buffer = Lists.newLinkedList();
- FIFOIterable bufferIterable = new FIFOIterable(buffer);
- TableRecordInfo info = new TableRecordInfo(iiDesc);
- TableRecordInfoDigest digest = info.getDigest();
- KeyValueCodec codec = new IIKeyValueCodecWithState(digest);
- Iterator<Slice> slices = codec.decodeKeyValue(bufferIterable).iterator();
-
- Assert.assertTrue(!slices.hasNext());
- Assert.assertEquals(iiRowList.size(), digest.getColumnCount());
-
- for (int i = 0; i < digest.getColumnCount(); ++i) {
- buffer.add(iiRowList.get(i));
-
- if (i != digest.getColumnCount() - 1) {
- Assert.assertTrue(!slices.hasNext());
- } else {
- Assert.assertTrue(slices.hasNext());
- }
- }
-
- Slice newSlice = slices.next();
- Assert.assertEquals(newSlice.getLocalDictionaries().get(0).getSize(), 2);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0f8b7a46/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/ToyIIStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/ToyIIStreamBuilder.java b/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/ToyIIStreamBuilder.java
deleted file mode 100644
index 161b6f6..0000000
--- a/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/ToyIIStreamBuilder.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package org.apache.kylin.streaming.invertedindex;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-
-import org.apache.kylin.invertedindex.index.Slice;
-import org.apache.kylin.invertedindex.index.TableRecordInfo;
-import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
-import org.apache.kylin.invertedindex.model.IIRow;
-import org.apache.kylin.streaming.Stream;
-
-/**
- * Created by Hongbin Ma(Binmahone) on 3/26/15.
- *
- * A IIStreamBuilder that can hold all the built slices in form of IIRow
- * This is only for test use
- */
-public class ToyIIStreamBuilder extends IIStreamBuilder {
- private List<IIRow> result;
-
- public ToyIIStreamBuilder(BlockingQueue<Stream> queue, IIDesc desc, int partitionId, List<IIRow> result) {
- super(queue, null, desc, partitionId);
- this.result = result;
- }
-
- protected void outputSlice(Slice slice, TableRecordInfo tableRecordInfo) throws IOException {
- IIKeyValueCodec codec = new IIKeyValueCodec(tableRecordInfo.getDigest());
- for (IIRow iiRow : codec.encodeKeyValue(slice)) {
- result.add(iiRow);
- }
- }
-
-}