You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/11/11 12:02:49 UTC
[08/11] carbondata git commit: [CARBONDATA-1701][SEGMENT READING]
Thread-safe API exposed for setting segments to read
[CARBONDATA-1701][SEGMENT READING] Thread-safe API exposed for setting segments to read
Example: CarbonSession.threadSet("carbon.input.segments.default.carbon_table_MulTI_THread", "1,2,3")
This closes #1482
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/80195da4
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/80195da4
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/80195da4
Branch: refs/heads/pre-aggregate
Commit: 80195da41390cd122e6099483149aa4cf59300fd
Parents: 520e50f
Author: rahulforallp <ra...@knoldus.in>
Authored: Fri Nov 10 18:25:09 2017 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Sat Nov 11 15:06:01 2017 +0530
----------------------------------------------------------------------
.../carbondata/core/util/CarbonSessionInfo.java | 38 +++++++++-
.../carbondata/core/util/SessionParams.java | 22 ++++++
.../sql/CarbonDatasourceHadoopRelation.scala | 5 +-
.../scala/org/apache/spark/sql/CarbonEnv.scala | 7 +-
.../org/apache/spark/sql/CarbonSession.scala | 31 +++++++-
.../execution/command/CarbonHiveCommands.scala | 37 ++++++----
.../spark/sql/parser/CarbonSparkSqlParser.scala | 6 +-
.../TestSegmentReadingForMultiThreading.scala | 78 ++++++++++++++++++++
8 files changed, 198 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/80195da4/core/src/main/java/org/apache/carbondata/core/util/CarbonSessionInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonSessionInfo.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonSessionInfo.java
index fb778bc..a44bde7 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonSessionInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonSessionInfo.java
@@ -18,15 +18,19 @@
package org.apache.carbondata.core.util;
import java.io.Serializable;
+import java.util.Map;
+
+import org.apache.carbondata.core.exception.InvalidConfigurationException;
/**
* This class maintains carbon session information details
*/
-public class CarbonSessionInfo implements Serializable {
+public class CarbonSessionInfo implements Serializable, Cloneable {
private static final long serialVersionUID = 7738818814501121256L;
// contains carbon session param details
private SessionParams sessionParams;
+ private SessionParams threadParams;
public SessionParams getSessionParams() {
return sessionParams;
@@ -36,4 +40,36 @@ public class CarbonSessionInfo implements Serializable {
this.sessionParams = sessionParams;
}
+ public SessionParams getThreadParams() {
+ return threadParams;
+ }
+
+ public void setThreadParams(SessionParams threadParams) {
+ this.threadParams = threadParams;
+ }
+
+ public CarbonSessionInfo() {
+ this.sessionParams = new SessionParams();
+ this.threadParams = new SessionParams();
+ }
+
+ public CarbonSessionInfo clone() throws CloneNotSupportedException {
+ super.clone();
+ CarbonSessionInfo newObj = new CarbonSessionInfo();
+ for (Map.Entry<String, String> entry : sessionParams.getAll().entrySet()) {
+ try {
+ newObj.getSessionParams().addProperty(entry.getKey(), entry.getValue(), false);
+ } catch (InvalidConfigurationException ex) {
+ ex.printStackTrace();
+ }
+ }
+ for (Map.Entry<String, String> entry : threadParams.getAll().entrySet()) {
+ try {
+ newObj.getThreadParams().addProperty(entry.getKey(), entry.getValue(), false);
+ } catch (InvalidConfigurationException ex) {
+ ex.printStackTrace();
+ }
+ }
+ return newObj;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/80195da4/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
index 1878416..1a91272 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -81,6 +81,28 @@ public class SessionParams implements Serializable {
return this;
}
+ /**
+ * This method will be used to add a new property
+ *
+ * @param key
+ * @return this SessionParams instance, for method chaining
+ */
+ public SessionParams addProperty(String key, String value, Boolean doAuditing)
+ throws InvalidConfigurationException {
+ boolean isValidConf = validateKeyValue(key, value);
+ if (isValidConf) {
+ if (doAuditing) {
+ LOGGER.audit("The key " + key + " with value " + value + " added in the session param");
+ }
+ sProps.put(key, value);
+ }
+ return this;
+ }
+
+ public Map<String, String> getAll() {
+ return sProps;
+ }
+
public SessionParams addProps(Map<String, String> addedProps) {
this.addedProps.putAll(addedProps);
return this;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/80195da4/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 10336eb..3fb65be 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -35,7 +35,6 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.scan.expression.Expression
import org.apache.carbondata.core.scan.expression.logical.AndExpression
-import org.apache.carbondata.core.util.{CarbonSessionInfo, ThreadLocalSessionInfo}
import org.apache.carbondata.hadoop.CarbonProjection
import org.apache.carbondata.spark.rdd.CarbonScanRDD
import org.apache.carbondata.streaming.StreamSinkFactory
@@ -51,9 +50,7 @@ case class CarbonDatasourceHadoopRelation(
lazy val identifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.fromTablePath(paths.head)
lazy val databaseName: String = carbonTable.getDatabaseName
lazy val tableName: String = carbonTable.getFactTableName
- lazy val carbonSessionInfo : CarbonSessionInfo =
- CarbonEnv.getInstance(sparkSession).carbonSessionInfo
- ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
+ CarbonSession.updateSessionInfoToCurrentThread(sparkSession)
@transient lazy val carbonRelation: CarbonRelation =
CarbonEnv.getInstance(sparkSession).carbonMetastore.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/80195da4/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index b324b10..a37b55b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -53,8 +53,13 @@ class CarbonEnv {
def init(sparkSession: SparkSession): Unit = {
sparkSession.udf.register("getTupleId", () => "")
if (!initialized) {
+ // update carbon session parameters, preserving thread parameters
+ val currentThreadSesssionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
carbonSessionInfo = new CarbonSessionInfo()
- sessionParams = new SessionParams()
+ sessionParams = carbonSessionInfo.getSessionParams
+ if (currentThreadSesssionInfo != null) {
+ carbonSessionInfo.setThreadParams(currentThreadSesssionInfo.getThreadParams)
+ }
carbonSessionInfo.setSessionParams(sessionParams)
ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
val config = new CarbonSQLConf(sparkSession)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/80195da4/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index 6020eee..33e34bb 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -18,17 +18,20 @@ package org.apache.spark.sql
import java.io.File
+import scala.collection.JavaConverters._
+
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql.SparkSession.Builder
import org.apache.spark.sql.execution.streaming.CarbonStreamingQueryListener
import org.apache.spark.sql.hive.CarbonSessionState
+import org.apache.spark.sql.hive.execution.command.CarbonSetCommand
import org.apache.spark.sql.internal.{SessionState, SharedState}
import org.apache.spark.util.Utils
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo, ThreadLocalSessionInfo}
import org.apache.carbondata.spark.util.CommonUtil
/**
@@ -189,4 +192,30 @@ object CarbonSession {
}
}
+ def threadSet(key: String, value: String): Unit = {
+ var currentThreadSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
+ if (currentThreadSessionInfo == null) {
+ currentThreadSessionInfo = new CarbonSessionInfo()
+ }
+ else {
+ currentThreadSessionInfo = currentThreadSessionInfo.clone()
+ }
+ val threadParams = currentThreadSessionInfo.getThreadParams
+ CarbonSetCommand.validateAndSetValue(threadParams, key, value)
+ ThreadLocalSessionInfo.setCarbonSessionInfo(currentThreadSessionInfo)
+ }
+
+ private[spark] def updateSessionInfoToCurrentThread(sparkSession: SparkSession): Unit = {
+ val carbonSessionInfo = CarbonEnv.getInstance(sparkSession).carbonSessionInfo.clone()
+ val currentThreadSessionInfoOrig = ThreadLocalSessionInfo.getCarbonSessionInfo
+ if (currentThreadSessionInfoOrig != null) {
+ val currentThreadSessionInfo = currentThreadSessionInfoOrig.clone()
+ // copy all the thread parameters to apply to session parameters
+ currentThreadSessionInfo.getThreadParams.getAll.asScala
+ .foreach(entry => carbonSessionInfo.getSessionParams.addProperty(entry._1, entry._2))
+ carbonSessionInfo.setThreadParams(currentThreadSessionInfo.getThreadParams)
+ }
+ // preserve thread parameters across call
+ ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/80195da4/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
index 56560fd..f2c8a0a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.command._
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, SessionParams}
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
case class CarbonDropDatabaseCommand(command: DropDatabaseCommand)
@@ -59,20 +59,7 @@ case class CarbonSetCommand(command: SetCommand)
val sessionParms = CarbonEnv.getInstance(sparkSession).carbonSessionInfo.getSessionParams
command.kv match {
case Some((key, Some(value))) =>
- val isCarbonProperty: Boolean = CarbonProperties.getInstance().isCarbonProperty(key)
- if (isCarbonProperty) {
- sessionParms.addProperty(key, value)
- }
- else if (key.startsWith(CarbonCommonConstants.CARBON_INPUT_SEGMENTS)) {
- if (key.split("\\.").length == 5) {
- sessionParms.addProperty(key.toLowerCase(), value)
- }
- else {
- throw new MalformedCarbonCommandException(
- "property should be in \" carbon.input.segments.<database_name>" +
- ".<table_name>=<seg_id list> \" format.")
- }
- }
+ CarbonSetCommand.validateAndSetValue(sessionParms, key, value)
case _ =>
}
@@ -80,6 +67,26 @@ case class CarbonSetCommand(command: SetCommand)
}
}
+object CarbonSetCommand {
+ def validateAndSetValue(sessionParams: SessionParams, key: String, value: String): Unit = {
+
+ val isCarbonProperty: Boolean = CarbonProperties.getInstance().isCarbonProperty(key)
+ if (isCarbonProperty) {
+ sessionParams.addProperty(key, value)
+ }
+ else if (key.startsWith(CarbonCommonConstants.CARBON_INPUT_SEGMENTS)) {
+ if (key.split("\\.").length == 5) {
+ sessionParams.addProperty(key.toLowerCase(), value)
+ }
+ else {
+ throw new MalformedCarbonCommandException(
+ "property should be in \" carbon.input.segments.<database_name>" +
+ ".<table_name>=<seg_id list> \" format.")
+ }
+ }
+ }
+}
+
case class CarbonResetCommand()
extends RunnableCommand {
override val output = ResetCommand.output
http://git-wip-us.apache.org/repos/asf/carbondata/blob/80195da4/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 0a918df..a53e71f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.parser
import scala.collection.mutable
-import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.{CarbonSession, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.parser.{AbstractSqlParser, ParseException, SqlBaseParser}
import org.apache.spark.sql.catalyst.parser.ParserUtils._
@@ -29,7 +29,6 @@ import org.apache.spark.sql.execution.command.{BucketFields, CarbonCreateTableCo
import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
import org.apache.spark.sql.types.StructField
-import org.apache.carbondata.core.util.{CarbonSessionInfo, ThreadLocalSessionInfo}
import org.apache.carbondata.spark.CarbonOption
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.util.CommonUtil
@@ -45,8 +44,7 @@ class CarbonSparkSqlParser(conf: SQLConf, sparkSession: SparkSession) extends Ab
private val substitutor = new VariableSubstitution(conf)
override def parsePlan(sqlText: String): LogicalPlan = {
- val carbonSessionInfo: CarbonSessionInfo = CarbonEnv.getInstance(sparkSession).carbonSessionInfo
- ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
+ CarbonSession.updateSessionInfoToCurrentThread(sparkSession)
try {
super.parsePlan(sqlText)
} catch {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/80195da4/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReadingForMultiThreading.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReadingForMultiThreading.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReadingForMultiThreading.scala
new file mode 100644
index 0000000..e24f8b8
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReadingForMultiThreading.scala
@@ -0,0 +1,78 @@
+package org.apache.carbondata.spark.testsuite.segmentreading
+
+import java.util.concurrent.TimeUnit
+
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
+
+import org.apache.spark.sql.{CarbonSession, Row}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+
+/**
+ * Test case for setting segments to read in a multithreaded environment
+ */
+class TestSegmentReadingForMultiThreading extends QueryTest with BeforeAndAfterAll {
+
+ override def beforeAll: Unit = {
+ sql("DROP TABLE IF EXISTS carbon_table_MulTI_THread")
+ sql(
+ "CREATE TABLE carbon_table_MulTI_THread (empno int, empname String, designation String, doj " +
+ "Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname " +
+ "String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance " +
+ "int,utilization int,salary int) STORED BY 'org.apache.carbondata.format'")
+ sql(
+ s"LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' INTO TABLE carbon_table_MulTI_THread " +
+ "OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')")
+ sql(
+ s"LOAD DATA LOCAL INPATH '$resourcesPath/data1.csv' INTO TABLE carbon_table_MulTI_THread " +
+ "OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')")
+ sql(
+ s"LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' INTO TABLE carbon_table_MulTI_THread " +
+ "OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')")
+ sql(
+ s"LOAD DATA LOCAL INPATH '$resourcesPath/data1.csv' INTO TABLE carbon_table_MulTI_THread " +
+ "OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')")
+ }
+
+ test("test multithreading for segment reading") {
+
+
+ CarbonSession.threadSet("carbon.input.segments.default.carbon_table_MulTI_THread", "1,2,3")
+ val df = sql("select count(empno) from carbon_table_MulTI_THread")
+ checkAnswer(df, Seq(Row(30)))
+
+ val four = Future {
+ CarbonSession.threadSet("carbon.input.segments.default.carbon_table_MulTI_THread", "1,3")
+ val df = sql("select count(empno) from carbon_table_MulTI_THread")
+ checkAnswer(df, Seq(Row(20)))
+ }
+
+ val three = Future {
+ CarbonSession.threadSet("carbon.input.segments.default.carbon_table_MulTI_THread", "0,1,2")
+ val df = sql("select count(empno) from carbon_table_MulTI_THread")
+ checkAnswer(df, Seq(Row(30)))
+ }
+
+
+ val one = Future {
+ CarbonSession.threadSet("carbon.input.segments.default.carbon_table_MulTI_THread", "0,2")
+ val df = sql("select count(empno) from carbon_table_MulTI_THread")
+ checkAnswer(df, Seq(Row(20)))
+ }
+
+ val two = Future {
+ CarbonSession.threadSet("carbon.input.segments.default.carbon_table_MulTI_THread", "1")
+ val df = sql("select count(empno) from carbon_table_MulTI_THread")
+ checkAnswer(df, Seq(Row(10)))
+ }
+ Await.result(Future.sequence(Seq(one, two, three, four)), Duration(300, TimeUnit.SECONDS))
+ }
+
+ override def afterAll: Unit = {
+ sql("DROP TABLE IF EXISTS carbon_table_MulTI_THread")
+
+ }
+}