You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2016/12/28 13:22:01 UTC

[1/2] incubator-carbondata git commit: Added thrift server support to Spark 2.0 integration

Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 997920a0f -> 65b922126


Added thrift server support to Spark 2.0 integration

Fixed comments


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/81eca096
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/81eca096
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/81eca096

Branch: refs/heads/master
Commit: 81eca0967cc2a836f81278ff552057a1fc59a531
Parents: 997920a
Author: ravipesala <ra...@gmail.com>
Authored: Wed Dec 28 09:56:48 2016 +0530
Committer: jackylk <ja...@huawei.com>
Committed: Wed Dec 28 21:12:03 2016 +0800

----------------------------------------------------------------------
 .../spark/thriftserver/CarbonThriftServer.scala | 64 ++++++++++++++++++++
 .../org/apache/spark/sql/CarbonSession.scala    | 46 +++++++++++++-
 .../execution/command/carbonTableSchema.scala   |  4 +-
 3 files changed, 110 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/81eca096/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
new file mode 100644
index 0000000..6a6ee00
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.thriftserver
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+ * Entry point that starts Spark's Hive Thrift server (HiveThriftServer2) on top of a
+ * Carbon session created through `getOrCreateCarbonSession`.
+ *
+ * Usage: CarbonThriftServer <carbon store location>
+ */
+object CarbonThriftServer {
+
+  /** Fallback delay, in milliseconds, used when carbon.spark.warmUpTime is invalid. */
+  private val DefaultWarmUpTimeMs = 5000L
+
+  /**
+   * Builds a Hive-enabled Carbon session and starts the Thrift server with it.
+   *
+   * @param args args(0) is the Carbon store location, stored as the
+   *             `carbon.storelocation` Carbon property
+   * @throws IllegalArgumentException if no store location argument is supplied
+   */
+  def main(args: Array[String]): Unit = {
+
+    import scala.util.Try
+
+    import org.apache.spark.sql.CarbonSession._
+
+    // Fail fast with a usage hint instead of an opaque NoSuchElementException from args.head.
+    require(args.nonEmpty, "Usage: CarbonThriftServer <carbon store location>")
+
+    val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+    val sparkConf = new SparkConf(loadDefaults = true)
+    val builder = SparkSession
+      .builder()
+      .config(sparkConf)
+      .appName("Carbon Thrift Server(uses CarbonSession)")
+      .enableHiveSupport()
+
+    // When SPARK_HOME is set, point both the session config and the JVM system property
+    // at $SPARK_HOME/conf/carbon.properties.
+    Option(System.getenv("SPARK_HOME")).foreach { sparkHome =>
+      val carbonPropertiesPath = s"$sparkHome/conf/carbon.properties"
+      builder.config("carbon.properties.filepath", carbonPropertiesPath)
+      System.setProperty("carbon.properties.filepath", carbonPropertiesPath)
+    }
+    CarbonProperties.getInstance().addProperty("carbon.storelocation", args.head)
+
+    val spark = builder.getOrCreateCarbonSession()
+
+    // Only a malformed or negative value falls back to the default. The sleep itself is kept
+    // outside any catch, so an interrupt is no longer swallowed and misreported as bad config.
+    val warmUpTime = CarbonProperties.getInstance().getProperty("carbon.spark.warmUpTime", "5000")
+    val warmUpTimeMs = Try(warmUpTime.trim.toLong).toOption.filter(_ >= 0).getOrElse {
+      LOG.error(s"Wrong value for carbon.spark.warmUpTime $warmUpTime " +
+                "Using default Value and proceeding")
+      DefaultWarmUpTimeMs
+    }
+    Thread.sleep(warmUpTimeMs)
+
+    HiveThriftServer2.startWithContext(spark.sqlContext)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/81eca096/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index 67ee478..e654c0e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -16,24 +16,64 @@
  */
 package org.apache.spark.sql
 
+import scala.reflect.ClassTag
+import scala.util.control.NonFatal
+
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.SparkSession.Builder
 import org.apache.spark.sql.hive.CarbonSessionState
-import org.apache.spark.sql.internal.SessionState
+import org.apache.spark.sql.internal.{SessionState, SharedState}
+import org.apache.spark.util.Utils
 
 /**
  * Session implementation for {org.apache.spark.sql.SparkSession}
  * Implemented this class only to use our own SQL DDL commands.
  * User needs to use {CarbonSession.getOrCreateCarbon} to create Carbon session.
- * @param sc
  */
-class CarbonSession(sc: SparkContext) extends SparkSession(sc) {
+class CarbonSession(@transient val sc: SparkContext,
+    @transient private val existingSharedState: Option[SharedState]) extends SparkSession(sc) {
+
+  /**
+   * Creates a session with no pre-existing shared state; `sharedState` is then built
+   * reflectively on first access.
+   *
+   * @param sc the SparkContext this session runs on
+   */
+  def this(sc: SparkContext) {
+    this(sc, None)
+  }
 
+  // Runs for every constructed session, including the ones created by newSession().
   CarbonEnv.init(this)
 
+  /** Per-session state, overridden to be a CarbonSessionState instead of the parent's. */
   @transient
   override private[sql] lazy val sessionState: SessionState = new CarbonSessionState(this)
+
+  /**
+   * State shared across sessions, including the [[SparkContext]], cached data, listener,
+   * and a catalog that interacts with external systems.
+   *
+   * Reuses `existingSharedState` when one was passed in (as newSession() does); otherwise
+   * instantiates `org.apache.spark.sql.hive.HiveSharedState` through `reflect`.
+   * NOTE(review): reflection is presumably used because HiveSharedState is not directly
+   * accessible from this package — confirm.
+   */
+  @transient
+  override private[sql] lazy val sharedState: SharedState = {
+    existingSharedState.getOrElse(reflect[SharedState, SparkContext](
+      "org.apache.spark.sql.hive.HiveSharedState",
+      sparkContext))
+  }
+
+  /**
+   * Creates another CarbonSession on the same SparkContext that reuses this session's
+   * `sharedState`. Because the new session is a CarbonSession, it also gets a
+   * CarbonSessionState.
+   *
+   * @return a new CarbonSession sharing this session's SharedState
+   */
+  override def newSession(): SparkSession = {
+    new CarbonSession(sparkContext, Some(sharedState))
+  }
+
+  /**
+   * Helper method to create an instance of [[T]] using a single-arg constructor that
+   * accepts an [[Arg]].
+   *
+   * @param className fully-qualified name of the class to instantiate
+   * @param ctorArg the single argument passed to the constructor
+   * @param ctorArgTag supplies the runtime class of `Arg` used to look up the constructor
+   * @return the new instance, cast to `T`
+   * @throws IllegalArgumentException wrapping any non-fatal error raised while loading the
+   *                                  class, looking up the constructor or invoking it
+   */
+  private def reflect[T, Arg <: AnyRef](
+      className: String,
+      ctorArg: Arg)(implicit ctorArgTag: ClassTag[Arg]): T = {
+    try {
+      val clazz = Utils.classForName(className)
+      // Look up the declared constructor whose single parameter type is Arg's runtime class.
+      val ctor = clazz.getDeclaredConstructor(ctorArgTag.runtimeClass)
+      ctor.newInstance(ctorArg).asInstanceOf[T]
+    } catch {
+      case NonFatal(e) =>
+        throw new IllegalArgumentException(s"Error while instantiating '$className':", e)
+    }
+  }
 }
 
 object CarbonSession {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/81eca096/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 86bd92b..a851d47 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -375,7 +375,6 @@ case class LoadTable(
 
 
       val columnar = sparkSession.conf.get("carbon.is.columnar.storage", "true").toBoolean
-      val kettleHomePath = CarbonScalaUtil.getKettleHome(sparkSession.sqlContext)
 
       // TODO It will be removed after kettle is removed.
       val useKettle = options.get("use_kettle") match {
@@ -389,6 +388,9 @@ case class LoadTable(
           }
       }
 
+      val kettleHomePath =
+        if (useKettle) CarbonScalaUtil.getKettleHome(sparkSession.sqlContext) else ""
+
       val delimiter = options.getOrElse("delimiter", ",")
       val quoteChar = options.getOrElse("quotechar", "\"")
       val fileHeader = options.getOrElse("fileheader", "")


[2/2] incubator-carbondata git commit: [CARBONDATA-574] Added thrift server support to Spark 2.0 integration This closes #474

Posted by ja...@apache.org.
[CARBONDATA-574] Added thrift server support to Spark 2.0 integration This closes #474


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/65b92212
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/65b92212
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/65b92212

Branch: refs/heads/master
Commit: 65b922126713db00ab28b2006d7e72b498364cf9
Parents: 997920a 81eca09
Author: jackylk <ja...@huawei.com>
Authored: Wed Dec 28 21:21:48 2016 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Wed Dec 28 21:21:48 2016 +0800

----------------------------------------------------------------------
 .../spark/thriftserver/CarbonThriftServer.scala | 64 ++++++++++++++++++++
 .../org/apache/spark/sql/CarbonSession.scala    | 46 +++++++++++++-
 .../execution/command/carbonTableSchema.scala   |  4 +-
 3 files changed, 110 insertions(+), 4 deletions(-)
----------------------------------------------------------------------