You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/28 01:05:09 UTC

[32/50] incubator-kylin git commit: KYLIN-653 fact distinct mapper for II test passed

KYLIN-653 fact distinct mapper for II test passed


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/0f8b7a46
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/0f8b7a46
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/0f8b7a46

Branch: refs/heads/streaming-localdict
Commit: 0f8b7a4689cde3e4844132efba8665cf0362bf60
Parents: fc5ab52
Author: honma <ho...@ebay.com>
Authored: Fri Mar 27 14:52:46 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Fri Mar 27 15:16:20 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/util/BasicTest.java |   2 -
 .../test_kylin_cube_with_slr_desc.json          |   2 +-
 .../apache/kylin/invertedindex/model/IIRow.java |  10 ++
 .../cube/FactDistinctIIColumnsMapper.java       |  15 +-
 .../job/hadoop/invertedindex/II2CubeTest.java   | 146 +++++++++++++++++++
 .../invertedindex/ToyIIStreamBuilder.java       |  36 +++++
 streaming/pom.xml                               |   7 +
 .../kylin/streaming/cube/CubeStreamBuilder.java |  20 +--
 .../IIKeyValueCodecWithStateTest.java           | 103 -------------
 .../invertedindex/ToyIIStreamBuilder.java       |  35 -----
 10 files changed, 211 insertions(+), 165 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0f8b7a46/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index 0b92bf9..068ebbf 100644
--- a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -23,9 +23,7 @@ import java.nio.ByteBuffer;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.*;
-import java.util.concurrent.*;
 
-import com.google.common.collect.Lists;
 import org.apache.commons.configuration.ConfigurationException;
 import org.junit.Ignore;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0f8b7a46/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_desc.json b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_desc.json
index c4d55f4..5a1049c 100644
--- a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_desc.json
+++ b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_with_slr_desc.json
@@ -135,7 +135,7 @@
     }, {
       "column" : "lstg_format_name",
       "length" : 12,
-      "dictionary" : null,
+      "dictionary" : "true",
       "mandatory" : false
     }, {
       "column" : "lstg_site_id",

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0f8b7a46/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIRow.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIRow.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIRow.java
index f3d398a..273d1e6 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIRow.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIRow.java
@@ -34,10 +34,14 @@
 
 package org.apache.kylin.invertedindex.model;
 
+import com.google.common.collect.Lists;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.kylin.common.util.BytesUtil;
 
+import java.util.List;
+
 /**
  * Created by qianzhou on 3/10/15.
  */
@@ -77,4 +81,10 @@ public final class IIRow {
             this.getDictionary().set(c.getValueArray(), c.getValueOffset(), c.getValueLength());
         }
     }
