Posted to commits@kylin.apache.org by qh...@apache.org on 2015/07/16 09:50:03 UTC

incubator-kylin git commit: KYLIN-894 1. read hive table 2. write hfile

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8 d1dcfd155 -> fff9b188f


KYLIN-894
1. read hive table
2. write hfile


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/fff9b188
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/fff9b188
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/fff9b188

Branch: refs/heads/0.8
Commit: fff9b188fb375006737e69750f065ff615b10dac
Parents: d1dcfd1
Author: qianhao.zhou <qi...@ebay.com>
Authored: Fri Jun 19 12:38:44 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Thu Jul 16 15:49:40 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/KylinConfig.java    | 59 ++++++++++-------
 job/pom.xml                                     | 18 ++++++
 .../job/spark/AbstractSparkApplication.java     | 31 +++++++++
 .../apache/kylin/job/spark/SparkCountDemo.java  | 66 ++++++++++++++++++++
 .../org/apache/kylin/job/spark/SparkEntry.java  | 24 +++++++
 .../apache/kylin/job/spark/SparkExecutable.java | 63 +++++++++++++++++++
 .../apache/kylin/job/spark/SparkHelloWorld.java | 29 +++++++++
 .../apache/kylin/job/spark/SparkHiveDemo.java   | 34 ++++++++++
 .../job/impl/threadpool/BaseSchedulerTest.java  |  9 +++
 .../impl/threadpool/ITDefaultSchedulerTest.java | 17 +++++
 pom.xml                                         | 24 ++++++-
 11 files changed, 351 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fff9b188/common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/KylinConfig.java b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
index ef6b7d1..89a7fcc 100644
--- a/common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -130,6 +130,14 @@ public class KylinConfig {
     public static final String KYLIN_HOME = "KYLIN_HOME";
     public static final String KYLIN_CONF = "KYLIN_CONF";
 
+    public static final String HBASE_REGION_CUT_SMALL = "kylin.job.hbase.region.cut.small";
+    public static final String HBASE_REGION_CUT_MEDIUM = "kylin.job.hbase.region.cut.medium";
+    public static final String HBASE_REGION_CUT_LARGE = "kylin.job.hbase.region.cut.large";
+
+
+    public static final String SPARK_HOME = "kylin.spark.home";
+    public static final String SPARK_MASTER = "kylin.spark.master";
+
     private static final Logger logger = LoggerFactory.getLogger(KylinConfig.class);
 
     public static final String VERSION = "${project.version}";
@@ -433,30 +441,11 @@ public class KylinConfig {
     public int getDictionaryMaxCardinality() {
         return Integer.parseInt(getOptional("kylin.dictionary.max.cardinality", "5000000"));
     }
-    
+
     public int getTableSnapshotMaxMB() {
         return Integer.parseInt(getOptional("kylin.table.snapshot.max_mb", "300"));
     }
-    
-    public int getHBaseRegionCut(String capacity) {
-        String cut;
-        switch (capacity) {
-        case "SMALL":
-            cut = getProperty("kylin.job.hbase.region.cut.small", "5");
-            break;
-        case "MEDIUM":
-            cut = getProperty("kylin.job.hbase.region.cut.medium", "10");
-            break;
-        case "LARGE":
-            cut = getProperty("kylin.job.hbase.region.cut.large", "50");
-            break;
-        default:
-            throw new IllegalArgumentException("Capacity not recognized: " + capacity);
-        }
 
-        return Integer.valueOf(cut);
-    }
-    
     public int getHBaseRegionCutMin() {
         return Integer.parseInt(getOptional("kylin.job.hbase.region.cut.min", "2"));
     }
@@ -464,11 +453,11 @@ public class KylinConfig {
     public int getHBaseRegionCutMax() {
         return Integer.parseInt(getOptional("kylin.job.hbase.region.cut.max", "1000"));
     }
-    
+
     public int getScanThreshold() {
         return Integer.parseInt(getOptional("kylin.query.scan.threshold", "10000000"));
     }
-    
+
     public boolean getQueryRunLocalCoprocessor() {
         return Boolean.parseBoolean(getOptional("kylin.query.run.local.coprocessor", "false"));
     }
@@ -710,6 +699,32 @@ public class KylinConfig {
         }
     }
 
+    public String getSparkHome() {
+        return kylinConfig.getString(SPARK_HOME);
+    }
+    public String getSparkMaster() {
+        return kylinConfig.getString(SPARK_MASTER);
+    }
+
+    public int getHBaseRegionCut(String capacity) {
+        String cut;
+        switch (capacity) {
+        case "SMALL":
+            cut = getProperty(HBASE_REGION_CUT_SMALL, "5");
+            break;
+        case "MEDIUM":
+            cut = getProperty(HBASE_REGION_CUT_MEDIUM, "10");
+            break;
+        case "LARGE":
+            cut = getProperty(HBASE_REGION_CUT_LARGE, "50");
+            break;
+        default:
+            throw new IllegalArgumentException("Capacity not recognized: " + capacity);
+        }
+
+        return Integer.valueOf(cut);
+    }
+
     public String toString() {
         return getMetadataUrl();
     }

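Note on the KylinConfig change: the two new Spark getters call kylinConfig.getString(...) with no default, so getSparkHome() and getSparkMaster() return null unless kylin.properties defines them; SparkExecutable (further down) guards both with Preconditions.checkNotNull before building its spark-submit command. The relocated getHBaseRegionCut() keeps its built-in fallbacks. A minimal kylin.properties sketch with illustrative values (not part of this commit):

    # Spark integration -- no built-in defaults; required by SparkExecutable
    kylin.spark.home=/usr/local/spark
    kylin.spark.master=yarn-client

    # HBase region cut sizes; these values match the fallbacks in getHBaseRegionCut()
    kylin.job.hbase.region.cut.small=5
    kylin.job.hbase.region.cut.medium=10
    kylin.job.hbase.region.cut.large=50
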
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fff9b188/job/pom.xml
----------------------------------------------------------------------
diff --git a/job/pom.xml b/job/pom.xml
index 3819265..41894fe 100644
--- a/job/pom.xml
+++ b/job/pom.xml
@@ -71,6 +71,24 @@
             <version>${project.parent.version}</version>
         </dependency>
 
+        <dependency> <!-- Spark dependency -->
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.10</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency> <!-- Spark dependency -->
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.10</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency> <!-- Spark dependency -->
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-hive_2.10</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
         <dependency>
             <groupId>commons-cli</groupId>
             <artifactId>commons-cli</artifactId>

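All three Spark artifacts are declared with provided scope: the job module compiles against them, but they stay out of the packaged job jar, because at run time spark-submit already puts a matching Spark distribution on the classpath. Versions are inherited from the parent pom's dependencyManagement (see the pom.xml hunk at the end of this commit). Assuming a standard Spark layout under kylin.spark.home, a launch looks roughly like this (jar name and master are illustrative):

    ${SPARK_HOME}/bin/spark-submit \
        --class "org.apache.kylin.job.spark.SparkEntry" \
        --master yarn-client \
        kylin-job-0.8-job.jar \
        -className org.apache.kylin.job.spark.SparkHelloWorld

This mirrors the command string that SparkExecutable.doWork() assembles below.
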
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fff9b188/job/src/main/java/org/apache/kylin/job/spark/AbstractSparkApplication.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/spark/AbstractSparkApplication.java b/job/src/main/java/org/apache/kylin/job/spark/AbstractSparkApplication.java
new file mode 100644
index 0000000..dd884d5
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/spark/AbstractSparkApplication.java
@@ -0,0 +1,31 @@
+package org.apache.kylin.job.spark;
+
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.job.tools.OptionsHelper;
+
+import java.io.Serializable;
+
+/**
+ */
+public abstract class AbstractSparkApplication implements Serializable {
+
+    protected abstract Options getOptions();
+
+    protected abstract void execute(OptionsHelper optionsHelper) throws Exception;
+
+    public final void execute(String[] args) {
+        OptionsHelper optionsHelper = new OptionsHelper();
+        System.out.println("Spark Application args:" + StringUtils.join(args, " "));
+        try {
+            optionsHelper.parseOptions(getOptions(), args);
+            execute(optionsHelper);
+        } catch (ParseException e) {
+            optionsHelper.printUsage("SparkExecutor", getOptions());
+            throw new RuntimeException("error parsing args", e);
+        } catch (Exception e) {
+            throw new RuntimeException("error execute Spark Application", e);
+        }
+    }
+}

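AbstractSparkApplication is a small template: subclasses supply their commons-cli Options and implement execute(OptionsHelper), while the final execute(String[]) centralizes parsing and error reporting. A minimal subclass sketch (the class and its option are hypothetical, and it assumes OptionsHelper exposes a getOptionValue(Option) accessor alongside the parseOptions/printUsage calls used above):

    package org.apache.kylin.job.spark;

    import org.apache.commons.cli.Option;
    import org.apache.commons.cli.OptionBuilder;
    import org.apache.commons.cli.Options;
    import org.apache.kylin.job.tools.OptionsHelper;

    public class SparkEchoDemo extends AbstractSparkApplication {

        // hypothetical option, built in the same commons-cli 1.x style as SparkCountDemo below
        private static final Option OPTION_MESSAGE = OptionBuilder.withArgName("msg").hasArg().isRequired(true).withDescription("Message to print").create("message");

        private final Options options;

        public SparkEchoDemo() {
            options = new Options();
            options.addOption(OPTION_MESSAGE);
        }

        @Override
        protected Options getOptions() {
            return options;
        }

        @Override
        protected void execute(OptionsHelper optionsHelper) throws Exception {
            // assumed accessor; mirrors commons-cli's CommandLine.getOptionValue
            System.out.println("echo:" + optionsHelper.getOptionValue(OPTION_MESSAGE));
        }
    }
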
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fff9b188/job/src/main/java/org/apache/kylin/job/spark/SparkCountDemo.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/spark/SparkCountDemo.java b/job/src/main/java/org/apache/kylin/job/spark/SparkCountDemo.java
new file mode 100644
index 0000000..c648324
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/spark/SparkCountDemo.java
@@ -0,0 +1,66 @@
+package org.apache.kylin.job.spark;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
+import org.apache.kylin.job.tools.OptionsHelper;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.storage.StorageLevel;
+import scala.Tuple2;
+
+/**
+ */
+public class SparkCountDemo extends AbstractSparkApplication {
+
+    private static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Input path").create("input");
+
+    private Options options;
+
+
+    public SparkCountDemo() {
+        options = new Options();
+//        options.addOption(OPTION_INPUT_PATH);
+    }
+
+    @Override
+    protected Options getOptions() {
+        return options;
+    }
+
+    @Override
+    protected void execute(OptionsHelper optionsHelper) throws Exception {
+        String logFile = "hdfs://10.249.65.231:8020/tmp/kylin.properties"; // Should be some file on your system
+        SparkConf conf = new SparkConf().setAppName("Simple Application");
+        JavaSparkContext sc = new JavaSparkContext(conf);
+        final JavaPairRDD<String, Integer> logData = sc.textFile(logFile).mapToPair(new PairFunction<String, String, Integer>() {
+
+            @Override
+            public Tuple2<String, Integer> call(String s) throws Exception {
+                return new Tuple2<String, Integer>(s, s.length());
+            }
+        }).sortByKey();
+        logData.persist(StorageLevel.MEMORY_AND_DISK_SER());
+
+        System.out.println("line number:" + logData.count());
+
+        logData.mapToPair(new PairFunction<Tuple2<String, Integer>, ImmutableBytesWritable, KeyValue>() {
+            @Override
+            public Tuple2<ImmutableBytesWritable, KeyValue> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
+                ImmutableBytesWritable key = new ImmutableBytesWritable(stringIntegerTuple2._1().getBytes());
+                KeyValue value = new KeyValue(stringIntegerTuple2._1().getBytes(), "f".getBytes(), "c".getBytes(), String.valueOf(stringIntegerTuple2._2()).getBytes());
+                return new Tuple2(key, value);
+            }
+        }).saveAsNewAPIHadoopFile("hdfs://10.249.65.231:8020/tmp/hfile",
+                ImmutableBytesWritable.class,
+                KeyValue.class,
+                HFileOutputFormat.class
+        );
+
+    }
+}

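Two details in the demo carry the "write hfile" half of this commit: the pairs are sorted with sortByKey() before writing, because HFileOutputFormat requires keys in total order, and the output is raw HFiles rather than Puts against a live table. Loading those files into HBase is a separate bulk-load step that this commit does not include; a hedged sketch using HBase's standard LoadIncrementalHFiles tool (the table name is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

    public class BulkLoadSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            // the demo above writes HFiles under /tmp/hfile; the target is a
            // hypothetical table whose column family "f" matches the KeyValues produced
            HTable table = new HTable(conf, "demo_table");
            try {
                new LoadIncrementalHFiles(conf).doBulkLoad(new Path("/tmp/hfile"), table);
            } finally {
                table.close();
            }
        }
    }
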
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fff9b188/job/src/main/java/org/apache/kylin/job/spark/SparkEntry.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/spark/SparkEntry.java b/job/src/main/java/org/apache/kylin/job/spark/SparkEntry.java
new file mode 100644
index 0000000..451da14
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/spark/SparkEntry.java
@@ -0,0 +1,24 @@
+package org.apache.kylin.job.spark;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ */
+public final class SparkEntry {
+
+    public static void main(String[] args) throws Exception {
+        System.out.println("SparkEntry args:" + StringUtils.join(args, " "));
+        Preconditions.checkArgument(args.length >= 2, "-className is required");
+        Preconditions.checkArgument(args[0].equals("-className"), "-className is required");
+        final String className = args[1];
+        final Object o = Class.<AbstractSparkApplication>forName(className).newInstance();
+        Preconditions.checkArgument(o instanceof AbstractSparkApplication, className + " is not a subClass of AbstractSparkApplication");
+        String[] appArgs = new String[args.length - 2];
+        for (int i = 2; i < args.length; i++) {
+            appArgs[i - 2] = args[i];
+        }
+        AbstractSparkApplication abstractSparkApplication = (AbstractSparkApplication) o;
+        abstractSparkApplication.execute(appArgs);
+    }
+}

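SparkEntry is the single main class handed to spark-submit: it checks that argv starts with "-className <class>", reflectively instantiates that class as an AbstractSparkApplication, and forwards the remaining arguments (the manual copy loop is equivalent to Arrays.copyOfRange(args, 2, args.length)). Because SparkHelloWorld's execute() only prints, the dispatch can be smoke-tested without a Spark cluster; a hypothetical local check (not part of this commit):

    public class SparkEntrySmokeTest {
        public static void main(String[] args) throws Exception {
            // exercises only the -className parsing and reflective dispatch;
            // expect "hello kylin-spark" on stdout
            org.apache.kylin.job.spark.SparkEntry.main(new String[] {
                    "-className", "org.apache.kylin.job.spark.SparkHelloWorld" });
        }
    }
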
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fff9b188/job/src/main/java/org/apache/kylin/job/spark/SparkExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/spark/SparkExecutable.java b/job/src/main/java/org/apache/kylin/job/spark/SparkExecutable.java
new file mode 100644
index 0000000..68d2a8c
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/spark/SparkExecutable.java
@@ -0,0 +1,63 @@
+package org.apache.kylin.job.spark;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Logger;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ */
+public class SparkExecutable extends AbstractExecutable {
+
+    private static final String CLASS_NAME = "className";
+
+    public void setClassName(String className) {
+        this.setParam(CLASS_NAME, className);
+    }
+
+    private String formatArgs() {
+        StringBuilder stringBuilder = new StringBuilder();
+        for (Map.Entry<String, String> entry : getParams().entrySet()) {
+            stringBuilder.append("-").append(entry.getKey()).append(" ").append(entry.getValue()).append(" ");
+        }
+        if (stringBuilder.length() > 0) {
+            return stringBuilder.substring(0, stringBuilder.length() - 1).toString();
+        } else {
+            return StringUtils.EMPTY;
+        }
+    }
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        final KylinConfig config = context.getConfig();
+        Preconditions.checkNotNull(config.getSparkHome());
+        Preconditions.checkNotNull(config.getSparkMaster());
+        try {
+            String cmd = String.format("%s/bin/spark-submit --class \"org.apache.kylin.job.spark.SparkEntry\" --master %s %s %s",
+                    config.getSparkHome(),
+                    config.getSparkMaster(),
+                    config.getKylinJobJarPath(),
+                    formatArgs());
+            logger.info("cmd:" + cmd);
+            final StringBuilder output = new StringBuilder();
+            config.getCliCommandExecutor().execute(cmd, new Logger() {
+                @Override
+                public void log(String message) {
+                    output.append(message);
+                    output.append("\n");
+                }
+            });
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
+        } catch (IOException e) {
+            logger.error("error run spark job:", e);
+            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fff9b188/job/src/main/java/org/apache/kylin/job/spark/SparkHelloWorld.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/spark/SparkHelloWorld.java b/job/src/main/java/org/apache/kylin/job/spark/SparkHelloWorld.java
new file mode 100644
index 0000000..3cd919c
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/spark/SparkHelloWorld.java
@@ -0,0 +1,29 @@
+package org.apache.kylin.job.spark;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.job.tools.OptionsHelper;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+
+import java.io.IOException;
+
+/**
+ */
+public class SparkHelloWorld extends AbstractSparkApplication {
+
+    @Override
+    protected Options getOptions() {
+        return new Options();
+    }
+
+    @Override
+    protected void execute(OptionsHelper optionsHelper) throws Exception {
+        System.out.println("hello kylin-spark");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fff9b188/job/src/main/java/org/apache/kylin/job/spark/SparkHiveDemo.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/spark/SparkHiveDemo.java b/job/src/main/java/org/apache/kylin/job/spark/SparkHiveDemo.java
new file mode 100644
index 0000000..48a1449
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/spark/SparkHiveDemo.java
@@ -0,0 +1,34 @@
+package org.apache.kylin.job.spark;
+
+import org.apache.commons.cli.Options;
+import org.apache.kylin.job.tools.OptionsHelper;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.hive.HiveContext;
+
+/**
+ */
+public class SparkHiveDemo extends AbstractSparkApplication {
+
+    private final Options options;
+
+
+    public SparkHiveDemo() {
+        options = new Options();
+    }
+
+    @Override
+    protected Options getOptions() {
+        return options;
+    }
+
+    @Override
+    protected void execute(OptionsHelper optionsHelper) throws Exception {
+        SparkConf conf = new SparkConf().setAppName("Simple Application");
+        JavaSparkContext sc = new JavaSparkContext(conf);
+        HiveContext sqlContext = new HiveContext(sc.sc());
+        final DataFrame dataFrame = sqlContext.sql("select * from test_kylin_fact");
+        System.out.println("count * of the table:" + dataFrame.count());
+    }
+}

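This is the "read hive table" half of the commit: HiveContext picks up hive-site.xml from the classpath, and test_kylin_fact is Kylin's sample fact table, so the demo simply counts rows. Pulling the data out follows the same pattern; a sketch against the Spark 1.4 Java API pinned in the pom below (DataFrame.javaRDD() and Row.mkString are standard Spark SQL calls, but the class itself is hypothetical):

    package org.apache.kylin.job.spark;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.hive.HiveContext;

    public class SparkHiveRowsSketch {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("SparkHiveRowsSketch");
            JavaSparkContext sc = new JavaSparkContext(conf);
            HiveContext sqlContext = new HiveContext(sc.sc());
            DataFrame dataFrame = sqlContext.sql("select * from test_kylin_fact");
            // flatten each Row to a comma-joined line, in the same anonymous-function
            // style as the demos above
            JavaRDD<String> lines = dataFrame.javaRDD().map(new Function<Row, String>() {
                @Override
                public String call(Row row) throws Exception {
                    return row.mkString(",");
                }
            });
            System.out.println("first row:" + lines.first());
            sc.stop();
        }
    }
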
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fff9b188/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java b/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
index 913f707..a0362d5 100644
--- a/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
+++ b/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
@@ -18,9 +18,11 @@
 
 package org.apache.kylin.job.impl.threadpool;
 
+import akka.actor.Deploy;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.lock.ZookeeperJobLock;
 import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.job.DeployUtil;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.AbstractExecutable;
@@ -28,6 +30,7 @@ import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.manager.ExecutableManager;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.BeforeClass;
 
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
@@ -81,6 +84,12 @@ public abstract class BaseSchedulerTest extends HBaseMetadataTestCase {
         }
     }
 
+    @BeforeClass
+    public static void beforeClass() {
+        System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox");
+        DeployUtil.overrideJobJarLocations();
+    }
+
     @Before
     public void setup() throws Exception {
         createTestMetadata();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fff9b188/job/src/test/java/org/apache/kylin/job/impl/threadpool/ITDefaultSchedulerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/impl/threadpool/ITDefaultSchedulerTest.java b/job/src/test/java/org/apache/kylin/job/impl/threadpool/ITDefaultSchedulerTest.java
index bf4a1dc..89f82fa 100644
--- a/job/src/test/java/org/apache/kylin/job/impl/threadpool/ITDefaultSchedulerTest.java
+++ b/job/src/test/java/org/apache/kylin/job/impl/threadpool/ITDefaultSchedulerTest.java
@@ -21,9 +21,13 @@ package org.apache.kylin.job.impl.threadpool;
 import org.apache.kylin.job.*;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.execution.Output;
+import org.apache.kylin.job.spark.SparkExecutable;
+import org.apache.kylin.job.spark.SparkHelloWorld;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  */
@@ -94,4 +98,17 @@ public class ITDefaultSchedulerTest extends BaseSchedulerTest {
         Thread.sleep(5000);
         System.out.println(job);
     }
+
+    @Test
+    public void testSparkExecutable() throws Exception {
+        DefaultChainedExecutable job = new DefaultChainedExecutable();
+        SparkExecutable task = new SparkExecutable();
+        task.setClassName(SparkHelloWorld.class.getName());
+        job.addTask(task);
+        jobService.addJob(job);
+        waitForJobFinish(job.getId());
+        assertEquals(ExecutableState.SUCCEED, job.getStatus());
+        final Output output = jobService.getOutput(task.getId());
+        assertTrue(output.getVerboseMsg().contains("hello kylin-spark"));
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fff9b188/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 35dace6..1a23d39 100644
--- a/pom.xml
+++ b/pom.xml
@@ -69,6 +69,9 @@
         <commons-httpclient.version>3.1</commons-httpclient.version>
         <commons-collections4.version>4.0</commons-collections4.version>
 
+        <!-- Spark -->
+        <spark.version>1.4.0</spark.version>
+
 
         <!-- Utility -->
         <log4j.version>1.2.17</log4j.version>
@@ -259,6 +262,25 @@
                 <version>${calcite.version}</version>
             </dependency>
 
+            <!-- Spark dependency -->
+            <dependency>
+                <groupId>org.apache.spark</groupId>
+                <artifactId>spark-core_2.10</artifactId>
+                <version>${spark.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.spark</groupId>
+                <artifactId>spark-sql_2.10</artifactId>
+                <version>${spark.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.spark</groupId>
+                <artifactId>spark-hive_2.10</artifactId>
+                <version>${spark.version}</version>
+            </dependency>
+
             <!-- Other dependencies -->
             <dependency>
                 <groupId>junit</groupId>
@@ -466,7 +488,7 @@
                     <configuration>
                         <packagingExcludes>
                             WEB-INF/lib/servlet-api-*.jar,
-                            WEB-INF/lib/zookeeper-*.jar,
+                            WEB-INF/lib/zookeeper-*.jar
                         </packagingExcludes>
                     </configuration>
                 </plugin>