You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/12/17 00:03:30 UTC

[3/4] kylin git commit: KYLIN-2283 New datagen done, UT pass, pending replace CI

KYLIN-2283 New datagen done, UT pass, pending replace CI


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/f0fcbc06
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/f0fcbc06
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/f0fcbc06

Branch: refs/heads/KYLIN-2283
Commit: f0fcbc0653a68300da9efb57425bd46850ae14e3
Parents: d79d937
Author: Li Yang <li...@apache.org>
Authored: Thu Dec 15 18:51:00 2016 +0800
Committer: Yang Li <li...@apache.org>
Committed: Sat Dec 17 07:50:53 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/metadata/model/ColumnDesc.java |   8 +
 .../apache/kylin/metadata/model/TableDesc.java  |   9 +
 .../kylin/source/datagen/ColumnGenConfig.java   | 105 +++++
 .../kylin/source/datagen/ColumnGenerator.java   | 347 ++++++++++++++++
 .../source/datagen/ModelDataGenerator.java      | 193 +++++++++
 .../kylin/source/datagen/TableGenConfig.java    |  44 ++
 .../org/apache/kylin/source/datagen/Util.java   |  73 ++++
 .../topn/TopNCounterSerializerTest.java         |  76 ----
 .../measure/topn/TopNCounterSerializerTest.java |  76 ++++
 .../kylin/source/datagen/DataGenTest.java       |  52 +++
 .../localmeta/data/flatten_data_for_ii.csv      | 402 -------------------
 .../table/DEFAULT.TEST_KYLIN_FACT.json          |  32 +-
 12 files changed, 928 insertions(+), 489 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/f0fcbc06/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