+
+    public List<Cell> makeCells() {
+        Cell a = new KeyValue(this.getKey().copyBytes(), IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES, this.getValue().copyBytes());
+        Cell b = new KeyValue(this.getKey().copyBytes(), IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_DICTIONARY_BYTES, this.getDictionary().copyBytes());
+        return Lists.newArrayList(a, b);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0f8b7a46/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctIIColumnsMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctIIColumnsMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctIIColumnsMapper.java
index 6a236fd..75709f6 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctIIColumnsMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctIIColumnsMapper.java
@@ -51,14 +51,9 @@ import com.google.common.collect.Lists;
  */
 public class FactDistinctIIColumnsMapper extends FactDistinctColumnsMapperBase<ImmutableBytesWritable, Result> {
 
-    private IIJoinedFlatTableDesc intermediateTableDesc;
     private Queue<IIRow> buffer = Lists.newLinkedList();
     private Iterator<Slice> slices;
 
-    private String iiName;
-    private IIInstance ii;
-    private IIDesc iiDesc;
-
     private int[] baseCuboidCol2FlattenTableCol;
 
     @Override
@@ -68,11 +63,11 @@ public class FactDistinctIIColumnsMapper extends FactDistinctColumnsMapperBase<I
         Configuration conf = context.getConfiguration();
         KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
 
-        iiName = conf.get(BatchConstants.CFG_II_NAME);
-        ii = IIManager.getInstance(config).getII(iiName);
-        iiDesc = ii.getDescriptor();
+        String iiName = conf.get(BatchConstants.CFG_II_NAME);
+        IIInstance ii = IIManager.getInstance(config).getII(iiName);
+        IIDesc iiDesc = ii.getDescriptor();
 
-        intermediateTableDesc = new IIJoinedFlatTableDesc(iiDesc);
+        IIJoinedFlatTableDesc intermediateTableDesc = new IIJoinedFlatTableDesc(iiDesc);
         TableRecordInfo info = new TableRecordInfo(iiDesc);
         KeyValueCodec codec = new IIKeyValueCodecWithState(info.getDigest());
         slices = codec.decodeKeyValue(new FIFOIterable<IIRow>(buffer)).iterator();
@@ -116,7 +111,7 @@ public class FactDistinctIIColumnsMapper extends FactDistinctColumnsMapperBase<I
                         vBytesBuffer = new byte[dictionary.getSizeOfValue() * 2];
                     }
 
-                    int vid = record.getValueID(baseCuboidIndex);
+                    int vid = record.getValueID(indexInRecord);
                     if (vid == dictionary.nullId()) {
                         continue;
                     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0f8b7a46/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/II2CubeTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/II2CubeTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/II2CubeTest.java
new file mode 100644
index 0000000..6832dcf
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/II2CubeTest.java
@@ -0,0 +1,146 @@
+package org.apache.kylin.job.hadoop.invertedindex;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.annotation.Nullable;
+
+import com.google.common.collect.Sets;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
+import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mrunit.mapreduce.MapDriver;
+import org.apache.hadoop.mrunit.types.Pair;
+import org.apache.kylin.common.util.FIFOIterable;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.index.Slice;
+import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.invertedindex.model.IIKeyValueCodecWithState;
+import org.apache.kylin.invertedindex.model.IIRow;
+import org.apache.kylin.invertedindex.model.KeyValueCodec;
+import org.apache.kylin.job.constant.BatchConstants;
+import org.apache.kylin.job.hadoop.cube.FactDistinctIIColumnsMapper;
+import org.apache.kylin.streaming.Stream;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+
+/**
+ * Created by Hongbin Ma(Binmahone) on 3/26/15.
+ */
+public class II2CubeTest extends LocalFileMetadataTestCase {
+
+    String iiName = "test_kylin_ii_inner_join";
+    IIInstance ii;
+    IIDesc iiDesc;
+    String cubeName = "test_kylin_cube_with_slr_empty";
+
+    List<IIRow> iiRows;
+
+    final String[] inputs = new String[] { //
+    "FP-non GTC,0,15,145970,0,28,Toys,2008-10-08 07:18:40,USER_Y,Toys & Hobbies,Models & Kits,Automotive,0,Ebay,USER_S,15,Professional-Other,2012-08-16,2012-08-11,0,2012-08-16,145970,10000329,26.8551,0", //
+            "ABIN,0,-99,43479,0,21,Photo,2012-09-11 20:26:04,USER_Y,Cameras & Photo,Film Photography,Other,0,Ebay,USER_S,-99,Not Applicable,2012-08-16,2012-08-11,0,2012-08-16,43479,10000807,26.2474,0", //
+            "ABIN,0,16,80053,0,12,Computers,2012-06-19 21:15:09,USER_Y,Computers/Tablets & Networking,MonitorProjectors & Accs,Monitors,0,Ebay,USER_S,16,Consumer-Other,2012-08-16,2012-08-11,0,2012-08-16,80053,10000261,94.2273,0" };
+
+    @Before
+    public void setUp() throws Exception {
+        this.createTestMetadata();
+        this.ii = IIManager.getInstance(getTestConfig()).getII(iiName);
+        this.iiDesc = ii.getDescriptor();
+
+        Collection<?> streams = Collections2.transform(Arrays.asList(inputs), new Function<String, Stream>() {
+            @Nullable
+            @Override
+            public Stream apply(String input) {
+                return new Stream(0, input.getBytes());
+            }
+        });
+        LinkedBlockingQueue q = new LinkedBlockingQueue();
+        q.addAll(streams);
+        q.put(new Stream(-1, null));//a stop sign for builder
+
+        iiRows = Lists.newArrayList();
+        ToyIIStreamBuilder builder = new ToyIIStreamBuilder(q, iiDesc, 0, iiRows);
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        Future<?> future = executorService.submit(builder);
+        future.get();
+
+    }
+
+    @After
+    public void after() throws Exception {
+        cleanupTestMetadata();
+    }
+
+    /**
+     * Simulates building a stream into slices and encoding each slice into IIRows,
+     * then reconstructs a slice from those IIRows.
+     */
+    @Test
+    public void basicTest() {
+        Queue<IIRow> buffer = Lists.newLinkedList();
+        FIFOIterable bufferIterable = new FIFOIterable(buffer);
+        TableRecordInfo info = new TableRecordInfo(iiDesc);
+        TableRecordInfoDigest digest = info.getDigest();
+        KeyValueCodec codec = new IIKeyValueCodecWithState(digest);
+        Iterator<Slice> slices = codec.decodeKeyValue(bufferIterable).iterator();
+
+        Assert.assertTrue(!slices.hasNext());
+        Assert.assertEquals(iiRows.size(), digest.getColumnCount());
+
+        for (int i = 0; i < digest.getColumnCount(); ++i) {
+            buffer.add(iiRows.get(i));
+
+            if (i != digest.getColumnCount() - 1) {
+                Assert.assertTrue(!slices.hasNext());
+            } else {
+                Assert.assertTrue(slices.hasNext());
+            }
+        }
+
+        Slice newSlice = slices.next();
+        Assert.assertEquals(newSlice.getLocalDictionaries().get(0).getSize(), 2);
+    }
+
+    @Test
+    public void factDistinctIIColumnsMapperTest() throws IOException {
+        MapDriver<ImmutableBytesWritable, Result, ShortWritable, Text> mapDriver;
+        FactDistinctIIColumnsMapper mapper = new FactDistinctIIColumnsMapper();
+        mapDriver = MapDriver.newMapDriver(mapper);
+
+        mapDriver.getConfiguration().set(BatchConstants.CFG_II_NAME, iiName);
+        mapDriver.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+        mapDriver.getConfiguration().setStrings("io.serializations", mapDriver.getConfiguration().get("io.serializations"), MutationSerialization.class.getName(), ResultSerialization.class.getName());
+        mapDriver.addAll(Lists.newArrayList(Collections2.transform(iiRows, new Function<IIRow, Pair<ImmutableBytesWritable, Result>>() {
+            @Nullable
+            @Override
+            public Pair<ImmutableBytesWritable, Result> apply(@Nullable IIRow input) {
+                return new Pair<ImmutableBytesWritable, Result>(new ImmutableBytesWritable(new byte[] { 1 }), Result.create(input.makeCells()));
+            }
+        })));
+
+        List<Pair<ShortWritable, Text>> result = mapDriver.run();
+        Set<String> lstgNames = Sets.newHashSet("FP-non GTC","ABIN");
+        for(Pair<ShortWritable, Text> pair : result)
+        {
+            Assert.assertEquals(pair.getFirst().get(),6);
+            Assert.assertTrue(lstgNames.contains(pair.getSecond().toString()));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0f8b7a46/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/ToyIIStreamBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/ToyIIStreamBuilder.java b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/ToyIIStreamBuilder.java
new file mode 100644
index 0000000..3e2a892
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/ToyIIStreamBuilder.java
@@ -0,0 +1,36 @@
+package org.apache.kylin.job.hadoop.invertedindex;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.kylin.invertedindex.index.Slice;
+import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
+import org.apache.kylin.invertedindex.model.IIRow;
+import org.apache.kylin.streaming.Stream;
+import org.apache.kylin.streaming.invertedindex.IIStreamBuilder;
+
+/**
+ * Created by Hongbin Ma(Binmahone) on 3/26/15.
+ *
+ * An IIStreamBuilder that holds all the built slices in the form of IIRows.
+ * This is for test use only.
+ */
+public class ToyIIStreamBuilder extends IIStreamBuilder {
+    private List<IIRow> result;
+
+    public ToyIIStreamBuilder(BlockingQueue<Stream> queue, IIDesc desc, int partitionId, List<IIRow> result) {
+        super(queue, null, desc, partitionId);
+        this.result = result;
+    }
+
+    protected void outputSlice(Slice slice, TableRecordInfo tableRecordInfo) throws IOException {
+        IIKeyValueCodec codec = new IIKeyValueCodec(tableRecordInfo.getDigest());
+        for (IIRow iiRow : codec.encodeKeyValue(slice)) {
+            result.add(iiRow);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0f8b7a46/streaming/pom.xml
----------------------------------------------------------------------
diff --git a/streaming/pom.xml b/streaming/pom.xml
index 650c9ac..0c084d5 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -16,6 +16,13 @@
 
 
         <dependency>
+            <groupId>org.apache.mrunit</groupId>
+            <artifactId>mrunit</artifactId>
+            <classifier>hadoop2</classifier>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
             <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-invertedindex</artifactId>
             <version>${project.parent.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0f8b7a46/streaming/src/main/java/org/apache/kylin/streaming/cube/CubeStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/cube/CubeStreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/cube/CubeStreamBuilder.java
index 9554797..5c2efdc 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/cube/CubeStreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/cube/CubeStreamBuilder.java
@@ -156,7 +156,6 @@ public class CubeStreamBuilder extends StreamBuilder {
         logger.info("Totally " + generatedCuboids.size() + " cuboids be calculated, takes " + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
     }
 
-
     private void calculateCuboid(GridTable parentCuboid, long parentCuboidId, long cuboidId, Map<Long, GridTable> result) throws IOException {
 
         GridTable thisCuboid;
@@ -220,7 +219,6 @@ public class CubeStreamBuilder extends StreamBuilder {
         return gridTable;
     }
 
-
     private GridTable aggregateCuboid(GridTable parentCuboid, long parentCuboidId, long cuboidId) throws IOException {
         //logger.info("Calculating cuboid " + cuboidId + " from parent " + parentCuboidId);
         Pair<BitSet, BitSet> columnBitSets = getDimensionAndMetricColumBitSet(parentCuboidId);
@@ -281,14 +279,12 @@ public class CubeStreamBuilder extends StreamBuilder {
     }
 
     private Pair<BitSet, BitSet> getDimensionAndMetricColumBitSet(long cuboidId) {
-        BitSet bitSet = BitSet.valueOf(new long[]{cuboidId});
+        BitSet bitSet = BitSet.valueOf(new long[] { cuboidId });
         BitSet dimension = new BitSet();
         dimension.set(0, bitSet.cardinality());
         BitSet metrics = new BitSet();
         metrics.set(bitSet.cardinality(), bitSet.cardinality() + this.measureNumber);
-        return new Pair<BitSet, BitSet>(
-                dimension, metrics
-        );
+        return new Pair<BitSet, BitSet>(dimension, metrics);
     }
 
     private Object[] buildKey(List<String> row, DataTypeSerializer[] serializers) {
@@ -302,7 +298,6 @@ public class CubeStreamBuilder extends StreamBuilder {
         return key;
     }
 
-
     private Object[] buildValue(List<String> row) {
 
         Object[] values = new Object[desc.getMeasures().size()];
@@ -340,11 +335,10 @@ public class CubeStreamBuilder extends StreamBuilder {
         return values;
     }
 
-
     private GTInfo newGTInfo(long cuboidID) {
         Pair<BitSet, BitSet> dimensionMetricsBitSet = getDimensionAndMetricColumBitSet(cuboidID);
         GTInfo.Builder builder = infoBuilder(cuboidID);
-        builder.enableColumnBlock(new BitSet[]{dimensionMetricsBitSet.getFirst(), dimensionMetricsBitSet.getSecond()});
+        builder.enableColumnBlock(new BitSet[] { dimensionMetricsBitSet.getFirst(), dimensionMetricsBitSet.getSecond() });
         builder.setPrimaryKey(dimensionMetricsBitSet.getFirst());
         GTInfo info = builder.build();
         return info;
@@ -374,7 +368,6 @@ public class CubeStreamBuilder extends StreamBuilder {
         return builder;
     }
 
-
     private void buildDictionary(List<List<String>> table, CubeDesc desc, Map<TblColRef, Dictionary> dictionaryMap) {
         SetMultimap<TblColRef, String> valueMap = HashMultimap.create();
 
@@ -399,9 +392,9 @@ public class CubeStreamBuilder extends StreamBuilder {
                     }));
 
                     logger.info("Building dictionary for " + col);
-//                    DictionaryInfo dictInfo = new DictionaryInfo(col.getTable(), col.getName(), 0, col.getDatatype(), null, "");
-//                    dictInfo.setDictionaryObject(dict);
-//                    dictInfo.setDictionaryClass(dict.getClass().getName());
+                    //                    DictionaryInfo dictInfo = new DictionaryInfo(col.getTable(), col.getName(), 0, col.getDatatype(), null, "");
+                    //                    dictInfo.setDictionaryObject(dict);
+                    //                    dictInfo.setDictionaryClass(dict.getClass().getName());
                     dictionaryMap.put(col, dict);
                 }
             }
@@ -413,5 +406,4 @@ public class CubeStreamBuilder extends StreamBuilder {
         return getStreamParser().parse(stream, Lists.newArrayList(desc.listAllColumns()));
     }
 
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0f8b7a46/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/IIKeyValueCodecWithStateTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/IIKeyValueCodecWithStateTest.java b/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/IIKeyValueCodecWithStateTest.java
deleted file mode 100644
index 5ade5f1..0000000
--- a/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/IIKeyValueCodecWithStateTest.java
+++ /dev/null
@@ -1,103 +0,0 @@
-package org.apache.kylin.streaming.invertedindex;
-
-import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import javax.annotation.Nullable;
-
-import org.apache.kylin.common.util.FIFOIterable;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.index.Slice;
-import org.apache.kylin.invertedindex.index.TableRecordInfo;
-import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
-import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.invertedindex.model.IIKeyValueCodecWithState;
-import org.apache.kylin.invertedindex.model.IIRow;
-import org.apache.kylin.invertedindex.model.KeyValueCodec;
-import org.apache.kylin.streaming.Stream;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.Lists;
-
-/**
- * Created by Hongbin Ma(Binmahone) on 3/26/15.
- */
-public class IIKeyValueCodecWithStateTest extends LocalFileMetadataTestCase {
-
-    IIInstance ii;
-    IIDesc iiDesc;
-    List<IIRow> iiRowList = Lists.newArrayList();
-
-    final String[] inputs = new String[] { //
-    "FP-non GTC,0,15,145970,0,28,Toys,2008-10-08 07:18:40,USER_Y,Toys & Hobbies,Models & Kits,Automotive,0,Ebay,USER_S,15,Professional-Other,2012-08-16,2012-08-11,0,2012-08-16,145970,10000329,26.8551,0", //
-            "ABIN,0,-99,43479,0,21,Photo,2012-09-11 20:26:04,USER_Y,Cameras & Photo,Film Photography,Other,0,Ebay,USER_S,-99,Not Applicable,2012-08-16,2012-08-11,0,2012-08-16,43479,10000807,26.2474,0", //
-            "ABIN,0,16,80053,0,12,Computers,2012-06-19 21:15:09,USER_Y,Computers/Tablets & Networking,MonitorProjectors & Accs,Monitors,0,Ebay,USER_S,16,Consumer-Other,2012-08-16,2012-08-11,0,2012-08-16,80053,10000261,94.2273,0" };
-
-    @Before
-    public void setUp() throws Exception {
-        this.createTestMetadata();
-        this.ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_inner_join");
-        this.iiDesc = ii.getDescriptor();
-
-        Collection<?> streams = Collections2.transform(Arrays.asList(inputs), new Function<String, Stream>() {
-            @Nullable
-            @Override
-            public Stream apply(String input) {
-                return new Stream(0, input.getBytes());
-            }
-        });
-        LinkedBlockingQueue q = new LinkedBlockingQueue();
-        q.addAll(streams);
-        q.put(new Stream(-1, null));//a stop sign for builder
-
-        ToyIIStreamBuilder builder = new ToyIIStreamBuilder(q, iiDesc, 0, iiRowList);
-        ExecutorService executorService = Executors.newSingleThreadExecutor();
-        Future<?> future = executorService.submit(builder);
-        future.get();
-    }
-
-    @After
-    public void after() throws Exception {
-        cleanupTestMetadata();
-    }
-
-    /**
-     * simulate stream building into slices, and encode the slice into IIRows.
-     * Then reconstruct the IIRows to slice.
-     */
-    @Test
-    public void basicTest() {
-        Queue<IIRow> buffer = Lists.newLinkedList();
-        FIFOIterable bufferIterable = new FIFOIterable(buffer);
-        TableRecordInfo info = new TableRecordInfo(iiDesc);
-        TableRecordInfoDigest digest = info.getDigest();
-        KeyValueCodec codec = new IIKeyValueCodecWithState(digest);
-        Iterator<Slice> slices = codec.decodeKeyValue(bufferIterable).iterator();
-
-        Assert.assertTrue(!slices.hasNext());
-        Assert.assertEquals(iiRowList.size(), digest.getColumnCount());
-
-        for (int i = 0; i < digest.getColumnCount(); ++i) {
-            buffer.add(iiRowList.get(i));
-
-            if (i != digest.getColumnCount() - 1) {
-                Assert.assertTrue(!slices.hasNext());
-            } else {
-                Assert.assertTrue(slices.hasNext());
-            }
-        }
-
-        Slice newSlice = slices.next();
-        Assert.assertEquals(newSlice.getLocalDictionaries().get(0).getSize(), 2);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0f8b7a46/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/ToyIIStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/ToyIIStreamBuilder.java b/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/ToyIIStreamBuilder.java
deleted file mode 100644
index 161b6f6..0000000
--- a/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/ToyIIStreamBuilder.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package org.apache.kylin.streaming.invertedindex;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-
-import org.apache.kylin.invertedindex.index.Slice;
-import org.apache.kylin.invertedindex.index.TableRecordInfo;
-import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
-import org.apache.kylin.invertedindex.model.IIRow;
-import org.apache.kylin.streaming.Stream;
-
-/**
- * Created by Hongbin Ma(Binmahone) on 3/26/15.
- *
- * A IIStreamBuilder that can hold all the built slices in form of IIRow
- * This is only for test use
- */
-public class ToyIIStreamBuilder extends IIStreamBuilder {
-    private List<IIRow> result;
-
-    public ToyIIStreamBuilder(BlockingQueue<Stream> queue, IIDesc desc, int partitionId, List<IIRow> result) {
-        super(queue, null, desc, partitionId);
-        this.result = result;
-    }
-
-    protected void outputSlice(Slice slice, TableRecordInfo tableRecordInfo) throws IOException {
-        IIKeyValueCodec codec = new IIKeyValueCodec(tableRecordInfo.getDigest());
-        for (IIRow iiRow : codec.encodeKeyValue(slice)) {
-            result.add(iiRow);
-        }
-    }
-
-}