You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2016/12/28 13:22:01 UTC
[1/2] incubator-carbondata git commit: Added thrift server support to
Spark 2.0 integration
Repository: incubator-carbondata
Updated Branches:
refs/heads/master 997920a0f -> 65b922126
Added thrift server support to Spark 2.0 integration
Fixed comments
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/81eca096
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/81eca096
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/81eca096
Branch: refs/heads/master
Commit: 81eca0967cc2a836f81278ff552057a1fc59a531
Parents: 997920a
Author: ravipesala <ra...@gmail.com>
Authored: Wed Dec 28 09:56:48 2016 +0530
Committer: jackylk <ja...@huawei.com>
Committed: Wed Dec 28 21:12:03 2016 +0800
----------------------------------------------------------------------
.../spark/thriftserver/CarbonThriftServer.scala | 64 ++++++++++++++++++++
.../org/apache/spark/sql/CarbonSession.scala | 46 +++++++++++++-
.../execution/command/carbonTableSchema.scala | 4 +-
3 files changed, 110 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/81eca096/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
new file mode 100644
index 0000000..6a6ee00
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.thriftserver
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.util.CarbonProperties
+
+object CarbonThriftServer {
+
+ def main(args: Array[String]): Unit = {
+
+ import org.apache.spark.sql.CarbonSession._
+
+ val sparkConf = new SparkConf(loadDefaults = true)
+ val builder = SparkSession
+ .builder()
+ .config(sparkConf)
+ .appName("Carbon Thrift Server(uses CarbonSession)")
+ .enableHiveSupport()
+
+ val sparkHome = System.getenv.get("SPARK_HOME")
+ if (null != sparkHome) {
+ builder.config("carbon.properties.filepath",
+ sparkHome + '/' + "conf" + '/' + "carbon.properties")
+ System.setProperty("carbon.properties.filepath",
+ sparkHome + '/' + "conf" + '/' + "carbon.properties")
+ }
+ CarbonProperties.getInstance().addProperty("carbon.storelocation", args.head)
+
+ val spark = builder.getOrCreateCarbonSession()
+ val warmUpTime = CarbonProperties.getInstance().getProperty("carbon.spark.warmUpTime", "5000")
+ try {
+ Thread.sleep(Integer.parseInt(warmUpTime))
+ } catch {
+ case e: Exception =>
+ val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+ LOG.error(s"Wrong value for carbon.spark.warmUpTime $warmUpTime " +
+ "Using default Value and proceeding")
+ Thread.sleep(5000)
+ }
+
+ HiveThriftServer2.startWithContext(spark.sqlContext)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/81eca096/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index 67ee478..e654c0e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -16,24 +16,64 @@
*/
package org.apache.spark.sql
+import scala.reflect.ClassTag
+import scala.util.control.NonFatal
+
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql.SparkSession.Builder
import org.apache.spark.sql.hive.CarbonSessionState
-import org.apache.spark.sql.internal.SessionState
+import org.apache.spark.sql.internal.{SessionState, SharedState}
+import org.apache.spark.util.Utils
/**
* Session implementation for {org.apache.spark.sql.SparkSession}
* Implemented this class only to use our own SQL DDL commands.
* User needs to use {CarbonSession.getOrCreateCarbon} to create Carbon session.
- * @param sc
*/
-class CarbonSession(sc: SparkContext) extends SparkSession(sc) {
+class CarbonSession(@transient val sc: SparkContext,
+ @transient private val existingSharedState: Option[SharedState]) extends SparkSession(sc) {
+
+ def this(sc: SparkContext) {
+ this(sc, None)
+ }
CarbonEnv.init(this)
@transient
override private[sql] lazy val sessionState: SessionState = new CarbonSessionState(this)
+
+ /**
+ * State shared across sessions, including the [[SparkContext]], cached data, listener,
+ * and a catalog that interacts with external systems.
+ */
+ @transient
+ override private[sql] lazy val sharedState: SharedState = {
+ existingSharedState.getOrElse(reflect[SharedState, SparkContext](
+ "org.apache.spark.sql.hive.HiveSharedState",
+ sparkContext))
+ }
+
+ override def newSession(): SparkSession = {
+ new CarbonSession(sparkContext, Some(sharedState))
+ }
+
+ /**
+ * Helper method to create an instance of [[T]] using a single-arg constructor that
+ * accepts an [[Arg]].
+ */
+ private def reflect[T, Arg <: AnyRef](
+ className: String,
+ ctorArg: Arg)(implicit ctorArgTag: ClassTag[Arg]): T = {
+ try {
+ val clazz = Utils.classForName(className)
+ val ctor = clazz.getDeclaredConstructor(ctorArgTag.runtimeClass)
+ ctor.newInstance(ctorArg).asInstanceOf[T]
+ } catch {
+ case NonFatal(e) =>
+ throw new IllegalArgumentException(s"Error while instantiating '$className':", e)
+ }
+ }
}
object CarbonSession {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/81eca096/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 86bd92b..a851d47 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -375,7 +375,6 @@ case class LoadTable(
val columnar = sparkSession.conf.get("carbon.is.columnar.storage", "true").toBoolean
- val kettleHomePath = CarbonScalaUtil.getKettleHome(sparkSession.sqlContext)
// TODO It will be removed after kettle is removed.
val useKettle = options.get("use_kettle") match {
@@ -389,6 +388,9 @@ case class LoadTable(
}
}
+ val kettleHomePath =
+ if (useKettle) CarbonScalaUtil.getKettleHome(sparkSession.sqlContext) else ""
+
val delimiter = options.getOrElse("delimiter", ",")
val quoteChar = options.getOrElse("quotechar", "\"")
val fileHeader = options.getOrElse("fileheader", "")
[2/2] incubator-carbondata git commit: [CARBONDATA-574] Added thrift
server support to Spark 2.0 integration This closes #474
Posted by ja...@apache.org.
[CARBONDATA-574] Added thrift server support to Spark 2.0 integration This closes #474
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/65b92212
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/65b92212
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/65b92212
Branch: refs/heads/master
Commit: 65b922126713db00ab28b2006d7e72b498364cf9
Parents: 997920a 81eca09
Author: jackylk <ja...@huawei.com>
Authored: Wed Dec 28 21:21:48 2016 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Wed Dec 28 21:21:48 2016 +0800
----------------------------------------------------------------------
.../spark/thriftserver/CarbonThriftServer.scala | 64 ++++++++++++++++++++
.../org/apache/spark/sql/CarbonSession.scala | 46 +++++++++++++-
.../execution/command/carbonTableSchema.scala | 4 +-
3 files changed, 110 insertions(+), 4 deletions(-)
----------------------------------------------------------------------