Posted to commits@kylin.apache.org by qh...@apache.org on 2015/07/17 11:24:48 UTC

incubator-kylin git commit: KYLIN-894 Move Spark-related code to a separate module

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8 a6581ad84 -> e063b81ec


KYLIN-894 Move Spark-related code to a separate module


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/e063b81e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/e063b81e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/e063b81e

Branch: refs/heads/0.8
Commit: e063b81ec4458a6f021c40b4876ae908bfb27950
Parents: a6581ad
Author: qianhao.zhou <qi...@ebay.com>
Authored: Fri Jul 17 17:23:24 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Fri Jul 17 17:23:24 2015 +0800

----------------------------------------------------------------------
 job/pom.xml                                     | 18 -----
 .../job/spark/AbstractSparkApplication.java     | 31 ---------
 .../apache/kylin/job/spark/SparkCountDemo.java  | 66 -------------------
 .../org/apache/kylin/job/spark/SparkEntry.java  | 24 -------
 .../apache/kylin/job/spark/SparkExecutable.java | 63 ------------------
 .../apache/kylin/job/spark/SparkHelloWorld.java | 29 --------
 .../apache/kylin/job/spark/SparkHiveDemo.java   | 34 ----------
 .../job/impl/threadpool/BaseSchedulerTest.java  |  1 -
 .../impl/threadpool/ITDefaultSchedulerTest.java | 14 ----
 pom.xml                                         |  1 +
 spark/pom.xml                                   | 69 ++++++++++++++++++++
 .../job/spark/AbstractSparkApplication.java     | 31 +++++++++
 .../apache/kylin/job/spark/SparkCountDemo.java  | 66 +++++++++++++++++++
 .../org/apache/kylin/job/spark/SparkEntry.java  | 24 +++++++
 .../apache/kylin/job/spark/SparkExecutable.java | 63 ++++++++++++++++++
 .../apache/kylin/job/spark/SparkHelloWorld.java | 19 ++++++
 .../apache/kylin/job/spark/SparkHiveDemo.java   | 34 ++++++++++
 17 files changed, 307 insertions(+), 280 deletions(-)
----------------------------------------------------------------------
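
In short: the three provided-scope Spark dependencies leave job/pom.xml, a new kylin-spark module (registered in the parent pom) takes them over along with the six org.apache.kylin.job.spark classes, and the job module's tests drop their last Spark references (a stray akka import in BaseSchedulerTest and the testSparkExecutable case in ITDefaultSchedulerTest). The only file whose content changes in the move is SparkHelloWorld, which sheds ten lines of unused imports.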


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e063b81e/job/pom.xml
----------------------------------------------------------------------
diff --git a/job/pom.xml b/job/pom.xml
index 41894fe..3819265 100644
--- a/job/pom.xml
+++ b/job/pom.xml
@@ -71,24 +71,6 @@
             <version>${project.parent.version}</version>
         </dependency>
 
-        <dependency> <!-- Spark dependency -->
-            <groupId>org.apache.spark</groupId>
-            <artifactId>spark-core_2.10</artifactId>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency> <!-- Spark dependency -->
-            <groupId>org.apache.spark</groupId>
-            <artifactId>spark-sql_2.10</artifactId>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency> <!-- Spark dependency -->
-            <groupId>org.apache.spark</groupId>
-            <artifactId>spark-hive_2.10</artifactId>
-            <scope>provided</scope>
-        </dependency>
-
         <dependency>
             <groupId>commons-cli</groupId>
             <artifactId>commons-cli</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e063b81e/job/src/main/java/org/apache/kylin/job/spark/AbstractSparkApplication.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/spark/AbstractSparkApplication.java b/job/src/main/java/org/apache/kylin/job/spark/AbstractSparkApplication.java
deleted file mode 100644
index dd884d5..0000000
--- a/job/src/main/java/org/apache/kylin/job/spark/AbstractSparkApplication.java
+++ /dev/null
@@ -1,31 +0,0 @@
-package org.apache.kylin.job.spark;
-
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.job.tools.OptionsHelper;
-
-import java.io.Serializable;
-
-/**
- */
-public abstract class AbstractSparkApplication implements Serializable {
-
-    protected abstract Options getOptions();
-
-    protected abstract void execute(OptionsHelper optionsHelper) throws Exception;
-
-    public final void execute(String[] args) {
-        OptionsHelper optionsHelper = new OptionsHelper();
-        System.out.println("Spark Application args:" + StringUtils.join(args, " "));
-        try {
-            optionsHelper.parseOptions(getOptions(), args);
-            execute(optionsHelper);
-        } catch (ParseException e) {
-            optionsHelper.printUsage("SparkExecutor", getOptions());
-            throw new RuntimeException("error parsing args", e);
-        } catch (Exception e) {
-            throw new RuntimeException("error execute Spark Application", e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e063b81e/job/src/main/java/org/apache/kylin/job/spark/SparkCountDemo.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/spark/SparkCountDemo.java b/job/src/main/java/org/apache/kylin/job/spark/SparkCountDemo.java
deleted file mode 100644
index c648324..0000000
--- a/job/src/main/java/org/apache/kylin/job/spark/SparkCountDemo.java
+++ /dev/null
@@ -1,66 +0,0 @@
-package org.apache.kylin.job.spark;
-
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
-import org.apache.kylin.job.tools.OptionsHelper;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.storage.StorageLevel;
-import scala.Tuple2;
-
-/**
- */
-public class SparkCountDemo extends AbstractSparkApplication {
-
-    private static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Input path").create("input");
-
-    private Options options;
-
-
-    public SparkCountDemo() {
-        options = new Options();
-//        options.addOption(OPTION_INPUT_PATH);
-    }
-
-    @Override
-    protected Options getOptions() {
-        return options;
-    }
-
-    @Override
-    protected void execute(OptionsHelper optionsHelper) throws Exception {
-        String logFile = "hdfs://10.249.65.231:8020/tmp/kylin.properties"; // Should be some file on your system
-        SparkConf conf = new SparkConf().setAppName("Simple Application");
-        JavaSparkContext sc = new JavaSparkContext(conf);
-        final JavaPairRDD<String, Integer> logData = sc.textFile(logFile).mapToPair(new PairFunction<String, String, Integer>() {
-
-            @Override
-            public Tuple2<String, Integer> call(String s) throws Exception {
-                return new Tuple2<String, Integer>(s, s.length());
-            }
-        }).sortByKey();
-        logData.persist(StorageLevel.MEMORY_AND_DISK_SER());
-
-        System.out.println("line number:" + logData.count());
-
-        logData.mapToPair(new PairFunction<Tuple2<String, Integer>, ImmutableBytesWritable, KeyValue>() {
-            @Override
-            public Tuple2<ImmutableBytesWritable, KeyValue> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
-                ImmutableBytesWritable key = new ImmutableBytesWritable(stringIntegerTuple2._1().getBytes());
-                KeyValue value = new KeyValue(stringIntegerTuple2._1().getBytes(), "f".getBytes(), "c".getBytes(), String.valueOf(stringIntegerTuple2._2()).getBytes());
-                return new Tuple2(key, value);
-            }
-        }).saveAsNewAPIHadoopFile("hdfs://10.249.65.231:8020/tmp/hfile",
-                ImmutableBytesWritable.class,
-                KeyValue.class,
-                HFileOutputFormat.class
-        );
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e063b81e/job/src/main/java/org/apache/kylin/job/spark/SparkEntry.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/spark/SparkEntry.java b/job/src/main/java/org/apache/kylin/job/spark/SparkEntry.java
deleted file mode 100644
index 451da14..0000000
--- a/job/src/main/java/org/apache/kylin/job/spark/SparkEntry.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package org.apache.kylin.job.spark;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.lang3.StringUtils;
-
-/**
- */
-public final class SparkEntry {
-
-    public static void main(String[] args) throws Exception {
-        System.out.println("SparkEntry args:" + StringUtils.join(args, " "));
-        Preconditions.checkArgument(args.length >= 2, "-className is required");
-        Preconditions.checkArgument(args[0].equals("-className"), "-className is required");
-        final String className = args[1];
-        final Object o = Class.<AbstractSparkApplication>forName(className).newInstance();
-        Preconditions.checkArgument(o instanceof AbstractSparkApplication, className + " is not a subClass of AbstractSparkApplication");
-        String[] appArgs = new String[args.length - 2];
-        for (int i = 2; i < args.length; i++) {
-            appArgs[i - 2] = args[i];
-        }
-        AbstractSparkApplication abstractSparkApplication = (AbstractSparkApplication) o;
-        abstractSparkApplication.execute(appArgs);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e063b81e/job/src/main/java/org/apache/kylin/job/spark/SparkExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/spark/SparkExecutable.java b/job/src/main/java/org/apache/kylin/job/spark/SparkExecutable.java
deleted file mode 100644
index 68d2a8c..0000000
--- a/job/src/main/java/org/apache/kylin/job/spark/SparkExecutable.java
+++ /dev/null
@@ -1,63 +0,0 @@
-package org.apache.kylin.job.spark;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Logger;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-
-import java.io.IOException;
-import java.util.Map;
-
-/**
- */
-public class SparkExecutable extends AbstractExecutable {
-
-    private static final String CLASS_NAME = "className";
-
-    public void setClassName(String className) {
-        this.setParam(CLASS_NAME, className);
-    }
-
-    private String formatArgs() {
-        StringBuilder stringBuilder = new StringBuilder();
-        for (Map.Entry<String, String> entry : getParams().entrySet()) {
-            stringBuilder.append("-").append(entry.getKey()).append(" ").append(entry.getValue()).append(" ");
-        }
-        if (stringBuilder.length() > 0) {
-            return stringBuilder.substring(0, stringBuilder.length() - 1).toString();
-        } else {
-            return StringUtils.EMPTY;
-        }
-    }
-
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-        final KylinConfig config = context.getConfig();
-        Preconditions.checkNotNull(config.getSparkHome());
-        Preconditions.checkNotNull(config.getSparkMaster());
-        try {
-            String cmd = String.format("%s/bin/spark-submit --class \"org.apache.kylin.job.spark.SparkEntry\" --master %s %s %s",
-                    config.getSparkHome(),
-                    config.getSparkMaster(),
-                    config.getKylinJobJarPath(),
-                    formatArgs());
-            logger.info("cmd:" + cmd);
-            final StringBuilder output = new StringBuilder();
-            config.getCliCommandExecutor().execute(cmd, new Logger() {
-                @Override
-                public void log(String message) {
-                    output.append(message);
-                    output.append("\n");
-                }
-            });
-            return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
-        } catch (IOException e) {
-            logger.error("error run spark job:", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e063b81e/job/src/main/java/org/apache/kylin/job/spark/SparkHelloWorld.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/spark/SparkHelloWorld.java b/job/src/main/java/org/apache/kylin/job/spark/SparkHelloWorld.java
deleted file mode 100644
index 3cd919c..0000000
--- a/job/src/main/java/org/apache/kylin/job/spark/SparkHelloWorld.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package org.apache.kylin.job.spark;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.job.tools.OptionsHelper;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-
-import java.io.IOException;
-
-/**
- */
-public class SparkHelloWorld extends AbstractSparkApplication {
-
-    @Override
-    protected Options getOptions() {
-        return new Options();
-    }
-
-    @Override
-    protected void execute(OptionsHelper optionsHelper) throws Exception {
-        System.out.println("hello kylin-spark");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e063b81e/job/src/main/java/org/apache/kylin/job/spark/SparkHiveDemo.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/spark/SparkHiveDemo.java b/job/src/main/java/org/apache/kylin/job/spark/SparkHiveDemo.java
deleted file mode 100644
index 48a1449..0000000
--- a/job/src/main/java/org/apache/kylin/job/spark/SparkHiveDemo.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package org.apache.kylin.job.spark;
-
-import org.apache.commons.cli.Options;
-import org.apache.kylin.job.tools.OptionsHelper;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.DataFrame;
-import org.apache.spark.sql.hive.HiveContext;
-
-/**
- */
-public class SparkHiveDemo extends AbstractSparkApplication {
-
-    private final Options options;
-
-
-    public SparkHiveDemo() {
-        options = new Options();
-    }
-
-    @Override
-    protected Options getOptions() {
-        return options;
-    }
-
-    @Override
-    protected void execute(OptionsHelper optionsHelper) throws Exception {
-        SparkConf conf = new SparkConf().setAppName("Simple Application");
-        JavaSparkContext sc = new JavaSparkContext(conf);
-        HiveContext sqlContext = new HiveContext(sc.sc());
-        final DataFrame dataFrame = sqlContext.sql("select * from test_kylin_fact");
-        System.out.println("count * of the table:" + dataFrame.count());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e063b81e/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java b/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
index a0362d5..29b8304 100644
--- a/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
+++ b/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.kylin.job.impl.threadpool;
 
-import akka.actor.Deploy;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.lock.ZookeeperJobLock;
 import org.apache.kylin.common.util.HBaseMetadataTestCase;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e063b81e/job/src/test/java/org/apache/kylin/job/impl/threadpool/ITDefaultSchedulerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/impl/threadpool/ITDefaultSchedulerTest.java b/job/src/test/java/org/apache/kylin/job/impl/threadpool/ITDefaultSchedulerTest.java
index 89f82fa..7c2218b 100644
--- a/job/src/test/java/org/apache/kylin/job/impl/threadpool/ITDefaultSchedulerTest.java
+++ b/job/src/test/java/org/apache/kylin/job/impl/threadpool/ITDefaultSchedulerTest.java
@@ -22,8 +22,6 @@ import org.apache.kylin.job.*;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.Output;
-import org.apache.kylin.job.spark.SparkExecutable;
-import org.apache.kylin.job.spark.SparkHelloWorld;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -99,16 +97,4 @@ public class ITDefaultSchedulerTest extends BaseSchedulerTest {
         System.out.println(job);
     }
 
-    @Test
-    public void testSparkExecutable() throws Exception {
-        DefaultChainedExecutable job = new DefaultChainedExecutable();
-        SparkExecutable task = new SparkExecutable();
-        task.setClassName(SparkHelloWorld.class.getName());
-        job.addTask(task);
-        jobService.addJob(job);
-        waitForJobFinish(job.getId());
-        assertEquals(ExecutableState.SUCCEED, job.getStatus());
-        final Output output = jobService.getOutput(task.getId());
-        assertTrue(output.getVerboseMsg().contains("hello kylin-spark"));
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e063b81e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1a23d39..cf8a4f2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -569,6 +569,7 @@
         <module>jdbc</module>
         <module>invertedindex</module>
         <module>streaming</module>
+        <module>spark</module>
     </modules>
 
     <profiles>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e063b81e/spark/pom.xml
----------------------------------------------------------------------
diff --git a/spark/pom.xml b/spark/pom.xml
new file mode 100644
index 0000000..e1e2d73
--- /dev/null
+++ b/spark/pom.xml
@@ -0,0 +1,69 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.kylin</groupId>
+        <artifactId>kylin</artifactId>
+        <version>0.7.2-incubating-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>kylin-spark</artifactId>
+
+    <name>Kylin:Spark</name>
+    <url>http://maven.apache.org</url>
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-job</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+
+        <!-- Spark dependency -->
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.10</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.10</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-hive_2.10</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-common</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-server</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+
+    </dependencies>
+
+
+</project>
\ No newline at end of file
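
A note on scopes in the new pom: only kylin-job is a compile-scope dependency; the Spark artifacts, hadoop-common, and the two HBase artifacts are all provided. The kylin-spark jar therefore stays thin and picks those classes up from the runtime classpath — spark-submit supplies the Spark jars, and the Hadoop/HBase jars come from the target cluster.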

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e063b81e/spark/src/main/java/org/apache/kylin/job/spark/AbstractSparkApplication.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/kylin/job/spark/AbstractSparkApplication.java b/spark/src/main/java/org/apache/kylin/job/spark/AbstractSparkApplication.java
new file mode 100644
index 0000000..dd884d5
--- /dev/null
+++ b/spark/src/main/java/org/apache/kylin/job/spark/AbstractSparkApplication.java
@@ -0,0 +1,31 @@
+package org.apache.kylin.job.spark;
+
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.job.tools.OptionsHelper;
+
+import java.io.Serializable;
+
+/**
+ */
+public abstract class AbstractSparkApplication implements Serializable {
+
+    protected abstract Options getOptions();
+
+    protected abstract void execute(OptionsHelper optionsHelper) throws Exception;
+
+    public final void execute(String[] args) {
+        OptionsHelper optionsHelper = new OptionsHelper();
+        System.out.println("Spark Application args:" + StringUtils.join(args, " "));
+        try {
+            optionsHelper.parseOptions(getOptions(), args);
+            execute(optionsHelper);
+        } catch (ParseException e) {
+            optionsHelper.printUsage("SparkExecutor", getOptions());
+            throw new RuntimeException("error parsing args", e);
+        } catch (Exception e) {
+            throw new RuntimeException("error execute Spark Application", e);
+        }
+    }
+}
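
AbstractSparkApplication is a small template method: subclasses supply an Options set and an execute body, and the final execute(String[]) handles parsing, usage printing, and error wrapping. A minimal sketch of a hypothetical subclass (not part of this commit; it assumes OptionsHelper exposes a getOptionValue(Option) accessor, which this diff does not show):

    package org.apache.kylin.job.spark;

    import org.apache.commons.cli.Option;
    import org.apache.commons.cli.OptionBuilder;
    import org.apache.commons.cli.Options;
    import org.apache.kylin.job.tools.OptionsHelper;

    public class SparkEchoDemo extends AbstractSparkApplication {

        // "-input <path>", declared the same way as the (commented-out) option in SparkCountDemo
        private static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path")
                .hasArg().isRequired(true).withDescription("Input path").create("input");

        private final Options options = new Options();

        public SparkEchoDemo() {
            options.addOption(OPTION_INPUT_PATH);
        }

        @Override
        protected Options getOptions() {
            return options;
        }

        @Override
        protected void execute(OptionsHelper optionsHelper) throws Exception {
            // assumed accessor; parseOptions(...) has already run in the base class
            System.out.println("input=" + optionsHelper.getOptionValue(OPTION_INPUT_PATH));
        }
    }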

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e063b81e/spark/src/main/java/org/apache/kylin/job/spark/SparkCountDemo.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/kylin/job/spark/SparkCountDemo.java b/spark/src/main/java/org/apache/kylin/job/spark/SparkCountDemo.java
new file mode 100644
index 0000000..c648324
--- /dev/null
+++ b/spark/src/main/java/org/apache/kylin/job/spark/SparkCountDemo.java
@@ -0,0 +1,66 @@
+package org.apache.kylin.job.spark;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
+import org.apache.kylin.job.tools.OptionsHelper;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.storage.StorageLevel;
+import scala.Tuple2;
+
+/**
+ */
+public class SparkCountDemo extends AbstractSparkApplication {
+
+    private static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Input path").create("input");
+
+    private Options options;
+
+
+    public SparkCountDemo() {
+        options = new Options();
+//        options.addOption(OPTION_INPUT_PATH);
+    }
+
+    @Override
+    protected Options getOptions() {
+        return options;
+    }
+
+    @Override
+    protected void execute(OptionsHelper optionsHelper) throws Exception {
+        String logFile = "hdfs://10.249.65.231:8020/tmp/kylin.properties"; // Should be some file on your system
+        SparkConf conf = new SparkConf().setAppName("Simple Application");
+        JavaSparkContext sc = new JavaSparkContext(conf);
+        final JavaPairRDD<String, Integer> logData = sc.textFile(logFile).mapToPair(new PairFunction<String, String, Integer>() {
+
+            @Override
+            public Tuple2<String, Integer> call(String s) throws Exception {
+                return new Tuple2<String, Integer>(s, s.length());
+            }
+        }).sortByKey();
+        logData.persist(StorageLevel.MEMORY_AND_DISK_SER());
+
+        System.out.println("line number:" + logData.count());
+
+        logData.mapToPair(new PairFunction<Tuple2<String, Integer>, ImmutableBytesWritable, KeyValue>() {
+            @Override
+            public Tuple2<ImmutableBytesWritable, KeyValue> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
+                ImmutableBytesWritable key = new ImmutableBytesWritable(stringIntegerTuple2._1().getBytes());
+                KeyValue value = new KeyValue(stringIntegerTuple2._1().getBytes(), "f".getBytes(), "c".getBytes(), String.valueOf(stringIntegerTuple2._2()).getBytes());
+                return new Tuple2(key, value);
+            }
+        }).saveAsNewAPIHadoopFile("hdfs://10.249.65.231:8020/tmp/hfile",
+                ImmutableBytesWritable.class,
+                KeyValue.class,
+                HFileOutputFormat.class
+        );
+
+    }
+}
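
SparkCountDemo is a scratch pipeline rather than a real job: the input (a kylin.properties file on a hard-coded HDFS address) and the HFile output path are baked in, and the "-input" option is declared but commented out of the Options set. It maps each line to a (line, length) pair, sorts by key, persists with MEMORY_AND_DISK_SER, counts, then re-keys the pairs as (ImmutableBytesWritable, KeyValue) and writes them as HFiles via HFileOutputFormat. Wiring the declared option in would be the obvious next step — hypothetically, assuming the same getOptionValue accessor as in the sketch above:

    // in the constructor, instead of the commented-out line:
    options.addOption(OPTION_INPUT_PATH);

    // in execute(...), instead of the hard-coded HDFS path:
    String logFile = optionsHelper.getOptionValue(OPTION_INPUT_PATH);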

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e063b81e/spark/src/main/java/org/apache/kylin/job/spark/SparkEntry.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/kylin/job/spark/SparkEntry.java b/spark/src/main/java/org/apache/kylin/job/spark/SparkEntry.java
new file mode 100644
index 0000000..451da14
--- /dev/null
+++ b/spark/src/main/java/org/apache/kylin/job/spark/SparkEntry.java
@@ -0,0 +1,24 @@
+package org.apache.kylin.job.spark;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ */
+public final class SparkEntry {
+
+    public static void main(String[] args) throws Exception {
+        System.out.println("SparkEntry args:" + StringUtils.join(args, " "));
+        Preconditions.checkArgument(args.length >= 2, "-className is required");
+        Preconditions.checkArgument(args[0].equals("-className"), "-className is required");
+        final String className = args[1];
+        final Object o = Class.<AbstractSparkApplication>forName(className).newInstance();
+        Preconditions.checkArgument(o instanceof AbstractSparkApplication, className + " is not a subClass of AbstractSparkApplication");
+        String[] appArgs = new String[args.length - 2];
+        for (int i = 2; i < args.length; i++) {
+            appArgs[i - 2] = args[i];
+        }
+        AbstractSparkApplication abstractSparkApplication = (AbstractSparkApplication) o;
+        abstractSparkApplication.execute(appArgs);
+    }
+}
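
SparkEntry is the fixed --class target for spark-submit: it requires "-className <fqcn>" as the first two arguments, reflectively instantiates the named AbstractSparkApplication, and forwards the remaining arguments to it. Given how SparkExecutable assembles the command (below), a launch looks roughly like this, with the master URL and job jar path coming from KylinConfig:

    $SPARK_HOME/bin/spark-submit --class "org.apache.kylin.job.spark.SparkEntry" \
        --master <spark-master> <kylin-job-jar> \
        -className org.apache.kylin.job.spark.SparkHelloWorld

The hand-rolled copy loop is behaviorally equivalent to the one-liner below — a possible cleanup, not something this commit does:

    String[] appArgs = java.util.Arrays.copyOfRange(args, 2, args.length);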

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e063b81e/spark/src/main/java/org/apache/kylin/job/spark/SparkExecutable.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/kylin/job/spark/SparkExecutable.java b/spark/src/main/java/org/apache/kylin/job/spark/SparkExecutable.java
new file mode 100644
index 0000000..68d2a8c
--- /dev/null
+++ b/spark/src/main/java/org/apache/kylin/job/spark/SparkExecutable.java
@@ -0,0 +1,63 @@
+package org.apache.kylin.job.spark;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Logger;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ */
+public class SparkExecutable extends AbstractExecutable {
+
+    private static final String CLASS_NAME = "className";
+
+    public void setClassName(String className) {
+        this.setParam(CLASS_NAME, className);
+    }
+
+    private String formatArgs() {
+        StringBuilder stringBuilder = new StringBuilder();
+        for (Map.Entry<String, String> entry : getParams().entrySet()) {
+            stringBuilder.append("-").append(entry.getKey()).append(" ").append(entry.getValue()).append(" ");
+        }
+        if (stringBuilder.length() > 0) {
+            return stringBuilder.substring(0, stringBuilder.length() - 1).toString();
+        } else {
+            return StringUtils.EMPTY;
+        }
+    }
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        final KylinConfig config = context.getConfig();
+        Preconditions.checkNotNull(config.getSparkHome());
+        Preconditions.checkNotNull(config.getSparkMaster());
+        try {
+            String cmd = String.format("%s/bin/spark-submit --class \"org.apache.kylin.job.spark.SparkEntry\" --master %s %s %s",
+                    config.getSparkHome(),
+                    config.getSparkMaster(),
+                    config.getKylinJobJarPath(),
+                    formatArgs());
+            logger.info("cmd:" + cmd);
+            final StringBuilder output = new StringBuilder();
+            config.getCliCommandExecutor().execute(cmd, new Logger() {
+                @Override
+                public void log(String message) {
+                    output.append(message);
+                    output.append("\n");
+                }
+            });
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
+        } catch (IOException e) {
+            logger.error("error run spark job:", e);
+            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+        }
+    }
+}
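
SparkExecutable turns its param map into "-key value" pairs, prepends spark-submit with SparkEntry as the main class, runs the command through KylinConfig's CliCommandExecutor, and records the captured output as the step result (SUCCEED on a clean run, ERROR on IOException). Usage mirrors the integration test this commit removes from ITDefaultSchedulerTest; note the test is only deleted, not re-added under spark/, so the new module ships without tests for now. A sketch of the scheduling pattern, lifted from that removed test (jobService and waitForJobFinish are fixture members from BaseSchedulerTest):

    DefaultChainedExecutable job = new DefaultChainedExecutable();
    SparkExecutable task = new SparkExecutable();
    task.setClassName(SparkHelloWorld.class.getName());
    job.addTask(task);
    jobService.addJob(job);
    waitForJobFinish(job.getId());
    assertEquals(ExecutableState.SUCCEED, job.getStatus());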

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e063b81e/spark/src/main/java/org/apache/kylin/job/spark/SparkHelloWorld.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/kylin/job/spark/SparkHelloWorld.java b/spark/src/main/java/org/apache/kylin/job/spark/SparkHelloWorld.java
new file mode 100644
index 0000000..fee3583
--- /dev/null
+++ b/spark/src/main/java/org/apache/kylin/job/spark/SparkHelloWorld.java
@@ -0,0 +1,19 @@
+package org.apache.kylin.job.spark;
+
+import org.apache.commons.cli.Options;
+import org.apache.kylin.job.tools.OptionsHelper;
+
+/**
+ */
+public class SparkHelloWorld extends AbstractSparkApplication {
+
+    @Override
+    protected Options getOptions() {
+        return new Options();
+    }
+
+    @Override
+    protected void execute(OptionsHelper optionsHelper) throws Exception {
+        System.out.println("hello kylin-spark");
+    }
+}
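
This is the one file that changes content while moving: the rewrite drops ten lines of unused Hadoop and Spark imports (29 lines down to 19), leaving a pure smoke test whose "hello kylin-spark" output is what the removed testSparkExecutable assertion used to check for.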

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e063b81e/spark/src/main/java/org/apache/kylin/job/spark/SparkHiveDemo.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/kylin/job/spark/SparkHiveDemo.java b/spark/src/main/java/org/apache/kylin/job/spark/SparkHiveDemo.java
new file mode 100644
index 0000000..48a1449
--- /dev/null
+++ b/spark/src/main/java/org/apache/kylin/job/spark/SparkHiveDemo.java
@@ -0,0 +1,34 @@
+package org.apache.kylin.job.spark;
+
+import org.apache.commons.cli.Options;
+import org.apache.kylin.job.tools.OptionsHelper;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.hive.HiveContext;
+
+/**
+ */
+public class SparkHiveDemo extends AbstractSparkApplication {
+
+    private final Options options;
+
+
+    public SparkHiveDemo() {
+        options = new Options();
+    }
+
+    @Override
+    protected Options getOptions() {
+        return options;
+    }
+
+    @Override
+    protected void execute(OptionsHelper optionsHelper) throws Exception {
+        SparkConf conf = new SparkConf().setAppName("Simple Application");
+        JavaSparkContext sc = new JavaSparkContext(conf);
+        HiveContext sqlContext = new HiveContext(sc.sc());
+        final DataFrame dataFrame = sqlContext.sql("select * from test_kylin_fact");
+        System.out.println("count * of the table:" + dataFrame.count());
+    }
+}
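
SparkHiveDemo exercises the spark-hive integration: it builds a HiveContext on top of the JavaSparkContext, queries Kylin's sample fact table test_kylin_fact, and counts the result (sql() itself is lazy; count() is the action that triggers the job). Because spark-hive is provided-scope in the new pom, Hive support must be present in the spark-submit environment at runtime; launched through the pipeline above, the final argument would be "-className org.apache.kylin.job.spark.SparkHiveDemo".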