Posted to commits@kylin.apache.org by sh...@apache.org on 2017/11/05 11:27:31 UTC

kylin git commit: KYLIN-3010 Remove v1 Spark engine

Repository: kylin
Updated Branches:
  refs/heads/master 4917c6581 -> c987b30fa


KYLIN-3010 Remove v1 Spark engine


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/c987b30f
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/c987b30f
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/c987b30f

Branch: refs/heads/master
Commit: c987b30fab06fa7b0a89cfc468db63bf2fe411cf
Parents: 4917c65
Author: shaofengshi <sh...@apache.org>
Authored: Sun Nov 5 19:26:24 2017 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Sun Nov 5 19:26:24 2017 +0800

----------------------------------------------------------------------
 .../engine/spark/SparkBatchCubingEngine.java    |  67 ---
 .../kylin/engine/spark/SparkCountDemo.java      |  80 ---
 .../apache/kylin/engine/spark/SparkCubing.java  | 585 -------------------
 .../engine/spark/SparkCubingJobBuilder.java     |  69 ---
 .../kylin/engine/spark/SparkCuboidWriter.java   |  29 -
 .../kylin/engine/spark/SparkHelloWorld.java     |  37 --
 .../kylin/engine/spark/SparkHiveDemo.java       |  52 --
 .../engine/spark/cube/BufferedCuboidWriter.java | 106 ----
 .../spark/cube/DefaultTupleConverter.java       |  98 ----
 .../spark/cube/ListBackedCuboidWriter.java      |  59 --
 .../kylin/engine/spark/cube/TupleConverter.java |  29 -
 .../spark/cube/BufferedCuboidWriterTest.java    |  69 ---
 12 files changed, 1280 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/c987b30f/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine.java
deleted file mode 100644
index 08ed207..0000000
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.engine.spark;
-
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.engine.IBatchCubingEngine;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-
-/**
- */
-public class SparkBatchCubingEngine implements IBatchCubingEngine {
-
-    private final String confPath;
-    private final String coprocessor;
-
-    public SparkBatchCubingEngine(String confPath, String coprocessor) {
-        this.confPath = confPath;
-        this.coprocessor = coprocessor;
-    }
-
-    @Override
-    public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
-        return new SparkCubingJobBuilder(newSegment, submitter, confPath, coprocessor).build();
-    }
-
-    @Override
-    public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
-        return null;
-    }
-
-    @Override
-    public Class<?> getSourceInterface() {
-        return null;
-    }
-
-    @Override
-    public Class<?> getStorageInterface() {
-        return null;
-    }
-
-    @Override
-    public IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeDesc cubeDesc) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeSegment newSegment) {
-        throw new UnsupportedOperationException();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c987b30f/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCountDemo.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCountDemo.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCountDemo.java
deleted file mode 100644
index 6478c10..0000000
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCountDemo.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.engine.spark;
-
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
-import org.apache.kylin.common.util.AbstractApplication;
-import org.apache.kylin.common.util.OptionsHelper;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.storage.StorageLevel;
-
-import scala.Tuple2;
-
-/**
- */
-public class SparkCountDemo extends AbstractApplication {
-
-    private static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Input path").create("input");
-
-    private Options options;
-
-    public SparkCountDemo() {
-        options = new Options();
-        //        options.addOption(OPTION_INPUT_PATH);
-    }
-
-    @Override
-    protected Options getOptions() {
-        return options;
-    }
-
-    @Override
-    protected void execute(OptionsHelper optionsHelper) throws Exception {
-        String logFile = "hdfs://10.249.65.231:8020/tmp/kylin.properties"; // Should be some file on your system
-        SparkConf conf = new SparkConf().setAppName("Simple Application");
-        JavaSparkContext sc = new JavaSparkContext(conf);
-        final JavaPairRDD<String, Integer> logData = sc.textFile(logFile).mapToPair(new PairFunction<String, String, Integer>() {
-
-            @Override
-            public Tuple2<String, Integer> call(String s) throws Exception {
-                return new Tuple2<String, Integer>(s, s.length());
-            }
-        }).sortByKey();
-        logData.persist(StorageLevel.MEMORY_AND_DISK_SER());
-
-        System.out.println("line number:" + logData.count());
-
-        logData.mapToPair(new PairFunction<Tuple2<String, Integer>, ImmutableBytesWritable, KeyValue>() {
-            @Override
-            public Tuple2<ImmutableBytesWritable, KeyValue> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
-                ImmutableBytesWritable key = new ImmutableBytesWritable(stringIntegerTuple2._1().getBytes());
-                KeyValue value = new KeyValue(stringIntegerTuple2._1().getBytes(), "f".getBytes(), "c".getBytes(), String.valueOf(stringIntegerTuple2._2()).getBytes());
-                return new Tuple2(key, value);
-            }
-        }).saveAsNewAPIHadoopFile("hdfs://10.249.65.231:8020/tmp/hfile", ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat.class);
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c987b30f/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
deleted file mode 100644
index c8aee5d..0000000
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ /dev/null
@@ -1,585 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.engine.spark;
-
-import java.io.File;
-import java.io.FileFilter;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FsShell;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.AbstractApplication;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.common.util.OptionsHelper;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.cuboid.CuboidScheduler;
-import org.apache.kylin.cube.inmemcubing.AbstractInMemCubeBuilder;
-import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder;
-import org.apache.kylin.cube.kv.CubeDimEncMap;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
-import org.apache.kylin.cube.model.DimensionDesc;
-import org.apache.kylin.cube.model.RowKeyDesc;
-import org.apache.kylin.cube.util.CubingUtils;
-import org.apache.kylin.dict.DictionaryGenerator;
-import org.apache.kylin.dict.IterableDictionaryValueEnumerator;
-import org.apache.kylin.engine.EngineFactory;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.engine.mr.common.CubeStatsReader;
-import org.apache.kylin.engine.spark.cube.BufferedCuboidWriter;
-import org.apache.kylin.engine.spark.cube.DefaultTupleConverter;
-import org.apache.kylin.engine.spark.util.IteratorUtils;
-import org.apache.kylin.measure.BufferedMeasureCodec;
-import org.apache.kylin.measure.MeasureAggregators;
-import org.apache.kylin.measure.hllc.HLLCounter;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.realization.RealizationStatusEnum;
-import org.apache.kylin.storage.hbase.HBaseConnection;
-import org.apache.kylin.storage.hbase.steps.CreateHTableJob;
-import org.apache.kylin.storage.hbase.steps.CubeHTableUtil;
-import org.apache.spark.Partitioner;
-import org.apache.spark.SparkConf;
-import org.apache.spark.SparkFiles;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFlatMapFunction;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.hive.HiveContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
-import com.google.common.primitives.UnsignedBytes;
-
-import scala.Tuple2;
-
-/**
- */
-public class SparkCubing extends AbstractApplication {
-
-    protected static final Logger logger = LoggerFactory.getLogger(SparkCubing.class);
-
-    private static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Hive Intermediate Table").create("hiveTable");
-    private static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg().isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
-    private static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true).withDescription("Cube Segment Id").create("segmentId");
-    private static final Option OPTION_CONF_PATH = OptionBuilder.withArgName("confPath").hasArg().isRequired(true).withDescription("Configuration Path").create("confPath");
-    private static final Option OPTION_COPROCESSOR = OptionBuilder.withArgName("coprocessor").hasArg().isRequired(true).withDescription("Coprocessor Jar Path").create("coprocessor");
-
-    private Options options;
-
-    public SparkCubing() {
-        options = new Options();
-        options.addOption(OPTION_INPUT_PATH);
-        options.addOption(OPTION_CUBE_NAME);
-        options.addOption(OPTION_SEGMENT_ID);
-        options.addOption(OPTION_CONF_PATH);
-        options.addOption(OPTION_COPROCESSOR);
-
-    }
-
-    @Override
-    protected Options getOptions() {
-        return options;
-    }
-
-    public static KylinConfig loadKylinPropsAndMetadata(String folder) throws IOException {
-        File metaDir = new File(folder);
-        if (!metaDir.getAbsolutePath().equals(System.getProperty(KylinConfig.KYLIN_CONF))) {
-            System.setProperty(KylinConfig.KYLIN_CONF, metaDir.getAbsolutePath());
-            logger.info("The absolute path for meta dir is " + metaDir.getAbsolutePath());
-            KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-            System.out.println("setting metadataUrl to " + metaDir.getAbsolutePath());
-            kylinConfig.setMetadataUrl(metaDir.getAbsolutePath());
-            return kylinConfig;
-        } else {
-            return KylinConfig.getInstanceFromEnv();
-        }
-    }
-
-    private void setupClasspath(JavaSparkContext sc, String confPath) throws Exception {
-        ClassUtil.addClasspath(confPath);
-        final File[] files = new File(confPath).listFiles(new FileFilter() {
-            @Override
-            public boolean accept(File pathname) {
-                if (pathname.getAbsolutePath().endsWith(".xml")) {
-                    return true;
-                }
-                if (pathname.getAbsolutePath().endsWith(".properties")) {
-                    return true;
-                }
-                return false;
-            }
-        });
-        if (files == null) {
-            return;
-        }
-        for (File file : files) {
-            sc.addFile(file.getAbsolutePath());
-        }
-    }
-
-    private void writeDictionary(Dataset<Row> intermediateTable, String cubeName, String segmentId) throws Exception {
-        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        final CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
-        final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
-        final String[] columns = intermediateTable.columns();
-        final CubeSegment seg = cubeInstance.getSegmentById(segmentId);
-        final CubeDesc cubeDesc = cubeInstance.getDescriptor();
-        final HashMap<Integer, TblColRef> tblColRefMap = Maps.newHashMap();
-        final CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(seg), cubeDesc);
-        final List<TblColRef> baseCuboidColumn = Cuboid.findById(seg, Cuboid.getBaseCuboidId(cubeDesc)).getColumns();
-        final long start = System.currentTimeMillis();
-        final RowKeyDesc rowKey = cubeDesc.getRowkey();
-        for (int i = 0; i < baseCuboidColumn.size(); i++) {
-            TblColRef col = baseCuboidColumn.get(i);
-            if (!rowKey.isUseDictionary(col)) {
-                continue;
-            }
-            final int rowKeyColumnIndex = flatDesc.getRowKeyColumnIndexes()[i];
-            tblColRefMap.put(rowKeyColumnIndex, col);
-        }
-
-        Map<TblColRef, Dictionary<String>> dictionaryMap = Maps.newHashMap();
-        for (Map.Entry<Integer, TblColRef> entry : tblColRefMap.entrySet()) {
-            final String column = columns[entry.getKey()];
-            final TblColRef tblColRef = entry.getValue();
-            final Dataset<Row> frame = intermediateTable.select(column).distinct();
-
-            final List<Row> rows = frame.collectAsList();
-            dictionaryMap.put(tblColRef, DictionaryGenerator.buildDictionary(tblColRef.getType(), new IterableDictionaryValueEnumerator(new Iterable<String>() {
-                @Override
-                public Iterator<String> iterator() {
-                    return new Iterator<String>() {
-                        int i = 0;
-
-                        @Override
-                        public boolean hasNext() {
-                            return i < rows.size();
-                        }
-
-                        @Override
-                        public String next() {
-                            if (hasNext()) {
-                                final Row row = rows.get(i++);
-                                final Object o = row.get(0);
-                                return o != null ? o.toString() : null;
-                            } else {
-                                throw new NoSuchElementException();
-                            }
-                        }
-
-                        @Override
-                        public void remove() {
-                            throw new UnsupportedOperationException();
-                        }
-                    };
-                }
-            })));
-        }
-        final long end = System.currentTimeMillis();
-        CubingUtils.writeDictionary(seg, dictionaryMap, start, end);
-        try {
-            CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance);
-            cubeBuilder.setToUpdateSegs(seg);
-            cubeManager.updateCube(cubeBuilder);
-        } catch (IOException e) {
-            throw new RuntimeException("Failed to deal with the request: " + e.getLocalizedMessage());
-        }
-    }
-
-    private Map<Long, HLLCounter> sampling(final JavaRDD<List<String>> rowJavaRDD, final String cubeName, String segmentId) throws Exception {
-        CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
-        CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
-        CubeDesc cubeDesc = cubeInstance.getDescriptor();
-        CuboidScheduler cuboidScheduler = cubeInstance.getCuboidScheduler();
-        Set<Long> allCuboidIds = cuboidScheduler.getAllCuboidIds();
-        final HashMap<Long, HLLCounter> zeroValue = Maps.newHashMap();
-        for (Long id : allCuboidIds) {
-            zeroValue.put(id, new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision()));
-        }
-
-        CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
-
-        final int[] rowKeyColumnIndexes = flatDesc.getRowKeyColumnIndexes();
-        final int nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
-        final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
-        final Map<Long, Integer[]> allCuboidsBitSet = Maps.newHashMapWithExpectedSize(allCuboidIds.size());
-        final byte[][] row_hashcodes = new byte[nRowKey][];
-
-        for (Long cuboidId : allCuboidIds) {
-            Integer[] cuboidBitSet = new Integer[Long.bitCount(cuboidId)];
-
-            long mask = Long.highestOneBit(baseCuboidId);
-            int position = 0;
-            for (int i = 0; i < nRowKey; i++) {
-                if ((mask & cuboidId) > 0) {
-                    cuboidBitSet[position] = i;
-                    position++;
-                }
-                mask = mask >> 1;
-            }
-            allCuboidsBitSet.put(cuboidId, cuboidBitSet);
-        }
-
-        final HashMap<Long, HLLCounter> samplingResult = rowJavaRDD.aggregate(zeroValue, new Function2<HashMap<Long, HLLCounter>, List<String>, HashMap<Long, HLLCounter>>() {
-
-            final HashFunction hashFunction = Hashing.murmur3_128();
-
-            @Override
-            public HashMap<Long, HLLCounter> call(HashMap<Long, HLLCounter> v1, List<String> v2) throws Exception {
-                for (int i = 0; i < nRowKey; i++) {
-                    Hasher hc = hashFunction.newHasher();
-                    String colValue = v2.get(rowKeyColumnIndexes[i]);
-                    if (colValue != null) {
-                        row_hashcodes[i] = hc.putString(colValue).hash().asBytes();
-                    } else {
-                        row_hashcodes[i] = hc.putInt(0).hash().asBytes();
-                    }
-                }
-
-                for (Map.Entry<Long, Integer[]> entry : allCuboidsBitSet.entrySet()) {
-                    Hasher hc = hashFunction.newHasher();
-                    HLLCounter counter = v1.get(entry.getKey());
-                    final Integer[] cuboidBitSet = entry.getValue();
-                    for (int position = 0; position < cuboidBitSet.length; position++) {
-                        hc.putBytes(row_hashcodes[cuboidBitSet[position]]);
-                    }
-                    counter.add(hc.hash().asBytes());
-                }
-                return v1;
-            }
-        }, new Function2<HashMap<Long, HLLCounter>, HashMap<Long, HLLCounter>, HashMap<Long, HLLCounter>>() {
-            @Override
-            public HashMap<Long, HLLCounter> call(HashMap<Long, HLLCounter> v1, HashMap<Long, HLLCounter> v2) throws Exception {
-                Preconditions.checkArgument(v1.size() == v2.size());
-                Preconditions.checkArgument(v1.size() > 0);
-                for (Map.Entry<Long, HLLCounter> entry : v1.entrySet()) {
-                    final HLLCounter counter1 = entry.getValue();
-                    final HLLCounter counter2 = v2.get(entry.getKey());
-                    counter1.merge(Preconditions.checkNotNull(counter2, "counter cannot be null"));
-                }
-                return v1;
-            }
-
-        });
-        return samplingResult;
-    }
-
-    /** return hfile location */
-    private String build(JavaRDD<List<String>> javaRDD, final String cubeName, final String segmentId, final byte[][] splitKeys) throws Exception {
-        CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
-        CubeDesc cubeDesc = cubeInstance.getDescriptor();
-        final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
-        List<TblColRef> baseCuboidColumn = Cuboid.findById(cubeSegment, Cuboid.getBaseCuboidId(cubeDesc)).getColumns();
-        final Map<TblColRef, Integer> columnLengthMap = Maps.newHashMap();
-        final CubeDimEncMap dimEncMap = cubeSegment.getDimensionEncodingMap();
-        for (TblColRef tblColRef : baseCuboidColumn) {
-            columnLengthMap.put(tblColRef, dimEncMap.get(tblColRef).getLengthOfEncoding());
-        }
-        final Map<TblColRef, Dictionary<String>> dictionaryMap = Maps.newHashMap();
-        for (DimensionDesc dim : cubeDesc.getDimensions()) {
-            // dictionary
-            for (TblColRef col : dim.getColumnRefs()) {
-                if (cubeDesc.getRowkey().isUseDictionary(col)) {
-                    Dictionary<String> dict = cubeSegment.getDictionary(col);
-                    if (dict == null) {
-                        System.err.println("Dictionary for " + col + " was not found.");
-                        continue;
-                    }
-                    dictionaryMap.put(col, dict);
-                    System.out.println("col:" + col + " dictionary size:" + dict.getSize());
-                }
-            }
-        }
-
-        for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
-            FunctionDesc func = measureDesc.getFunction();
-            List<TblColRef> colRefs = func.getMeasureType().getColumnsNeedDictionary(func);
-            for (TblColRef col : colRefs) {
-                dictionaryMap.put(col, cubeSegment.getDictionary(col));
-            }
-        }
-
-        final JavaPairRDD<byte[], byte[]> javaPairRDD = javaRDD.glom().mapPartitionsToPair(new PairFlatMapFunction<Iterator<List<List<String>>>, byte[], byte[]>() {
-
-            @Override
-            public Iterator<Tuple2<byte[], byte[]>> call(Iterator<List<List<String>>> listIterator) throws Exception {
-                long t = System.currentTimeMillis();
-                prepare();
-
-                final CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
-
-                LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue();
-                System.out.println("load properties finished");
-                IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cubeSegment);
-                AbstractInMemCubeBuilder inMemCubeBuilder = new DoggedCubeBuilder(
-                        cubeSegment.getCuboidScheduler(), flatDesc, dictionaryMap);
-                final SparkCuboidWriter sparkCuboidWriter = new BufferedCuboidWriter(new DefaultTupleConverter(cubeInstance.getSegmentById(segmentId), columnLengthMap));
-                Executors.newCachedThreadPool().submit(inMemCubeBuilder.buildAsRunnable(blockingQueue, sparkCuboidWriter));
-                try {
-                    while (listIterator.hasNext()) {
-                        for (List<String> row : listIterator.next()) {
-                            blockingQueue.put(row);
-                        }
-                    }
-                    blockingQueue.put(Collections.<String> emptyList());
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
-                System.out.println("build partition cost: " + (System.currentTimeMillis() - t) + "ms");
-                return sparkCuboidWriter.getResult().iterator();
-            }
-        });
-
-        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        Configuration conf = getConfigurationForHFile(cubeSegment.getStorageLocationIdentifier());
-        Path path = new Path(kylinConfig.getHdfsWorkingDirectory(), "hfile_" + UUID.randomUUID().toString());
-        Preconditions.checkArgument(!FileSystem.get(conf).exists(path));
-        String url = conf.get("fs.defaultFS") + path.toString();
-        System.out.println("use " + url + " as hfile");
-        List<MeasureDesc> measuresDescs = cubeDesc.getMeasures();
-        final int measureSize = measuresDescs.size();
-        final String[] dataTypes = new String[measureSize];
-        for (int i = 0; i < dataTypes.length; i++) {
-            dataTypes[i] = measuresDescs.get(i).getFunction().getReturnType();
-        }
-        final MeasureAggregators aggs = new MeasureAggregators(measuresDescs);
-        writeToHFile2(javaPairRDD, dataTypes, measureSize, aggs, splitKeys, conf, url);
-        return url;
-    }
-
-    private void writeToHFile2(final JavaPairRDD<byte[], byte[]> javaPairRDD, final String[] dataTypes, final int measureSize, final MeasureAggregators aggs, final byte[][] splitKeys, final Configuration conf, final String hFileLocation) {
-        javaPairRDD.repartitionAndSortWithinPartitions(new Partitioner() {
-            @Override
-            public int numPartitions() {
-                return splitKeys.length + 1;
-            }
-
-            @Override
-            public int getPartition(Object key) {
-                Preconditions.checkArgument(key instanceof byte[]);
-                for (int i = 0, n = splitKeys.length; i < n; ++i) {
-                    if (UnsignedBytes.lexicographicalComparator().compare((byte[]) key, splitKeys[i]) < 0) {
-                        return i;
-                    }
-                }
-                return splitKeys.length;
-            }
-        }, UnsignedBytes.lexicographicalComparator()).mapPartitions(new FlatMapFunction<Iterator<Tuple2<byte[], byte[]>>, Tuple2<byte[], byte[]>>() {
-            @Override
-            public Iterator<Tuple2<byte[], byte[]>> call(final Iterator<Tuple2<byte[], byte[]>> tuple2Iterator) throws Exception {
-                return new Iterable<Tuple2<byte[], byte[]>>() {
-                    final BufferedMeasureCodec codec = new BufferedMeasureCodec(dataTypes);
-                    final Object[] input = new Object[measureSize];
-                    final Object[] result = new Object[measureSize];
-
-                    @Override
-                    public Iterator<Tuple2<byte[], byte[]>> iterator() {
-                        return IteratorUtils.merge(tuple2Iterator, UnsignedBytes.lexicographicalComparator(), new Function<Iterable<byte[]>, byte[]>() {
-                            @Override
-                            public byte[] call(Iterable<byte[]> v1) throws Exception {
-                                final LinkedList<byte[]> list = Lists.newLinkedList(v1);
-                                if (list.size() == 1) {
-                                    return list.get(0);
-                                }
-                                aggs.reset();
-                                for (byte[] v : list) {
-                                    codec.decode(ByteBuffer.wrap(v), input);
-                                    aggs.aggregate(input);
-                                }
-                                aggs.collectStates(result);
-                                ByteBuffer buffer = codec.encode(result);
-                                byte[] bytes = new byte[buffer.position()];
-                                System.arraycopy(buffer.array(), buffer.arrayOffset(), bytes, 0, buffer.position());
-                                return bytes;
-                            }
-                        });
-                    }
-                }.iterator();
-            }
-        }, true).mapToPair(new PairFunction<Tuple2<byte[], byte[]>, ImmutableBytesWritable, KeyValue>() {
-            @Override
-            public Tuple2<ImmutableBytesWritable, KeyValue> call(Tuple2<byte[], byte[]> tuple2) throws Exception {
-                ImmutableBytesWritable key = new ImmutableBytesWritable(tuple2._1());
-                KeyValue value = new KeyValue(tuple2._1(), "F1".getBytes(), "M".getBytes(), tuple2._2());
-                return new Tuple2(key, value);
-            }
-        }).saveAsNewAPIHadoopFile(hFileLocation, ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat.class, conf);
-    }
-
-    public static void prepare() throws Exception {
-        final File file = new File(SparkFiles.get("kylin.properties"));
-        final String confPath = file.getParentFile().getAbsolutePath();
-        System.out.println("conf directory:" + confPath);
-        System.setProperty(KylinConfig.KYLIN_CONF, confPath);
-        ClassUtil.addClasspath(confPath);
-    }
-
-    private byte[][] createHTable(String cubeName, String segmentId, Map<Long, HLLCounter> samplingResult) throws Exception {
-        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
-        final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
-        final Map<Long, Long> rowCountMap = CubeStatsReader.getCuboidRowCountMapFromSampling(samplingResult, 100);
-        final Map<Long, Double> cubeSizeMap = CubeStatsReader.getCuboidSizeMapFromRowCount(cubeSegment, rowCountMap);
-        System.out.println("cube size estimation:" + cubeSizeMap);
-        final byte[][] splitKeys = CreateHTableJob.getRegionSplitsFromCuboidStatistics(cubeSizeMap, kylinConfig, cubeSegment, null); //FIXME: passing non-null value for 'hfileSplitsOutputFolder'
-        CubeHTableUtil.createHTable(cubeSegment, splitKeys);
-        System.out.println(cubeSegment.getStorageLocationIdentifier() + " table created");
-        return splitKeys;
-    }
-
-    private Configuration getConfigurationForHFile(String hTableName) throws IOException {
-        final Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
-        Job job = Job.getInstance(conf);
-        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
-        job.setMapOutputValueClass(KeyValue.class);
-        HTable table = new HTable(conf, hTableName);
-        HFileOutputFormat.configureIncrementalLoad(job, table);
-        return conf;
-    }
-
-    private void bulkLoadHFile(String cubeName, String segmentId, String hfileLocation) throws Exception {
-        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
-        final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
-        final Configuration hbaseConf = HBaseConnection.getCurrentHBaseConfiguration();
-
-        FsShell shell = new FsShell(hbaseConf);
-        try {
-            shell.run(new String[] { "-chmod", "-R", "777", hfileLocation });
-        } catch (Exception e) {
-            logger.error("Couldnt change the file permissions ", e);
-            throw new IOException(e);
-        }
-
-        String[] newArgs = new String[2];
-        newArgs[0] = hfileLocation;
-        newArgs[1] = cubeSegment.getStorageLocationIdentifier();
-
-        int ret = ToolRunner.run(new LoadIncrementalHFiles(hbaseConf), newArgs);
-        System.out.println("incremental load result:" + ret);
-
-        cubeSegment.setStatus(SegmentStatusEnum.READY);
-        try {
-            CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance);
-            cubeInstance.setStatus(RealizationStatusEnum.READY);
-            cubeSegment.setStatus(SegmentStatusEnum.READY);
-            cubeBuilder.setToUpdateSegs(cubeSegment);
-            CubeManager.getInstance(kylinConfig).updateCube(cubeBuilder);
-        } catch (IOException e) {
-            throw new RuntimeException("Failed to deal with the request: " + e.getLocalizedMessage());
-        }
-    }
-
-    @Override
-    protected void execute(OptionsHelper optionsHelper) throws Exception {
-        final String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
-        SparkConf conf = new SparkConf().setAppName("Simple Application");
-        //memory conf
-        conf.set("spark.executor.memory", "6g");
-        conf.set("spark.storage.memoryFraction", "0.3");
-
-        //serialization conf
-        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-        conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
-        conf.set("spark.kryo.registrationRequired", "true");
-
-        JavaSparkContext sc = new JavaSparkContext(conf);
-        HiveContext sqlContext = new HiveContext(sc.sc());
-        final Dataset<Row> intermediateTable = sqlContext.sql("select * from " + hiveTable);
-        final String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
-        final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
-        final String confPath = optionsHelper.getOptionValue(OPTION_CONF_PATH);
-        final String coprocessor = optionsHelper.getOptionValue(OPTION_COPROCESSOR);
-        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        kylinConfig.overrideCoprocessorLocalJar(coprocessor);
-
-        setupClasspath(sc, confPath);
-        intermediateTable.cache();
-        writeDictionary(intermediateTable, cubeName, segmentId);
-        final JavaRDD<List<String>> rowJavaRDD = intermediateTable.javaRDD().map(new org.apache.spark.api.java.function.Function<Row, List<String>>() {
-            @Override
-            public List<String> call(Row v1) throws Exception {
-                ArrayList<String> result = Lists.newArrayListWithExpectedSize(v1.size());
-                for (int i = 0; i < v1.size(); i++) {
-                    final Object o = v1.get(i);
-                    if (o != null) {
-                        result.add(o.toString());
-                    } else {
-                        result.add(null);
-                    }
-                }
-                return result;
-
-            }
-        });
-
-        final Map<Long, HLLCounter> samplingResult = sampling(rowJavaRDD, cubeName, segmentId);
-        final byte[][] splitKeys = createHTable(cubeName, segmentId, samplingResult);
-
-        final String hfile = build(rowJavaRDD, cubeName, segmentId, splitKeys);
-        bulkLoadHFile(cubeName, segmentId, hfile);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c987b30f/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java
deleted file mode 100644
index 76e4521..0000000
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.engine.spark;
-
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.EngineFactory;
-import org.apache.kylin.engine.mr.CubingJob;
-import org.apache.kylin.engine.mr.IMRInput;
-import org.apache.kylin.engine.mr.IMROutput2;
-import org.apache.kylin.engine.mr.JobBuilderSupport;
-import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-public class SparkCubingJobBuilder extends JobBuilderSupport {
-    private static final Logger logger = LoggerFactory.getLogger(SparkCubingJobBuilder.class);
-
-    private final IMRInput.IMRBatchCubingInputSide inputSide;
-    private final IMROutput2.IMRBatchCubingOutputSide2 outputSide;
-    private final String confPath;
-    private final String coprocessor;
-
-    public SparkCubingJobBuilder(CubeSegment seg, String submitter, String confPath, String coprocessor) {
-        super(seg, submitter);
-        this.inputSide = MRUtil.getBatchCubingInputSide(seg);
-        this.outputSide = MRUtil.getBatchCubingOutputSide2(seg);
-        this.confPath = confPath;
-        this.coprocessor = coprocessor;
-    }
-
-    public DefaultChainedExecutable build() {
-        final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
-
-        inputSide.addStepPhase1_CreateFlatTable(result);
-        final IJoinedFlatTableDesc joinedFlatTableDesc = EngineFactory.getJoinedFlatTableDesc(seg);
-        final String tableName = joinedFlatTableDesc.getTableName();
-        logger.info("intermediate table:" + tableName);
-
-        final SparkExecutable sparkExecutable = new SparkExecutable();
-        sparkExecutable.setClassName(SparkCubing.class.getName());
-        sparkExecutable.setParam("hiveTable", tableName);
-        sparkExecutable.setParam(CubingExecutableUtil.CUBE_NAME, seg.getRealization().getName());
-        sparkExecutable.setParam("segmentId", seg.getUuid());
-        sparkExecutable.setParam("confPath", confPath);
-        sparkExecutable.setParam("coprocessor", coprocessor);
-        result.addTask(sparkExecutable);
-        return result;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c987b30f/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCuboidWriter.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCuboidWriter.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCuboidWriter.java
deleted file mode 100644
index 77ebe69..0000000
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCuboidWriter.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.engine.spark;
-
-import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
-
-import scala.Tuple2;
-
-/**
- */
-public interface SparkCuboidWriter extends ICuboidWriter {
-
-    Iterable<Tuple2<byte[], byte[]>> getResult();
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c987b30f/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkHelloWorld.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkHelloWorld.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkHelloWorld.java
deleted file mode 100644
index 4eda50e..0000000
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkHelloWorld.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.engine.spark;
-
-import org.apache.commons.cli.Options;
-import org.apache.kylin.common.util.AbstractApplication;
-import org.apache.kylin.common.util.OptionsHelper;
-
-/**
- */
-public class SparkHelloWorld extends AbstractApplication {
-
-    @Override
-    protected Options getOptions() {
-        return new Options();
-    }
-
-    @Override
-    protected void execute(OptionsHelper optionsHelper) throws Exception {
-        System.out.println("hello kylin-spark");
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c987b30f/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkHiveDemo.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkHiveDemo.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkHiveDemo.java
deleted file mode 100644
index 58d4222..0000000
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkHiveDemo.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.engine.spark;
-
-import org.apache.commons.cli.Options;
-import org.apache.kylin.common.util.AbstractApplication;
-import org.apache.kylin.common.util.OptionsHelper;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.hive.HiveContext;
-
-/**
- */
-public class SparkHiveDemo extends AbstractApplication {
-
-    private final Options options;
-
-    public SparkHiveDemo() {
-        options = new Options();
-    }
-
-    @Override
-    protected Options getOptions() {
-        return options;
-    }
-
-    @Override
-    protected void execute(OptionsHelper optionsHelper) throws Exception {
-        SparkConf conf = new SparkConf().setAppName("Simple Application");
-        JavaSparkContext sc = new JavaSparkContext(conf);
-        HiveContext sqlContext = new HiveContext(sc.sc());
-        final Dataset<Row> dataFrame = sqlContext.sql("select * from test_kylin_fact");
-        System.out.println("count * of the table:" + dataFrame.count());
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c987b30f/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/BufferedCuboidWriter.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/BufferedCuboidWriter.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/BufferedCuboidWriter.java
deleted file mode 100644
index b3334b7..0000000
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/BufferedCuboidWriter.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.engine.spark.cube;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.kylin.engine.spark.SparkCuboidWriter;
-import org.apache.kylin.gridtable.GTRecord;
-
-import scala.Tuple2;
-
-/**
- */
-public class BufferedCuboidWriter implements SparkCuboidWriter {
-
-    private final LinkedBlockingQueue<Tuple2<byte[], byte[]>> blockingQueue;
-    private final TupleConverter tupleConverter;
-
-    public BufferedCuboidWriter(TupleConverter tupleConverter) {
-        this.blockingQueue = new LinkedBlockingQueue<>(10000);
-        this.tupleConverter = tupleConverter;
-    }
-
-    @Override
-    public void write(final long cuboidId, final GTRecord record) throws IOException {
-        try {
-            blockingQueue.put(tupleConverter.convert(cuboidId, record));
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public void flush() {
-    }
-
-    @Override
-    public void close() {
-        try {
-            blockingQueue.put(new Tuple2(new byte[0], new byte[0]));
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public Iterable<Tuple2<byte[], byte[]>> getResult() {
-        return new Iterable<Tuple2<byte[], byte[]>>() {
-            @Override
-            public Iterator<Tuple2<byte[], byte[]>> iterator() {
-                return new Iterator<Tuple2<byte[], byte[]>>() {
-                    Tuple2<byte[], byte[]> current = null;
-
-                    @Override
-                    public boolean hasNext() {
-                        if (current == null) {
-                            try {
-                                current = blockingQueue.take();
-                            } catch (InterruptedException e) {
-                                Thread.currentThread().interrupt();
-                                throw new RuntimeException(e);
-                            }
-                        }
-                        return current._1().length > 0 && current._2().length > 0;
-                    }
-
-                    @Override
-                    public Tuple2<byte[], byte[]> next() {
-                        if (hasNext()) {
-                            Tuple2<byte[], byte[]> result = current;
-                            current = null;
-                            return result;
-                        } else {
-                            throw new NoSuchElementException();
-                        }
-                    }
-
-                    @Override
-                    public void remove() {
-                        throw new UnsupportedOperationException();
-                    }
-                };
-            }
-        };
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c987b30f/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java
deleted file mode 100644
index d7b20c8..0000000
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.engine.spark.cube;
-
-import java.nio.BufferOverflowException;
-import java.nio.ByteBuffer;
-import java.util.Map;
-
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.kv.RowKeyEncoder;
-import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
-import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.measure.BufferedMeasureCodec;
-import org.apache.kylin.metadata.model.TblColRef;
-
-import scala.Tuple2;
-
-/**
- */
-public final class DefaultTupleConverter implements TupleConverter {
-
-    private final static transient ThreadLocal<ByteBuffer> valueBuf = new ThreadLocal<>();
-    private final CubeSegment segment;
-    private final int measureCount;
-    private final Map<TblColRef, Integer> columnLengthMap;
-    private RowKeyEncoderProvider rowKeyEncoderProvider;
-    private byte[] rowKeyBodyBuf = new byte[RowConstants.ROWKEY_BUFFER_SIZE];
-
-    public DefaultTupleConverter(CubeSegment segment, Map<TblColRef, Integer> columnLengthMap) {
-        this.segment = segment;
-        this.measureCount = segment.getCubeDesc().getMeasures().size();
-        this.columnLengthMap = columnLengthMap;
-        this.rowKeyEncoderProvider = new RowKeyEncoderProvider(this.segment);
-    }
-
-    private ByteBuffer getValueBuf() {
-        if (valueBuf.get() == null) {
-            valueBuf.set(ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE));
-        }
-        return valueBuf.get();
-    }
-
-    private void setValueBuf(ByteBuffer buf) {
-        valueBuf.set(buf);
-    }
-
-    @Override
-    public final Tuple2<byte[], byte[]> convert(long cuboidId, GTRecord record) {
-        Cuboid cuboid = Cuboid.findById(segment, cuboidId);
-        RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(cuboid);
-
-        final int dimensions = Long.bitCount(cuboidId);
-        final ImmutableBitSet measureColumns = new ImmutableBitSet(dimensions, dimensions + measureCount);
-
-        int offset = 0;
-        for (int x = 0; x < dimensions; x++) {
-            final ByteArray byteArray = record.get(x);
-            System.arraycopy(byteArray.array(), byteArray.offset(), rowKeyBodyBuf, offset, byteArray.length());
-            offset += byteArray.length();
-        }
-
-        byte[] rowKey = rowkeyEncoder.createBuf();
-        rowkeyEncoder.encode(new ByteArray(rowKeyBodyBuf, 0, offset), new ByteArray(rowKey));
-
-        ByteBuffer buf = getValueBuf();
-        buf.clear();
-        try {
-            record.exportColumns(measureColumns, buf);
-        } catch (BufferOverflowException boe) {
-            buf = ByteBuffer.allocate((int) (record.sizeOf(measureColumns) * 1.5));
-            record.exportColumns(measureColumns, buf);
-            setValueBuf(buf);
-        }
-
-        byte[] value = new byte[buf.position()];
-        System.arraycopy(buf.array(), 0, value, 0, buf.position());
-        return new Tuple2<>(rowKey, value);
-    }
-}
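
The most reusable idea in the converter above is its value-buffer handling: a
per-thread ByteBuffer is recycled across calls and, on BufferOverflowException,
replaced by one sized at 1.5x the payload. A minimal dependency-free sketch of
that pattern (GrowableValueBuffer and encode() are illustrative names, not
Kylin APIs):

    import java.nio.BufferOverflowException;
    import java.nio.ByteBuffer;

    public final class GrowableValueBuffer {

        private static final ThreadLocal<ByteBuffer> VALUE_BUF =
                ThreadLocal.withInitial(() -> ByteBuffer.allocate(1024));

        /** Copies the payload through the per-thread buffer, growing it 1.5x on overflow. */
        public static byte[] encode(byte[] payload) {
            ByteBuffer buf = VALUE_BUF.get();
            buf.clear();
            try {
                buf.put(payload);
            } catch (BufferOverflowException boe) {
                buf = ByteBuffer.allocate((int) (payload.length * 1.5));
                buf.put(payload);
                VALUE_BUF.set(buf); // keep the larger buffer for later calls
            }
            byte[] out = new byte[buf.position()];
            System.arraycopy(buf.array(), 0, out, 0, buf.position());
            return out;
        }

        public static void main(String[] args) {
            System.out.println(encode(new byte[2048]).length); // prints 2048
        }
    }

The ThreadLocal matters because a Spark executor runs several task threads in
one JVM; a single shared buffer would need locking on every conversion.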

http://git-wip-us.apache.org/repos/asf/kylin/blob/c987b30f/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/ListBackedCuboidWriter.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/ListBackedCuboidWriter.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/ListBackedCuboidWriter.java
deleted file mode 100644
index a2740bf..0000000
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/ListBackedCuboidWriter.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.engine.spark.cube;
-
-import java.io.IOException;
-import java.util.ArrayList;
-
-import org.apache.kylin.engine.spark.SparkCuboidWriter;
-import org.apache.kylin.gridtable.GTRecord;
-
-import scala.Tuple2;
-
-/**
- * A SparkCuboidWriter that buffers every converted tuple in an in-memory list
- * and hands the complete list back through getResult().
- */
-public class ListBackedCuboidWriter implements SparkCuboidWriter {
-
-    private final ArrayList<Tuple2<byte[], byte[]>> result;
-    private final TupleConverter tupleConverter;
-
-    public ListBackedCuboidWriter(TupleConverter tupleConverter) {
-        this.result = new ArrayList<>();
-        this.tupleConverter = tupleConverter;
-    }
-
-    @Override
-    public void write(long cuboidId, GTRecord record) throws IOException {
-        result.add(tupleConverter.convert(cuboidId, record));
-    }
-
-    @Override
-    public void flush() {
-        // no-op: tuples are buffered in memory until getResult() is read
-    }
-
-    @Override
-    public void close() {
-        // no-op: an in-memory list holds no resources to release
-    }
-
-    @Override
-    public Iterable<Tuple2<byte[], byte[]>> getResult() {
-        return result;
-    }
-}
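
ListBackedCuboidWriter is the simplest SparkCuboidWriter: it accumulates every
converted tuple in an ArrayList and exposes the list via getResult(). A hedged
usage sketch, assuming a pre-KYLIN-3010 engine-spark artifact and the Scala
library on the classpath, compiled with Java 8 (the stub converter ignores its
GTRecord argument, so null is passed, exactly as the test below does):

    import org.apache.kylin.engine.spark.cube.ListBackedCuboidWriter;

    import scala.Tuple2;

    public class ListBackedCuboidWriterDemo {
        public static void main(String[] args) throws Exception {
            // The lambda satisfies the single-method TupleConverter interface.
            ListBackedCuboidWriter writer = new ListBackedCuboidWriter(
                    (cuboidId, record) -> new Tuple2<>(
                            Long.toString(cuboidId).getBytes(), new byte[0]));
            for (long id = 1; id <= 3; id++) {
                writer.write(id, null); // stub converter never touches the record
            }
            writer.close();
            for (Tuple2<byte[], byte[]> kv : writer.getResult()) {
                System.out.println(new String(kv._1())); // 1, 2, 3
            }
        }
    }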

http://git-wip-us.apache.org/repos/asf/kylin/blob/c987b30f/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/TupleConverter.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/TupleConverter.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/TupleConverter.java
deleted file mode 100644
index 3dbdc05..0000000
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/TupleConverter.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.engine.spark.cube;
-
-import org.apache.kylin.gridtable.GTRecord;
-
-import scala.Tuple2;
-
-/**
- * Maps one cuboid row (a GTRecord) to a serialized rowkey/value pair.
- */
-public interface TupleConverter {
-
-    Tuple2<byte[], byte[]> convert(long cuboidId, GTRecord record);
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c987b30f/engine-spark/src/test/java/org/apache/kylin/engine/spark/cube/BufferedCuboidWriterTest.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/test/java/org/apache/kylin/engine/spark/cube/BufferedCuboidWriterTest.java b/engine-spark/src/test/java/org/apache/kylin/engine/spark/cube/BufferedCuboidWriterTest.java
deleted file mode 100644
index 8afea55..0000000
--- a/engine-spark/src/test/java/org/apache/kylin/engine/spark/cube/BufferedCuboidWriterTest.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.engine.spark.cube;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.kylin.gridtable.GTRecord;
-import org.junit.Test;
-
-import scala.Tuple2;
-
-/**
- * Checks that BufferedCuboidWriter hands tuples from a producer thread to a
- * consuming iterator in order and without loss.
- */
-public class BufferedCuboidWriterTest {
-
-    @Test
-    public void test() throws ExecutionException, InterruptedException {
-        final BufferedCuboidWriter bufferedCuboidWriter = new BufferedCuboidWriter(new TupleConverter() {
-            @Override
-            public Tuple2<byte[], byte[]> convert(long cuboidId, GTRecord record) {
-                return new Tuple2<>(Long.toString(cuboidId).getBytes(), Long.toString(cuboidId).getBytes());
-            }
-        });
-        final int testCount = 10000000;
-        final Future<?> future = Executors.newCachedThreadPool().submit(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    for (int i = 1; i <= testCount; i++) {
-                        bufferedCuboidWriter.write(i, null);
-                    }
-                } catch (IOException e) {
-                    throw new RuntimeException(e); // fail the future; get() rethrows below
-                } finally {
-                    bufferedCuboidWriter.close(); // always unblock the consuming iterator
-                }
-        });
-        long actualCount = 0;
-        for (Tuple2<byte[], byte[]> tuple2 : bufferedCuboidWriter.getResult()) {
-            assertEquals(++actualCount, Long.parseLong(new String(tuple2._1())));
-        }
-        future.get();
-        assertEquals(testCount, actualCount);
-    }
-}
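
The deleted test drives BufferedCuboidWriter from two threads: a pooled producer
writes ten million tuples while the calling thread drains getResult() and checks
ordering. The same handoff can be reproduced without Kylin using a bounded
LinkedBlockingQueue and a poison-pill terminator (all names below are
illustrative):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class BufferedHandoffDemo {

        private static final long POISON = -1L; // end-of-stream marker, like close()

        public static void main(String[] args) throws InterruptedException {
            BlockingQueue<Long> queue = new LinkedBlockingQueue<>(10_000);
            Thread producer = new Thread(() -> {
                try {
                    for (long i = 1; i <= 1_000_000; i++) {
                        queue.put(i); // blocks while the buffer is full
                    }
                    queue.put(POISON);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            producer.start();

            long expected = 0;
            for (long v = queue.take(); v != POISON; v = queue.take()) {
                if (v != ++expected) {
                    throw new AssertionError("out of order at " + v);
                }
            }
            producer.join();
            System.out.println("consumed " + expected + " tuples in order");
        }
    }

The bounded queue provides the back-pressure such a writer needs on the Spark
side: a fast producer stalls instead of buffering an entire cuboid in memory.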