Posted to commits@carbondata.apache.org by ja...@apache.org on 2016/12/11 06:41:17 UTC

[1/2] incubator-carbondata git commit: fix conf issue for scanrdd

Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 2a6d097d1 -> 5c476ec40


fix conf issue for scanrdd

use org.apache.carbondata.hadoop.readsupport.impl.RawDataReadSupport in spark1

fix compile issue


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/656f3ee2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/656f3ee2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/656f3ee2

Branch: refs/heads/master
Commit: 656f3ee2f56c31abdb5c241fa56cf86438489771
Parents: 2a6d097
Author: wangfei <wa...@126.com>
Authored: Sun Dec 11 07:24:03 2016 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Sun Dec 11 14:40:48 2016 +0800

----------------------------------------------------------------------
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |  2 +-
 .../carbondata/spark/rdd/SparkCommonEnv.scala   | 30 --------------------
 .../carbondata/spark/rdd/SparkReadSupport.scala | 28 ++++++++++++++++++
 .../spark/sql/hive/DistributionUtil.scala       |  4 ---
 .../scala/org/apache/spark/sql/CarbonEnv.scala  | 14 +++------
 .../sql/CarbonDatasourceHadoopRelation.scala    |  4 ---
 .../scala/org/apache/spark/sql/CarbonEnv.scala  | 15 +++-------
 7 files changed, 37 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/656f3ee2/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index a750b10..d654067 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -197,7 +197,7 @@ class CarbonScanRDD[V: ClassTag](
   }
 
   private def prepareInputFormatForExecutor(conf: Configuration): CarbonInputFormat[V] = {
-    CarbonInputFormat.setCarbonReadSupport(conf, SparkCommonEnv.readSupportClass)
+    CarbonInputFormat.setCarbonReadSupport(conf, SparkReadSupport.readSupportClass)
     createInputFormat(conf)
   }
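
For context on what setCarbonReadSupport does here: helpers of this kind typically
record the fully qualified class name in the Hadoop Configuration so it travels with
the job and can be re-instantiated wherever the conf is available. A minimal sketch
of that pattern (hypothetical key and object names, not CarbonData's actual constants):

    import org.apache.hadoop.conf.Configuration

    // Sketch: plumb a class through a Hadoop Configuration by name.
    object ReadSupportConf {
      val KEY = "carbon.read.support.class" // hypothetical config key

      // Driver side: record the fully qualified class name in the conf.
      def set(conf: Configuration, clazz: Class[_]): Unit =
        conf.set(KEY, clazz.getName)

      // Reader side: recover the Class from the same conf, reflectively.
      def get(conf: Configuration): Class[_] =
        Class.forName(conf.get(KEY))
    }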
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/656f3ee2/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkCommonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkCommonEnv.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkCommonEnv.scala
deleted file mode 100644
index bf614b1..0000000
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkCommonEnv.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.rdd
-
-import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport
-
-// Used to solve cyclic-dependency issue of carbon-spark-common and carbon-spark, carbon-spark2
-// modules, variables or functions that different in carbon-spark and carbon-spark2 are set here
-object SparkCommonEnv {
-
-  var readSupportClass: Class[_ <: CarbonReadSupport[_]] = _
-
-  var numExistingExecutors: Int = _
-
-}
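
The deleted object above points at the failure mode tracked by CARBONDATA-520: a
Scala object is initialized per JVM, so a var assigned only on the driver is still
null in every executor process. A standalone sketch of the problem (hypothetical
names, not project code):

    // Per-JVM singleton state: nothing here is serialized into Spark tasks.
    object GlobalState {
      var readSupportClass: Class[_] = _ // null in every freshly started JVM
    }

    // Driver JVM:
    //   GlobalState.readSupportClass = classOf[String] // visible only on the driver
    // Executor JVM (a separate process):
    //   GlobalState.readSupportClass                   // still null here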

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/656f3ee2/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkReadSupport.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkReadSupport.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkReadSupport.scala
new file mode 100644
index 0000000..3d78f0e
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkReadSupport.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport
+
+// Used to solve the cyclic-dependency issue between carbon-spark-common and the carbon-spark and
+// carbon-spark2 modules; variables or functions that differ between the two modules are set here
+object SparkReadSupport {
+
+  var readSupportClass: Class[_ <: CarbonReadSupport[_]] = _
+
+}
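
On the consuming side, a holder like this is usually dereferenced and instantiated
reflectively. A hedged sketch of that usage (hypothetical trait and factory, not
project code):

    // Hypothetical consumer: instantiate the configured read support reflectively.
    trait ReadSupportLike { def initialize(): Unit }

    object Holder { var readSupportClass: Class[_ <: ReadSupportLike] = _ }

    object Factory {
      def newReadSupport(): ReadSupportLike = {
        require(Holder.readSupportClass != null, "read support class not set in this JVM")
        Holder.readSupportClass.getDeclaredConstructor().newInstance()
      }
    }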

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/656f3ee2/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
index 5b9353e..8950862 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
@@ -29,7 +29,6 @@ import org.apache.carbondata.core.carbon.datastore.block.Distributable
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.load.CarbonLoaderUtil
-import org.apache.carbondata.spark.rdd.SparkCommonEnv
 
 object DistributionUtil {
   @transient
@@ -231,9 +230,6 @@ object DistributionUtil {
       hostToLocalTaskCount: Map[String, Int] = Map.empty): Boolean = {
     sc.schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>
-        LOGGER.info(
-            s"number of required executors are = $requiredExecutors and existing executors are = " +
-            s"${SparkCommonEnv.numExistingExecutors}")
         if (requiredExecutors > 0) {
           LOGGER.info(s"Requesting total executors: $requiredExecutors")
           b.requestTotalExecutors(requiredExecutors, localityAwareTasks, hostToLocalTaskCount)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/656f3ee2/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index efa588d..ec6f456 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -21,7 +21,7 @@ import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.sql.hive.CarbonMetastore
 
 import org.apache.carbondata.hadoop.readsupport.impl.RawDataReadSupport
-import org.apache.carbondata.spark.rdd.SparkCommonEnv
+import org.apache.carbondata.spark.rdd.SparkReadSupport
 
 case class CarbonEnv(carbonMetastore: CarbonMetastore)
 
@@ -29,6 +29,9 @@ object CarbonEnv {
 
   @volatile private var carbonEnv: CarbonEnv = _
 
+  // Set the read support class globally so that the executor can get it.
+  SparkReadSupport.readSupportClass = classOf[RawDataReadSupport]
+
   var initialized = false
 
   def init(sqlContext: SQLContext): Unit = {
@@ -36,7 +39,6 @@ object CarbonEnv {
       val cc = sqlContext.asInstanceOf[CarbonContext]
       val catalog = new CarbonMetastore(cc, cc.storePath, cc.hiveClientInterface, "")
       carbonEnv = CarbonEnv(catalog)
-      setSparkCommonEnv(sqlContext)
       initialized = true
     }
   }
@@ -45,14 +47,6 @@ object CarbonEnv {
     if (initialized) carbonEnv
     else throw new RuntimeException("CarbonEnv not initialized")
   }
-
-  private def setSparkCommonEnv(sqlContext: SQLContext): Unit = {
-    SparkCommonEnv.readSupportClass = classOf[RawDataReadSupport]
-    SparkCommonEnv.numExistingExecutors = sqlContext.sparkContext.schedulerBackend match {
-      case b: CoarseGrainedSchedulerBackend => b.numExistingExecutors
-      case _ => 0
-    }
-  }
 }
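
The assignment now sits in the object body instead of init(), so it runs the first
time CarbonEnv is referenced in a JVM, before any explicit initialization. A small
sketch of that ordering (hypothetical names):

    // Sketch: statements in an object body run on first reference,
    // ahead of any explicit init() call on the same object.
    object HolderLike { var readSupport: String = _ }

    object EnvLike {
      HolderLike.readSupport = "RawDataReadSupport" // runs at object initialization

      def init(): Unit = {
        assert(HolderLike.readSupport != null) // already set when init() runs
      }
    }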
 
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/656f3ee2/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 1ecc67a..997106c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -62,16 +62,12 @@ case class CarbonDatasourceHadoopRelation(
   override def schema: StructType = tableSchema.getOrElse(carbonRelation.schema)
 
   def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
-    val job = new Job(new JobConf())
-    val conf = new Configuration(job.getConfiguration)
     val filterExpression: Option[Expression] = filters.flatMap { filter =>
       CarbonFilters.createCarbonFilter(schema, filter)
     }.reduceOption(new AndExpression(_, _))
 
     val projection = new CarbonProjection
     requiredColumns.foreach(projection.addColumn)
-    CarbonInputFormat.setColumnProjection(conf, projection)
-    CarbonInputFormat.setCarbonReadSupport(conf, classOf[SparkRowReadSupportImpl])
 
     new CarbonScanRDD[Row](sqlContext.sparkContext, projection, filterExpression.orNull,
       absIdentifier, carbonTable)
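
The flatMap/reduceOption idiom kept above collapses any number of pushed-down
filters into a single optional conjunction. The same pattern in a self-contained
sketch (toy expression types standing in for Expression/AndExpression):

    object FilterFold {
      sealed trait Expr
      case class Pred(sql: String) extends Expr
      case class And(left: Expr, right: Expr) extends Expr

      val preds: Seq[Expr] = Seq(Pred("a > 1"), Pred("b < 2"), Pred("c = 3"))

      // Some(And(And(Pred(a > 1), Pred(b < 2)), Pred(c = 3))); None for empty input.
      val combined: Option[Expr] = preds.reduceOption(And(_, _))
    }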

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/656f3ee2/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index dcff80c..1fa710e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -17,13 +17,12 @@
 
 package org.apache.spark.sql
 
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.sql.hive.CarbonMetastore
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.rdd.SparkCommonEnv
+import org.apache.carbondata.spark.rdd.SparkReadSupport
 import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
 
 /**
@@ -37,6 +36,9 @@ object CarbonEnv {
 
   @volatile private var carbonEnv: CarbonEnv = _
 
+  // Set the read support class globally so that the executor can get it.
+  SparkReadSupport.readSupportClass = classOf[SparkRowReadSupportImpl]
+
   var initialized = false
 
   def init(sqlContext: SQLContext): Unit = {
@@ -48,7 +50,6 @@ object CarbonEnv {
         new CarbonMetastore(sqlContext.sparkSession.conf, storePath)
       }
       carbonEnv = CarbonEnv(catalog)
-      setSparkCommonEnv(sqlContext)
       initialized = true
     }
   }
@@ -56,14 +57,6 @@ object CarbonEnv {
   def get: CarbonEnv = {
     carbonEnv
   }
-
-  private def setSparkCommonEnv(sqlContext: SQLContext): Unit = {
-    SparkCommonEnv.readSupportClass = classOf[SparkRowReadSupportImpl]
-    SparkCommonEnv.numExistingExecutors = sqlContext.sparkContext.schedulerBackend match {
-      case b: CoarseGrainedSchedulerBackend => b.getExecutorIds().length
-      case _ => 0
-    }
-  }
 }
 
 


[2/2] incubator-carbondata git commit: [CARBONDATA-520] Executor can not get the read support class This closes #417

Posted by ja...@apache.org.
[CARBONDATA-520] Executor can not get the read support class. This closes #417


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/5c476ec4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/5c476ec4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/5c476ec4

Branch: refs/heads/master
Commit: 5c476ec40c1c1e610a493e547cd67b3502a04202
Parents: 2a6d097 656f3ee
Author: jackylk <ja...@huawei.com>
Authored: Sun Dec 11 14:41:05 2016 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Sun Dec 11 14:41:05 2016 +0800

----------------------------------------------------------------------
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |  2 +-
 .../carbondata/spark/rdd/SparkCommonEnv.scala   | 30 --------------------
 .../carbondata/spark/rdd/SparkReadSupport.scala | 28 ++++++++++++++++++
 .../spark/sql/hive/DistributionUtil.scala       |  4 ---
 .../scala/org/apache/spark/sql/CarbonEnv.scala  | 14 +++------
 .../sql/CarbonDatasourceHadoopRelation.scala    |  4 ---
 .../scala/org/apache/spark/sql/CarbonEnv.scala  | 15 +++-------
 7 files changed, 37 insertions(+), 60 deletions(-)
----------------------------------------------------------------------