index 2da1f5e..7105ede 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
@@ -45,6 +45,10 @@ public class ColumnDesc implements Serializable {
     @JsonInclude(JsonInclude.Include.NON_NULL)
     private String comment;
 
+    @JsonProperty("data_gen")
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private String dataGen;
+
     // parsed from data type
     private DataType type;
     private DataType upgradedType;
@@ -148,6 +152,10 @@ public class ColumnDesc implements Serializable {
     public void setNullable(boolean nullable) {
         this.isNullable = nullable;
     }
+    
+    public String getDataGen() {
+        return dataGen;
+    }
 
     public void init(TableDesc table) {
         this.table = table;

http://git-wip-us.apache.org/repos/asf/kylin/blob/f0fcbc06/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
index ab8c465..e845da1 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
@@ -26,6 +26,7 @@ import org.apache.kylin.common.persistence.RootPersistentEntity;
 import org.apache.kylin.common.util.StringSplitter;
 
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
@@ -47,6 +48,10 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware {
     private int sourceType = ISourceAware.ID_HIVE;
     @JsonProperty("table_type")
     private String tableType;
+    
+    @JsonProperty("data_gen")
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private String dataGen;
 
     private DatabaseDesc database = new DatabaseDesc();
 
@@ -160,6 +165,10 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware {
         return getMaxColumnIndex() + 1;
     }
 
+    public String getDataGen() {
+        return dataGen;
+    }
+
     public void init() {
         if (name != null)
             name = name.toUpperCase();

http://git-wip-us.apache.org/repos/asf/kylin/blob/f0fcbc06/core-metadata/src/main/java/org/apache/kylin/source/datagen/ColumnGenConfig.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/source/datagen/ColumnGenConfig.java b/core-metadata/src/main/java/org/apache/kylin/source/datagen/ColumnGenConfig.java
new file mode 100644
index 0000000..3d04cf2
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/source/datagen/ColumnGenConfig.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.datagen;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kylin.metadata.model.ColumnDesc;
+
+/**
+ * Data generation settings of one column, parsed from its {@code data_gen} attribute.
+ * <p>
+ * The {@code values} entry is a '|'-separated list whose first element selects the mode:
+ * {@code ID[|start]}, {@code RAND[|format[|start[|end]]]} (also used when it is empty), or
+ * otherwise a list of discrete values. A column that is a foreign key of the model instead
+ * takes the values of its primary key column. Exactly one of {@code isFK}, {@code isID},
+ * {@code isRandom} and {@code isDiscrete} is set.
+ */
+public class ColumnGenConfig {
+
+    public static final String ID = "ID";
+    public static final String RAND = "RAND";
+    /** Placeholder in string formats that is replaced by a random number. */
+    public static final String $RANDOM = "${RANDOM}";
+
+    // discrete values (the PK values when isFK)
+    boolean isDiscrete;
+    boolean isFK;
+    List<String> values;
+
+    // random
+    boolean isRandom;
+    String randFormat;
+    int randStart;
+    int randEnd;
+
+    // ID
+    boolean isID;
+    int idStart;
+
+    // general
+    int cardinality;
+    boolean genNull;
+    double genNullPct;
+    boolean order;
+
+    public ColumnGenConfig(ColumnDesc col, ModelDataGenerator modelGen) throws IOException {
+        init(col, modelGen);
+    }
+
+    private void init(ColumnDesc col, ModelDataGenerator modelGen) throws IOException {
+        Map<String, String> config = Util.parseEqualCommaPairs(col.getDataGen(), "values");
+
+        values = Arrays.asList(Util.parseString(config, "values", "").split("[|]"));
+        // split() returns an empty array for input like "|", so values.get(0) may not exist
+        String mode = values.isEmpty() ? "" : values.get(0);
+
+        List<String> pkValues = modelGen.getPkValuesIfIsFk(col);
+
+        if (pkValues != null) {
+            isFK = true;
+            values = pkValues;
+        } else if (ID.equals(mode)) {
+            isID = true;
+            idStart = (values.size() > 1) ? Integer.parseInt(values.get(1)) : 0;
+        } else if (RAND.equals(mode) || mode.isEmpty()) {
+            isRandom = true;
+            randFormat = (values.size() > 1) ? values.get(1) : "";
+            randStart = (values.size() > 2) ? Integer.parseInt(values.get(2)) : 0;
+            randEnd = (values.size() > 3) ? Integer.parseInt(values.get(3)) : 0;
+        } else {
+            isDiscrete = true;
+        }
+
+        cardinality = Util.parseInt(config, "card", guessCardinality(col.getName()));
+        genNull = Util.parseBoolean(config, "null", guessGenNull(col.getName()));
+        genNullPct = Util.parseDouble(config, "nullpct", 0.01);
+        order = Util.parseBoolean(config, "order", false);
+    }
+
+    /**
+     * Guesses the cardinality from a name token of the form {@code C<number>}, e.g.
+     * {@code PRICE_C100} gives 100; returns 0 when no such token exists.
+     */
+    private int guessCardinality(String col) {
+        for (String s : col.split("_")) {
+            if (s.startsWith("C")) {
+                try {
+                    return Integer.parseInt(s.substring(1));
+                } catch (NumberFormatException ex) {
+                    // token just starts with 'C' (e.g. "CAT"), keep looking
+                }
+            }
+        }
+        return 0;
+    }
+
+    /** Columns whose name contains "_NULL" generate nulls by default. */
+    private boolean guessGenNull(String col) {
+        return col.contains("_NULL");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/f0fcbc06/core-metadata/src/main/java/org/apache/kylin/source/datagen/ColumnGenerator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/source/datagen/ColumnGenerator.java b/core-metadata/src/main/java/org/apache/kylin/source/datagen/ColumnGenerator.java
new file mode 100644
index 0000000..fb7ec36
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/source/datagen/ColumnGenerator.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.datagen;
+
+import java.io.IOException;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.TreeSet;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.ColumnDesc;
+
+import com.google.common.base.Preconditions;
+
+public class ColumnGenerator {
+
+    final private ColumnGenConfig conf;
+    final private ColumnDesc targetCol;
+    final private int targetRows;
+
+    public ColumnGenerator(ColumnDesc col, int nRows, ModelDataGenerator modelGen) throws IOException {
+        this.conf = new ColumnGenConfig(col, modelGen);
+        this.targetCol = col;
+        this.targetRows = nRows;
+    }
+
+    public Iterator<String> generate(long seed) {
+        Base result;
+        if (conf.isFK) {
+            result = new DiscreteGen(conf.values, seed);
+        } else if (conf.isID) {
+            result = new IDGen(conf.idStart);
+        } else if (conf.isRandom) {
+            result = new RandomGen(targetCol, conf.randFormat, conf.randStart, conf.randEnd, conf.cardinality);
+        } else {
+            result = new DiscreteGen(conf.values);
+        }
+
+        if (conf.cardinality > 0) {
+            result = new CardinalityFilter(result, conf.cardinality);
+        }
+
+        if (conf.genNull) {
+            result = new AddNullFilter(result, conf.genNullPct);
+        }
+
+        if (conf.order) {
+            result = new OrderFilter(result, targetRows);
+        }
+
+        return result;
+    }
+
+    abstract public static class Base implements Iterator<String> {
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    private static class RandomGen extends Base {
+
+        private DataType type;
+        private String format;
+        private int randStart;
+        private int randEnd;
+        private Random rand;
+
+        public RandomGen(ColumnDesc col, String format, int randStart, int randEnd, int cardinality) {
+            this.type = col.getType();
+
+            if (type.isStringFamily()) {
+                // string
+                if (StringUtils.isBlank(format)) {
+                    String name = col.getName();
+                    format = name.substring(0, Math.min(4, name.length())) + ColumnGenConfig.$RANDOM;
+                }
+                Preconditions.checkArgument(format.contains(ColumnGenConfig.$RANDOM));
+                initNumberRange(randStart, randEnd, cardinality);
+            } else if (type.isTimeFamily()) {
+                // time
+                format = StringUtil.noBlank(format, DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS);
+                initDateTimeRange(randStart, randEnd, 0);
+            } else if (type.isDateTimeFamily()) {
+                // date
+                format = StringUtil.noBlank(format, DateFormat.DEFAULT_DATE_PATTERN);
+                initDateTimeRange(randStart, randEnd, cardinality);
+            } else if (type.isIntegerFamily()) {
+                // integer
+                initNumberRange(randStart, randEnd, cardinality);
+                format = StringUtil.noBlank(format, "#");
+            } else if (type.isNumberFamily()) {
+                // double
+                initNumberRange(randStart, randEnd, 0);
+                format = StringUtil.noBlank(format, ".##");
+            } else {
+                throw new IllegalArgumentException();
+            }
+
+            this.format = format;
+            this.rand = new Random();
+        }
+
+        private void initDateTimeRange(int randStart, int randEnd, int days) {
+            if (randStart == 0 && randEnd == 0) {
+                randStart = 2010;
+                randEnd = 2015;
+            }
+            randEnd = Math.max(randEnd, randStart + (days / 365) + 1);
+
+            Preconditions.checkArgument(randStart < randEnd);
+            Preconditions.checkArgument((randEnd - randStart) * 365 >= days);
+
+            this.randStart = randStart;
+            this.randEnd = randEnd;
+        }
+
+        private void initNumberRange(int randStart, int randEnd, int cardinality) {
+            if (randStart == 0 && randEnd == 0) {
+                randStart = 0;
+                randEnd = 1000;
+            }
+            randEnd = Math.max(randEnd, randStart + cardinality);
+
+            Preconditions.checkArgument(randStart < randEnd);
+            Preconditions.checkArgument(randEnd - randStart >= cardinality);
+
+            this.randStart = randStart;
+            this.randEnd = randEnd;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return true;
+        }
+
+        @Override
+        public String next() {
+            if (type.isStringFamily()) {
+                // string
+                return format.replace(ColumnGenConfig.$RANDOM, "" + randomInt());
+            } else if (type.isTimeFamily()) {
+                // time
+                return DateFormat.formatToTimeStr(randomMillis(), format);
+            } else if (type.isDateTimeFamily()) {
+                // date
+                return DateFormat.formatToDateStr(randomMillis(), format);
+            } else if (type.isIntegerFamily()) {
+                // integer
+                return formatNumber(randomInt());
+            } else if (type.isNumberFamily()) {
+                // double
+                return formatNumber(randomDouble());
+            } else {
+                throw new IllegalStateException();
+            }
+        }
+
+        private String formatNumber(double i) {
+            return new DecimalFormat(format).format(i);
+        }
+
+        private int randomInt() {
+            return randStart + rand.nextInt(randEnd - randStart);
+        }
+
+        private double randomDouble() {
+            return randomInt() + rand.nextDouble();
+        }
+
+        private long randomMillis() {
+            int secondsInYear = 3600 * 24 * 365;
+            long year = randStart + rand.nextInt(randEnd - randStart) - 1970;
+            long second = year * secondsInYear + rand.nextInt(secondsInYear);
+            return second * 1000;
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+
+    }
+
+    private static class IDGen extends Base {
+
+        int next;
+
+        public IDGen(int start) {
+            next = start;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return true;
+        }
+
+        @Override
+        public String next() {
+            return "" + (next++);
+        }
+    }
+
+    private static class DiscreteGen extends Base {
+
+        private List<String> values;
+        private Random rand;
+
+        public DiscreteGen(List<String> values) {
+            this.values = values;
+            this.rand = new Random();
+        }
+
+        public DiscreteGen(List<String> values, long seed) {
+            this.values = values;
+            this.rand = new Random(seed);
+        }
+
+        @Override
+        public boolean hasNext() {
+            return true;
+        }
+
+        @Override
+        public String next() {
+            if (values.isEmpty())
+                return null;
+            else
+                return values.get(rand.nextInt(values.size()));
+        }
+    }
+
+    private static class CardinalityFilter extends Base {
+
+        private Iterator<String> input;
+        private int card;
+        private TreeSet<String> cache;
+
+        public CardinalityFilter(Iterator<String> input, int card) {
+            assert card > 0;
+            this.input = input;
+            this.card = card;
+            this.cache = new TreeSet<String>();
+        }
+
+        @Override
+        public boolean hasNext() {
+            return input.hasNext();
+        }
+
+        @Override
+        public String next() {
+            String r = input.next();
+
+            if (cache.size() < card) {
+                cache.add(r);
+                return r;
+            }
+
+            r = cache.floor(r);
+            return r == null ? cache.first() : r;
+        }
+    }
+
+    private static class AddNullFilter extends Base {
+
+        private Iterator<String> input;
+        private double nullPct;
+        private Random rand;
+
+        public AddNullFilter(Iterator<String> input, double nullPct) {
+            this.input = input;
+            this.nullPct = nullPct;
+            this.rand = new Random();
+        }
+
+        @Override
+        public boolean hasNext() {
+            return true;
+        }
+
+        @Override
+        public String next() {
+            return rand.nextDouble() < nullPct || !input.hasNext() ? null : input.next();
+        }
+    }
+
+    private static class OrderFilter extends Base {
+
+        private Iterator<String> iter;
+
+        public OrderFilter(Iterator<String> input, int targetRows) {
+            ArrayList<String> cache = new ArrayList<>(targetRows);
+            for (int i = 0; i < targetRows; i++) {
+                cache.add(input.next());
+            }
+            Collections.sort(cache, new Comparator<String>() {
+                @Override
+                public int compare(String s1, String s2) {
+                    if (s1 == null) {
+                        return s2 == null ? 0 : -1;
+                    } else if (s2 == null) {
+                        return 1;
+                    } else {
+                        return s1.compareTo(s2);
+                    }
+                }
+            });
+
+            iter = cache.iterator();
+        }
+
+        @Override
+        public boolean hasNext() {
+            return iter.hasNext();
+        }
+
+        @Override
+        public String next() {
+            return iter.next();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/f0fcbc06/core-metadata/src/main/java/org/apache/kylin/source/datagen/ModelDataGenerator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/source/datagen/ModelDataGenerator.java b/core-metadata/src/main/java/org/apache/kylin/source/datagen/ModelDataGenerator.java
new file mode 100644
index 0000000..b85703c
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/source/datagen/ModelDataGenerator.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.datagen;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.JoinTableDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Generates CSV data for the tables of a data model that have a {@code data_gen} attribute,
+ * writing {@code <outputPath>/<table identity>.csv} files into a {@link ResourceStore}.
+ */
+public class ModelDataGenerator {
+
+    final private DataModelDesc model;
+    final private int targetRows;
+    final private ResourceStore outputStore;
+    final private String outputPath;
+
+    boolean outprint = false; // for debug
+
+    public ModelDataGenerator(DataModelDesc model, int nRows) {
+        this(model, nRows, ResourceStore.getStore(model.getConfig()), "/data");
+    }
+
+    public ModelDataGenerator(DataModelDesc model, int nRows, ResourceStore outputStore, String outputPath) {
+        this.model = model;
+        this.targetRows = nRows;
+        this.outputStore = outputStore;
+        this.outputPath = outputPath;
+    }
+
+    /** Generates every table of the model that needs it, each distinct table only once. */
+    public void generate() throws IOException {
+        Set<TableDesc> generated = new HashSet<>();
+
+        // walk the join tables backwards and finish with the root fact table (i == -1); assuming
+        // join tables are declared outward from the fact table, PK tables are written first so
+        // FK columns can read their values back (see getPkValuesIfIsFk)
+        JoinTableDesc[] allTables = model.getJoinTables();
+        for (int i = allTables.length - 1; i >= -1; i--) {
+            TableDesc table = (i == -1) ? model.getRootFactTable().getTableDesc() : allTables[i].getTableRef().getTableDesc();
+            if (generated.contains(table))
+                continue;
+
+            boolean gen = generateTable(table);
+
+            if (gen)
+                generated.add(table);
+        }
+
+        generateDDL(generated);
+    }
+
+    /** Generates one table into the output store; returns false if it has no data_gen attribute. */
+    private boolean generateTable(TableDesc table) throws IOException {
+        TableGenConfig config = new TableGenConfig(table);
+        if (!config.needGen)
+            return false;
+
+        ByteArrayOutputStream bout = new ByteArrayOutputStream();
+        PrintWriter pout = new PrintWriter(new OutputStreamWriter(bout, "UTF-8"));
+
+        generateTableInternal(table, config, pout);
+
+        pout.close();
+        bout.close();
+
+        byte[] content = bout.toByteArray();
+        if (outprint) {
+            System.out.println("Generated " + path(table));
+            System.out.println(Bytes.toString(content));
+        }
+
+        outputStore.putResource(path(table), new ByteArrayInputStream(content), System.currentTimeMillis());
+        return true;
+    }
+
+    /** Writes the rows of one table as comma separated lines, with nulls written as {@code \N}. */
+    private void generateTableInternal(TableDesc table, TableGenConfig config, PrintWriter out) throws IOException {
+        ColumnDesc[] columns = table.getColumns();
+        ColumnGenerator[] colGens = new ColumnGenerator[columns.length];
+        @SuppressWarnings("unchecked") // generic array creation, every element is an Iterator<String>
+        Iterator<String>[] colIters = new Iterator[columns.length];
+
+        // config.rows is either a multiplier (0,1] or an absolute row number
+        int tableRows = (int) ((config.rows > 1) ? config.rows : targetRows * config.rows);
+        tableRows = Math.max(1, tableRows);
+
+        // same seed for all columns, to ensure composite FK columns generate correct pairs
+        long seed = System.currentTimeMillis();
+
+        for (int i = 0; i < columns.length; i++) {
+            colGens[i] = new ColumnGenerator(columns[i], tableRows, this);
+            colIters[i] = colGens[i].generate(seed);
+        }
+
+        for (int i = 0; i < tableRows; i++) {
+            for (int c = 0; c < columns.length; c++) {
+                if (c > 0)
+                    out.print(",");
+
+                String v = colIters[c].next();
+                // a comma inside a value would shift the following columns of the CSV line
+                Preconditions.checkState(v == null || !v.contains(","), "Value of column %s contains a comma: %s", columns[c].getName(), v);
+
+                out.print(v == null ? "\\N" : v); // \N is null for hive
+            }
+            out.print("\n");
+        }
+    }
+
+    /**
+     * Returns the generated values of the primary key column joined to {@code fk}, or null if
+     * {@code fk} is no foreign key of the model or its PK table has no generated data yet.
+     */
+    public List<String> getPkValuesIfIsFk(ColumnDesc fk) throws IOException {
+        for (JoinTableDesc joinTable : model.getJoinTables()) {
+            ColumnDesc pk = findPk(joinTable, fk);
+            if (pk == null)
+                continue;
+
+            List<String> pkValues = getPkValues(pk);
+            if (pkValues != null)
+                return pkValues;
+        }
+        return null;
+    }
+
+    /** Returns the PK column paired with {@code fk} in the join, or null if fk is not an FK column of it. */
+    private ColumnDesc findPk(JoinTableDesc joinTable, ColumnDesc fk) {
+        TblColRef[] fkCols = joinTable.getJoin().getForeignKeyColumns();
+        for (int i = 0; i < fkCols.length; i++) {
+            if (fkCols[i].getColumnDesc().equals(fk))
+                return joinTable.getJoin().getPrimaryKeyColumns()[i].getColumnDesc();
+        }
+        return null;
+    }
+
+    /**
+     * Reads the values of {@code pk} back from its table's generated CSV, or returns null if that
+     * file does not exist. Null markers ({@code \N}) are returned as they are.
+     */
+    private List<String> getPkValues(ColumnDesc pk) throws IOException {
+        String path = path(pk.getTable());
+        if (outputStore.exists(path) == false)
+            return null;
+
+        List<String> r = new ArrayList<>();
+
+        BufferedReader in = new BufferedReader(new InputStreamReader(outputStore.getResource(path).inputStream, "UTF-8"));
+        try {
+            String line;
+            while ((line = in.readLine()) != null) {
+                // limit -1 keeps trailing empty fields; without it an empty value in the last
+                // column shortens the array and the index lookup fails
+                r.add(line.split(",", -1)[pk.getZeroBasedIndex()]);
+            }
+        } finally {
+            IOUtils.closeQuietly(in);
+        }
+        return r;
+    }
+
+    private void generateDDL(Set<TableDesc> generated) {
+        // TODO not implemented yet (presumably meant to write DDL for the generated tables)
+    }
+
+    private String path(TableDesc table) {
+        return outputPath + "/" + table.getIdentity() + ".csv";
+    }
+
+    public DataModelDesc getModel() {
+        return model;
+    }
+
+    /** @deprecated misspelled, use {@link #getModel()} instead */
+    @Deprecated
+    public DataModelDesc getModle() {
+        return getModel();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/f0fcbc06/core-metadata/src/main/java/org/apache/kylin/source/datagen/TableGenConfig.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/source/datagen/TableGenConfig.java b/core-metadata/src/main/java/org/apache/kylin/source/datagen/TableGenConfig.java
new file mode 100644
index 0000000..1c00d3d
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/source/datagen/TableGenConfig.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.datagen;
+
+import java.util.Map;
+
+import org.apache.kylin.metadata.model.TableDesc;
+
+/**
+ * Table level data generation settings, parsed from the table's {@code data_gen} attribute.
+ */
+public class TableGenConfig {
+
+    /** True when the table carries a data_gen attribute at all. */
+    boolean needGen;
+
+    /** Rows to generate: a multiplier of the model's target rows if at most 1, else an absolute count. */
+    double rows;
+
+    public TableGenConfig(TableDesc table) {
+        init(table.getDataGen());
+    }
+
+    private void init(String dataGen) {
+        needGen = dataGen != null;
+        if (!needGen) {
+            return;
+        }
+
+        // a bare value (no "key=") is taken as the row setting
+        Map<String, String> settings = Util.parseEqualCommaPairs(dataGen, "rows");
+        rows = Util.parseDouble(settings, "rows", 1.0);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/f0fcbc06/core-metadata/src/main/java/org/apache/kylin/source/datagen/Util.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/source/datagen/Util.java b/core-metadata/src/main/java/org/apache/kylin/source/datagen/Util.java
new file mode 100644
index 0000000..f2e8dbf
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/source/datagen/Util.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.datagen;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * Helpers to parse {@code data_gen} attribute strings.
+ */
+public class Util {
+
+    /**
+     * Parses comma separated {@code key=value} pairs into a map that keeps their order; keys and
+     * values are trimmed. A token without '=' becomes the value of {@code defaultKey} unless that
+     * key is already present, in which case the token becomes a key mapped to "true".
+     * A null input gives an empty map.
+     */
+    static Map<String, String> parseEqualCommaPairs(String equalCommaPairs, String defaultKey) {
+        Map<String, String> result = new LinkedHashMap<>();
+        if (equalCommaPairs == null) {
+            return result;
+        }
+
+        for (String token : equalCommaPairs.split(",")) {
+            int eq = token.indexOf('=');
+            if (eq >= 0) {
+                result.put(token.substring(0, eq).trim(), token.substring(eq + 1).trim());
+            } else if (!result.containsKey(defaultKey)) {
+                result.put(defaultKey, token.trim());
+            } else {
+                result.put(token.trim(), "true");
+            }
+        }
+        return result;
+    }
+
+    /** Returns {@code config[key]} parsed as a double, or {@code dft} when the key is absent. */
+    static double parseDouble(Map<String, String> config, String key, double dft) {
+        return config.containsKey(key) ? Double.parseDouble(config.get(key)) : dft;
+    }
+
+    /** Returns {@code config[key]} parsed as a boolean, or {@code dft} when the key is absent. */
+    static boolean parseBoolean(Map<String, String> config, String key, boolean dft) {
+        return config.containsKey(key) ? Boolean.parseBoolean(config.get(key)) : dft;
+    }
+
+    /** Returns {@code config[key]} parsed as an int, or {@code dft} when the key is absent. */
+    public static int parseInt(Map<String, String> config, String key, int dft) {
+        return config.containsKey(key) ? Integer.parseInt(config.get(key)) : dft;
+    }
+
+    /** Returns {@code config[key]}, or {@code dft} when the key is absent. */
+    public static String parseString(Map<String, String> config, String key, String dft) {
+        return config.containsKey(key) ? config.get(key) : dft;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/f0fcbc06/core-metadata/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java b/core-metadata/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java
deleted file mode 100644
index 1ce17fe..0000000
--- a/core-metadata/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.aggregation.topn;
-
-import java.nio.ByteBuffer;
-
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.measure.topn.TopNCounter;
-import org.apache.kylin.measure.topn.TopNCounterSerializer;
-import org.apache.kylin.metadata.datatype.DataType;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-/**
- * Round-trip serialization test for {@link TopNCounterSerializer}.
- */
-public class TopNCounterSerializerTest extends LocalFileMetadataTestCase {
-
-    // Shared serializer for the "topn(10)" data type, built once in setUp().
-    private static TopNCounterSerializer serializer;
-
-    /** Creates the local test metadata and the shared serializer. */
-    @BeforeClass
-    public static void setUp() throws Exception {
-        staticCreateTestMetadata();
-
-        // "topn" is registered before "topn(10)" is resolved below.
-        DataType.register("topn");
-        serializer = new TopNCounterSerializer(DataType.getType("topn(10)"));
-    }
-
-    /** Removes the metadata created in setUp(). */
-    @AfterClass
-    public static void after() throws Exception {
-        cleanAfterClass();
-    }
-
-    /** Serializes a populated counter, deserializes it and compares the string forms. */
-    @Test
-    public void testSerialization() {
-        TopNCounter<ByteArray> vs = new TopNCounter<ByteArray>(50);
-        Integer[] stream = { 1, 1, 2, 9, 1, 2, 3, 7, 7, 1, 3, 1, 1 };
-        for (Integer i : stream) {
-            vs.offer(new ByteArray(Bytes.toBytes(i)));
-        }
-        vs.sortAndRetain();
-        ByteBuffer out = ByteBuffer.allocate(1024);
-        serializer.serialize(vs, out);
-
-        // Copy only the bytes actually written (up to out.position()).
-        byte[] copyBytes = new byte[out.position()];
-        System.arraycopy(out.array(), 0, copyBytes, 0, out.position());
-
-        ByteBuffer in = ByteBuffer.wrap(copyBytes);
-        TopNCounter<ByteArray> vsNew = serializer.deserialize(in);
-
-        // toString() is used as the equality check between original and round-tripped counter.
-        Assert.assertEquals(vs.toString(), vsNew.toString());
-
-    }
-
-    /** Placeholder: contains no assertions yet. */
-    @Test
-    public void testValueOf() {
-        // FIXME need a good unit test for valueOf()
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/f0fcbc06/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterSerializerTest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterSerializerTest.java b/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterSerializerTest.java
new file mode 100644
index 0000000..2daf3b4
--- /dev/null
+++ b/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterSerializerTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.topn;
+
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.measure.topn.TopNCounter;
+import org.apache.kylin.measure.topn.TopNCounterSerializer;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Round-trip serialization tests for {@link TopNCounterSerializer}.
+ */
+public class TopNCounterSerializerTest extends LocalFileMetadataTestCase {
+
+    // Shared serializer for the "topn(10)" data type, built once in setUp().
+    private static TopNCounterSerializer serializer;
+
+    /** Creates the local test metadata and the shared serializer. */
+    @BeforeClass
+    public static void setUp() throws Exception {
+        staticCreateTestMetadata();
+
+        // "topn" is registered before "topn(10)" is resolved below
+        // (presumably required for DataType.getType to accept it).
+        DataType.register("topn");
+        serializer = new TopNCounterSerializer(DataType.getType("topn(10)"));
+    }
+
+    /** Removes the metadata created in setUp(). */
+    @AfterClass
+    public static void after() throws Exception {
+        cleanAfterClass();
+    }
+
+    /** Serializes a populated counter, deserializes it and compares the string forms. */
+    @Test
+    public void testSerialization() {
+        TopNCounter<ByteArray> vs = new TopNCounter<ByteArray>(50);
+        Integer[] stream = { 1, 1, 2, 9, 1, 2, 3, 7, 7, 1, 3, 1, 1 };
+        for (Integer i : stream) {
+            vs.offer(new ByteArray(Bytes.toBytes(i)));
+        }
+        vs.sortAndRetain();
+        ByteBuffer out = ByteBuffer.allocate(1024);
+        serializer.serialize(vs, out);
+
+        // Copy only the bytes actually written (up to out.position()) so the
+        // deserializer sees exactly the serialized payload and nothing more.
+        byte[] copyBytes = new byte[out.position()];
+        System.arraycopy(out.array(), 0, copyBytes, 0, out.position());
+
+        ByteBuffer in = ByteBuffer.wrap(copyBytes);
+        TopNCounter<ByteArray> vsNew = serializer.deserialize(in);
+
+        // toString() is used as the equality check between original and round-tripped counter.
+        Assert.assertEquals(vs.toString(), vsNew.toString());
+    }
+
+    /**
+     * Placeholder for valueOf() coverage. It is ignored rather than left as an empty
+     * passing test, so the missing coverage shows up as "skipped" in test reports.
+     */
+    @Test
+    @org.junit.Ignore("FIXME need a good unit test for valueOf()")
+    public void testValueOf() {
+        // FIXME need a good unit test for valueOf()
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/f0fcbc06/core-metadata/src/test/java/org/apache/kylin/source/datagen/DataGenTest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/test/java/org/apache/kylin/source/datagen/DataGenTest.java b/core-metadata/src/test/java/org/apache/kylin/source/datagen/DataGenTest.java
new file mode 100644
index 0000000..70aba04
--- /dev/null
+++ b/core-metadata/src/test/java/org/apache/kylin/source/datagen/DataGenTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.datagen;
+
+import java.io.IOException;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Smoke test for {@link ModelDataGenerator}. It loads a data model from the local test
+ * metadata and checks that data generation for it completes without throwing.
+ */
+public class DataGenTest extends LocalFileMetadataTestCase {
+
+    // Model from the local test metadata that the generator is exercised against.
+    private static final String MODEL_NAME = "test_kylin_inner_join_model_desc";
+
+    /** Creates a fresh copy of the local test metadata before each test. */
+    @Before
+    public void setUp() throws Exception {
+        this.createTestMetadata();
+    }
+
+    /** Removes the test metadata created in setUp(). */
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testBasics() throws IOException {
+        MetadataManager mgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
+        DataModelDesc model = mgr.getDataModelDesc(MODEL_NAME);
+        // Fail fast with a clear message if the fixture model is missing,
+        // instead of handing null to the generator.
+        org.junit.Assert.assertNotNull("test model not found: " + MODEL_NAME, model);
+
+        // NOTE(review): 100 is presumably the number of rows to generate; confirm in ModelDataGenerator.
+        ModelDataGenerator gen = new ModelDataGenerator(model, 100);
+        // NOTE(review): outprint appears to direct generated data to the console; confirm.
+        gen.outprint = true;
+
+        gen.generate();
+    }
+}