You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/07/17 11:24:48 UTC
incubator-kylin git commit: KYLIN-894 move spark related to separate
module
Repository: incubator-kylin
Updated Branches:
refs/heads/0.8 a6581ad84 -> e063b81ec
KYLIN-894 move spark related to separate module
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/e063b81e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/e063b81e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/e063b81e
Branch: refs/heads/0.8
Commit: e063b81ec4458a6f021c40b4876ae908bfb27950
Parents: a6581ad
Author: qianhao.zhou <qi...@ebay.com>
Authored: Fri Jul 17 17:23:24 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Fri Jul 17 17:23:24 2015 +0800
----------------------------------------------------------------------
job/pom.xml | 18 -----
.../job/spark/AbstractSparkApplication.java | 31 ---------
.../apache/kylin/job/spark/SparkCountDemo.java | 66 -------------------
.../org/apache/kylin/job/spark/SparkEntry.java | 24 -------
.../apache/kylin/job/spark/SparkExecutable.java | 63 ------------------
.../apache/kylin/job/spark/SparkHelloWorld.java | 29 --------
.../apache/kylin/job/spark/SparkHiveDemo.java | 34 ----------
.../job/impl/threadpool/BaseSchedulerTest.java | 1 -
.../impl/threadpool/ITDefaultSchedulerTest.java | 14 ----
pom.xml | 1 +
spark/pom.xml | 69 ++++++++++++++++++++
.../job/spark/AbstractSparkApplication.java | 31 +++++++++
.../apache/kylin/job/spark/SparkCountDemo.java | 66 +++++++++++++++++++
.../org/apache/kylin/job/spark/SparkEntry.java | 24 +++++++
.../apache/kylin/job/spark/SparkExecutable.java | 63 ++++++++++++++++++
.../apache/kylin/job/spark/SparkHelloWorld.java | 19 ++++++
.../apache/kylin/job/spark/SparkHiveDemo.java | 34 ++++++++++
17 files changed, 307 insertions(+), 280 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e063b81e/job/pom.xml
----------------------------------------------------------------------
diff --git a/job/pom.xml b/job/pom.xml
index 41894fe..3819265 100644
--- a/job/pom.xml
+++ b/job/pom.xml
@@ -71,24 +71,6 @@
<version>${project.parent.version}</version>
</dependency>
- <dependency> <!-- Spark dependency -->
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.10</artifactId>
- <scope>provided</scope>
- </dependency>
-
- <dependency> <!-- Spark dependency -->
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.10</artifactId>
- <scope>provided</scope>
- </dependency>
-
- <dependency> <!-- Spark dependency -->
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-hive_2.10</artifactId>
- <scope>provided</scope>
- </dependency>
-
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e063b81e/job/src/main/java/org/apache/kylin/job/spark/AbstractSparkApplication.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/spark/AbstractSparkApplication.java b/job/src/main/java/org/apache/kylin/job/spark/AbstractSparkApplication.java
deleted file mode 100644
index dd884d5..0000000
--- a/job/src/main/java/org/apache/kylin/job/spark/AbstractSparkApplication.java
+++ /dev/null
@@ -1,31 +0,0 @@
-package org.apache.kylin.job.spark;
-
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.job.tools.OptionsHelper;
-
-import java.io.Serializable;
-
-/**
- */
-public abstract class AbstractSparkApplication implements Serializable {
-
- protected abstract Options getOptions();
-
- protected abstract void execute(OptionsHelper optionsHelper) throws Exception;
-
- public final void execute(String[] args) {
- OptionsHelper optionsHelper = new OptionsHelper();
- System.out.println("Spark Application args:" + StringUtils.join(args, " "));
- try {
- optionsHelper.parseOptions(getOptions(), args);
- execute(optionsHelper);
- } catch (ParseException e) {
- optionsHelper.printUsage("SparkExecutor", getOptions());
- throw new RuntimeException("error parsing args", e);
- } catch (Exception e) {
- throw new RuntimeException("error execute Spark Application", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e063b81e/job/src/main/java/org/apache/kylin/job/spark/SparkCountDemo.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/spark/SparkCountDemo.java b/job/src/main/java/org/apache/kylin/job/spark/SparkCountDemo.java
deleted file mode 100644
index c648324..0000000
--- a/job/src/main/java/org/apache/kylin/job/spark/SparkCountDemo.java
+++ /dev/null
@@ -1,66 +0,0 @@
-package org.apache.kylin.job.spark;
-
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
-import org.apache.kylin.job.tools.OptionsHelper;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.storage.StorageLevel;
-import scala.Tuple2;
-
-/**
- */
-public class SparkCountDemo extends AbstractSparkApplication {
-
- private static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Input path").create("input");
-
- private Options options;
-
-
- public SparkCountDemo() {
- options = new Options();
-// options.addOption(OPTION_INPUT_PATH);
- }
-
- @Override
- protected Options getOptions() {
- return options;
- }
-
- @Override
- protected void execute(OptionsHelper optionsHelper) throws Exception {
- String logFile = "hdfs://10.249.65.231:8020/tmp/kylin.properties"; // Should be some file on your system
- SparkConf conf = new SparkConf().setAppName("Simple Application");
- JavaSparkContext sc = new JavaSparkContext(conf);
- final JavaPairRDD<String, Integer> logData = sc.textFile(logFile).mapToPair(new PairFunction<String, String, Integer>() {
-
- @Override
- public Tuple2<String, Integer> call(String s) throws Exception {
- return new Tuple2<String, Integer>(s, s.length());
- }
- }).sortByKey();
- logData.persist(StorageLevel.MEMORY_AND_DISK_SER());
-
- System.out.println("line number:" + logData.count());
-
- logData.mapToPair(new PairFunction<Tuple2<String, Integer>, ImmutableBytesWritable, KeyValue>() {
- @Override
- public Tuple2<ImmutableBytesWritable, KeyValue> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
- ImmutableBytesWritable key = new ImmutableBytesWritable(stringIntegerTuple2._1().getBytes());
- KeyValue value = new KeyValue(stringIntegerTuple2._1().getBytes(), "f".getBytes(), "c".getBytes(), String.valueOf(stringIntegerTuple2._2()).getBytes());
- return new Tuple2(key, value);
- }
- }).saveAsNewAPIHadoopFile("hdfs://10.249.65.231:8020/tmp/hfile",
- ImmutableBytesWritable.class,
- KeyValue.class,
- HFileOutputFormat.class
- );
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e063b81e/job/src/main/java/org/apache/kylin/job/spark/SparkEntry.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/spark/SparkEntry.java b/job/src/main/java/org/apache/kylin/job/spark/SparkEntry.java
deleted file mode 100644
index 451da14..0000000
--- a/job/src/main/java/org/apache/kylin/job/spark/SparkEntry.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package org.apache.kylin.job.spark;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.lang3.StringUtils;
-
-/**
- */
-public final class SparkEntry {
-
- public static void main(String[] args) throws Exception {
- System.out.println("SparkEntry args:" + StringUtils.join(args, " "));
- Preconditions.checkArgument(args.length >= 2, "-className is required");
- Preconditions.checkArgument(args[0].equals("-className"), "-className is required");
- final String className = args[1];
- final Object o = Class.<AbstractSparkApplication>forName(className).newInstance();
- Preconditions.checkArgument(o instanceof AbstractSparkApplication, className + " is not a subClass of AbstractSparkApplication");
- String[] appArgs = new String[args.length - 2];
- for (int i = 2; i < args.length; i++) {
- appArgs[i - 2] = args[i];
- }
- AbstractSparkApplication abstractSparkApplication = (AbstractSparkApplication) o;
- abstractSparkApplication.execute(appArgs);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e063b81e/job/src/main/java/org/apache/kylin/job/spark/SparkExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/spark/SparkExecutable.java b/job/src/main/java/org/apache/kylin/job/spark/SparkExecutable.java
deleted file mode 100644
index 68d2a8c..0000000
--- a/job/src/main/java/org/apache/kylin/job/spark/SparkExecutable.java
+++ /dev/null
@@ -1,63 +0,0 @@
-package org.apache.kylin.job.spark;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Logger;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-
-import java.io.IOException;
-import java.util.Map;
-
-/**
- */
-public class SparkExecutable extends AbstractExecutable {
-
- private static final String CLASS_NAME = "className";
-
- public void setClassName(String className) {
- this.setParam(CLASS_NAME, className);
- }
-
- private String formatArgs() {
- StringBuilder stringBuilder = new StringBuilder();
- for (Map.Entry<String, String> entry : getParams().entrySet()) {
- stringBuilder.append("-").append(entry.getKey()).append(" ").append(entry.getValue()).append(" ");
- }
- if (stringBuilder.length() > 0) {
- return stringBuilder.substring(0, stringBuilder.length() - 1).toString();
- } else {
- return StringUtils.EMPTY;
- }
- }
-
- @Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
- final KylinConfig config = context.getConfig();
- Preconditions.checkNotNull(config.getSparkHome());
- Preconditions.checkNotNull(config.getSparkMaster());
- try {
- String cmd = String.format("%s/bin/spark-submit --class \"org.apache.kylin.job.spark.SparkEntry\" --master %s %s %s",
- config.getSparkHome(),
- config.getSparkMaster(),
- config.getKylinJobJarPath(),
- formatArgs());
- logger.info("cmd:" + cmd);
- final StringBuilder output = new StringBuilder();
- config.getCliCommandExecutor().execute(cmd, new Logger() {
- @Override
- public void log(String message) {
- output.append(message);
- output.append("\n");
- }
- });
- return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
- } catch (IOException e) {
- logger.error("error run spark job:", e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e063b81e/job/src/main/java/org/apache/kylin/job/spark/SparkHelloWorld.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/spark/SparkHelloWorld.java b/job/src/main/java/org/apache/kylin/job/spark/SparkHelloWorld.java
deleted file mode 100644
index 3cd919c..0000000
--- a/job/src/main/java/org/apache/kylin/job/spark/SparkHelloWorld.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package org.apache.kylin.job.spark;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.job.tools.OptionsHelper;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-
-import java.io.IOException;
-
-/**
- */
-public class SparkHelloWorld extends AbstractSparkApplication {
-
- @Override
- protected Options getOptions() {
- return new Options();
- }
-
- @Override
- protected void execute(OptionsHelper optionsHelper) throws Exception {
- System.out.println("hello kylin-spark");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e063b81e/job/src/main/java/org/apache/kylin/job/spark/SparkHiveDemo.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/spark/SparkHiveDemo.java b/job/src/main/java/org/apache/kylin/job/spark/SparkHiveDemo.java
deleted file mode 100644
index 48a1449..0000000
--- a/job/src/main/java/org/apache/kylin/job/spark/SparkHiveDemo.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package org.apache.kylin.job.spark;
-
-import org.apache.commons.cli.Options;
-import org.apache.kylin.job.tools.OptionsHelper;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.DataFrame;
-import org.apache.spark.sql.hive.HiveContext;
-
-/**
- */
-public class SparkHiveDemo extends AbstractSparkApplication {
-
- private final Options options;
-
-
- public SparkHiveDemo() {
- options = new Options();
- }
-
- @Override
- protected Options getOptions() {
- return options;
- }
-
- @Override
- protected void execute(OptionsHelper optionsHelper) throws Exception {
- SparkConf conf = new SparkConf().setAppName("Simple Application");
- JavaSparkContext sc = new JavaSparkContext(conf);
- HiveContext sqlContext = new HiveContext(sc.sc());
- final DataFrame dataFrame = sqlContext.sql("select * from test_kylin_fact");
- System.out.println("count * of the table:" + dataFrame.count());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e063b81e/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java b/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
index a0362d5..29b8304 100644
--- a/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
+++ b/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
@@ -18,7 +18,6 @@
package org.apache.kylin.job.impl.threadpool;
-import akka.actor.Deploy;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.lock.ZookeeperJobLock;
import org.apache.kylin.common.util.HBaseMetadataTestCase;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e063b81e/job/src/test/java/org/apache/kylin/job/impl/threadpool/ITDefaultSchedulerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/impl/threadpool/ITDefaultSchedulerTest.java b/job/src/test/java/org/apache/kylin/job/impl/threadpool/ITDefaultSchedulerTest.java
index 89f82fa..7c2218b 100644
--- a/job/src/test/java/org/apache/kylin/job/impl/threadpool/ITDefaultSchedulerTest.java
+++ b/job/src/test/java/org/apache/kylin/job/impl/threadpool/ITDefaultSchedulerTest.java
@@ -22,8 +22,6 @@ import org.apache.kylin.job.*;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.Output;
-import org.apache.kylin.job.spark.SparkExecutable;
-import org.apache.kylin.job.spark.SparkHelloWorld;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@@ -99,16 +97,4 @@ public class ITDefaultSchedulerTest extends BaseSchedulerTest {
System.out.println(job);
}
- @Test
- public void testSparkExecutable() throws Exception {
- DefaultChainedExecutable job = new DefaultChainedExecutable();
- SparkExecutable task = new SparkExecutable();
- task.setClassName(SparkHelloWorld.class.getName());
- job.addTask(task);
- jobService.addJob(job);
- waitForJobFinish(job.getId());
- assertEquals(ExecutableState.SUCCEED, job.getStatus());
- final Output output = jobService.getOutput(task.getId());
- assertTrue(output.getVerboseMsg().contains("hello kylin-spark"));
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e063b81e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1a23d39..cf8a4f2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -569,6 +569,7 @@
<module>jdbc</module>
<module>invertedindex</module>
<module>streaming</module>
+ <module>spark</module>
</modules>
<profiles>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e063b81e/spark/pom.xml
----------------------------------------------------------------------
diff --git a/spark/pom.xml b/spark/pom.xml
new file mode 100644
index 0000000..e1e2d73
--- /dev/null
+++ b/spark/pom.xml
@@ -0,0 +1,69 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin</artifactId>
+ <version>0.7.2-incubating-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>kylin-spark</artifactId>
+
+ <name>Kylin:Spark</name>
+ <url>http://maven.apache.org</url>
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-job</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+
+ <!-- Spark dependency -->
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_2.10</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_2.10</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-hive_2.10</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-server</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+
+ </dependencies>
+
+
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e063b81e/spark/src/main/java/org/apache/kylin/job/spark/AbstractSparkApplication.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/kylin/job/spark/AbstractSparkApplication.java b/spark/src/main/java/org/apache/kylin/job/spark/AbstractSparkApplication.java
new file mode 100644
index 0000000..dd884d5
--- /dev/null
+++ b/spark/src/main/java/org/apache/kylin/job/spark/AbstractSparkApplication.java
@@ -0,0 +1,31 @@
+package org.apache.kylin.job.spark;
+
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.job.tools.OptionsHelper;
+
+import java.io.Serializable;
+
+/**
+ */
+public abstract class AbstractSparkApplication implements Serializable {
+
+ protected abstract Options getOptions();
+
+ protected abstract void execute(OptionsHelper optionsHelper) throws Exception;
+
+ public final void execute(String[] args) {
+ OptionsHelper optionsHelper = new OptionsHelper();
+ System.out.println("Spark Application args:" + StringUtils.join(args, " "));
+ try {
+ optionsHelper.parseOptions(getOptions(), args);
+ execute(optionsHelper);
+ } catch (ParseException e) {
+ optionsHelper.printUsage("SparkExecutor", getOptions());
+ throw new RuntimeException("error parsing args", e);
+ } catch (Exception e) {
+ throw new RuntimeException("error execute Spark Application", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e063b81e/spark/src/main/java/org/apache/kylin/job/spark/SparkCountDemo.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/kylin/job/spark/SparkCountDemo.java b/spark/src/main/java/org/apache/kylin/job/spark/SparkCountDemo.java
new file mode 100644
index 0000000..c648324
--- /dev/null
+++ b/spark/src/main/java/org/apache/kylin/job/spark/SparkCountDemo.java
@@ -0,0 +1,66 @@
+package org.apache.kylin.job.spark;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
+import org.apache.kylin.job.tools.OptionsHelper;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.storage.StorageLevel;
+import scala.Tuple2;
+
+/**
+ */
+public class SparkCountDemo extends AbstractSparkApplication {
+
+ private static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Input path").create("input");
+
+ private Options options;
+
+
+ public SparkCountDemo() {
+ options = new Options();
+// options.addOption(OPTION_INPUT_PATH);
+ }
+
+ @Override
+ protected Options getOptions() {
+ return options;
+ }
+
+ @Override
+ protected void execute(OptionsHelper optionsHelper) throws Exception {
+ String logFile = "hdfs://10.249.65.231:8020/tmp/kylin.properties"; // Should be some file on your system
+ SparkConf conf = new SparkConf().setAppName("Simple Application");
+ JavaSparkContext sc = new JavaSparkContext(conf);
+ final JavaPairRDD<String, Integer> logData = sc.textFile(logFile).mapToPair(new PairFunction<String, String, Integer>() {
+
+ @Override
+ public Tuple2<String, Integer> call(String s) throws Exception {
+ return new Tuple2<String, Integer>(s, s.length());
+ }
+ }).sortByKey();
+ logData.persist(StorageLevel.MEMORY_AND_DISK_SER());
+
+ System.out.println("line number:" + logData.count());
+
+ logData.mapToPair(new PairFunction<Tuple2<String, Integer>, ImmutableBytesWritable, KeyValue>() {
+ @Override
+ public Tuple2<ImmutableBytesWritable, KeyValue> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
+ ImmutableBytesWritable key = new ImmutableBytesWritable(stringIntegerTuple2._1().getBytes());
+ KeyValue value = new KeyValue(stringIntegerTuple2._1().getBytes(), "f".getBytes(), "c".getBytes(), String.valueOf(stringIntegerTuple2._2()).getBytes());
+ return new Tuple2(key, value);
+ }
+ }).saveAsNewAPIHadoopFile("hdfs://10.249.65.231:8020/tmp/hfile",
+ ImmutableBytesWritable.class,
+ KeyValue.class,
+ HFileOutputFormat.class
+ );
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e063b81e/spark/src/main/java/org/apache/kylin/job/spark/SparkEntry.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/kylin/job/spark/SparkEntry.java b/spark/src/main/java/org/apache/kylin/job/spark/SparkEntry.java
new file mode 100644
index 0000000..451da14
--- /dev/null
+++ b/spark/src/main/java/org/apache/kylin/job/spark/SparkEntry.java
@@ -0,0 +1,24 @@
+package org.apache.kylin.job.spark;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ */
+public final class SparkEntry {
+
+ public static void main(String[] args) throws Exception {
+ System.out.println("SparkEntry args:" + StringUtils.join(args, " "));
+ Preconditions.checkArgument(args.length >= 2, "-className is required");
+ Preconditions.checkArgument(args[0].equals("-className"), "-className is required");
+ final String className = args[1];
+ final Object o = Class.<AbstractSparkApplication>forName(className).newInstance();
+ Preconditions.checkArgument(o instanceof AbstractSparkApplication, className + " is not a subClass of AbstractSparkApplication");
+ String[] appArgs = new String[args.length - 2];
+ for (int i = 2; i < args.length; i++) {
+ appArgs[i - 2] = args[i];
+ }
+ AbstractSparkApplication abstractSparkApplication = (AbstractSparkApplication) o;
+ abstractSparkApplication.execute(appArgs);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e063b81e/spark/src/main/java/org/apache/kylin/job/spark/SparkExecutable.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/kylin/job/spark/SparkExecutable.java b/spark/src/main/java/org/apache/kylin/job/spark/SparkExecutable.java
new file mode 100644
index 0000000..68d2a8c
--- /dev/null
+++ b/spark/src/main/java/org/apache/kylin/job/spark/SparkExecutable.java
@@ -0,0 +1,63 @@
+package org.apache.kylin.job.spark;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Logger;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ */
+public class SparkExecutable extends AbstractExecutable {
+
+ private static final String CLASS_NAME = "className";
+
+ public void setClassName(String className) {
+ this.setParam(CLASS_NAME, className);
+ }
+
+ private String formatArgs() {
+ StringBuilder stringBuilder = new StringBuilder();
+ for (Map.Entry<String, String> entry : getParams().entrySet()) {
+ stringBuilder.append("-").append(entry.getKey()).append(" ").append(entry.getValue()).append(" ");
+ }
+ if (stringBuilder.length() > 0) {
+ return stringBuilder.substring(0, stringBuilder.length() - 1).toString();
+ } else {
+ return StringUtils.EMPTY;
+ }
+ }
+
+ @Override
+ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ final KylinConfig config = context.getConfig();
+ Preconditions.checkNotNull(config.getSparkHome());
+ Preconditions.checkNotNull(config.getSparkMaster());
+ try {
+ String cmd = String.format("%s/bin/spark-submit --class \"org.apache.kylin.job.spark.SparkEntry\" --master %s %s %s",
+ config.getSparkHome(),
+ config.getSparkMaster(),
+ config.getKylinJobJarPath(),
+ formatArgs());
+ logger.info("cmd:" + cmd);
+ final StringBuilder output = new StringBuilder();
+ config.getCliCommandExecutor().execute(cmd, new Logger() {
+ @Override
+ public void log(String message) {
+ output.append(message);
+ output.append("\n");
+ }
+ });
+ return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
+ } catch (IOException e) {
+ logger.error("error run spark job:", e);
+ return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e063b81e/spark/src/main/java/org/apache/kylin/job/spark/SparkHelloWorld.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/kylin/job/spark/SparkHelloWorld.java b/spark/src/main/java/org/apache/kylin/job/spark/SparkHelloWorld.java
new file mode 100644
index 0000000..fee3583
--- /dev/null
+++ b/spark/src/main/java/org/apache/kylin/job/spark/SparkHelloWorld.java
@@ -0,0 +1,19 @@
+package org.apache.kylin.job.spark;
+
+import org.apache.commons.cli.Options;
+import org.apache.kylin.job.tools.OptionsHelper;
+
+/**
+ */
+public class SparkHelloWorld extends AbstractSparkApplication {
+
+ @Override
+ protected Options getOptions() {
+ return new Options();
+ }
+
+ @Override
+ protected void execute(OptionsHelper optionsHelper) throws Exception {
+ System.out.println("hello kylin-spark");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e063b81e/spark/src/main/java/org/apache/kylin/job/spark/SparkHiveDemo.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/kylin/job/spark/SparkHiveDemo.java b/spark/src/main/java/org/apache/kylin/job/spark/SparkHiveDemo.java
new file mode 100644
index 0000000..48a1449
--- /dev/null
+++ b/spark/src/main/java/org/apache/kylin/job/spark/SparkHiveDemo.java
@@ -0,0 +1,34 @@
+package org.apache.kylin.job.spark;
+
+import org.apache.commons.cli.Options;
+import org.apache.kylin.job.tools.OptionsHelper;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.hive.HiveContext;
+
+/**
+ */
+public class SparkHiveDemo extends AbstractSparkApplication {
+
+ private final Options options;
+
+
+ public SparkHiveDemo() {
+ options = new Options();
+ }
+
+ @Override
+ protected Options getOptions() {
+ return options;
+ }
+
+ @Override
+ protected void execute(OptionsHelper optionsHelper) throws Exception {
+ SparkConf conf = new SparkConf().setAppName("Simple Application");
+ JavaSparkContext sc = new JavaSparkContext(conf);
+ HiveContext sqlContext = new HiveContext(sc.sc());
+ final DataFrame dataFrame = sqlContext.sql("select * from test_kylin_fact");
+ System.out.println("count * of the table:" + dataFrame.count());
+ }
+}