You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2017/11/11 09:38:47 UTC

carbondata git commit: [CARBONDATA-1701][SEGMENT READING] Expose a thread-safe API for setting the segments to read

Repository: carbondata
Updated Branches:
  refs/heads/master 520e50f32 -> 80195da41


[CARBONDATA-1701][SEGMENT READING] Expose a thread-safe API for setting the segments to read

Example: CarbonSession.threadSet("carbon.input.segments.default.carbon_table_MulTI_THread", "1,2,3")

This closes #1482


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/80195da4
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/80195da4
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/80195da4

Branch: refs/heads/master
Commit: 80195da41390cd122e6099483149aa4cf59300fd
Parents: 520e50f
Author: rahulforallp <ra...@knoldus.in>
Authored: Fri Nov 10 18:25:09 2017 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Sat Nov 11 15:06:01 2017 +0530

----------------------------------------------------------------------
 .../carbondata/core/util/CarbonSessionInfo.java | 38 +++++++++-
 .../carbondata/core/util/SessionParams.java     | 22 ++++++
 .../sql/CarbonDatasourceHadoopRelation.scala    |  5 +-
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |  7 +-
 .../org/apache/spark/sql/CarbonSession.scala    | 31 +++++++-
 .../execution/command/CarbonHiveCommands.scala  | 37 ++++++----
 .../spark/sql/parser/CarbonSparkSqlParser.scala |  6 +-
 .../TestSegmentReadingForMultiThreading.scala   | 78 ++++++++++++++++++++
 8 files changed, 198 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/80195da4/core/src/main/java/org/apache/carbondata/core/util/CarbonSessionInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonSessionInfo.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonSessionInfo.java
index fb778bc..a44bde7 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonSessionInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonSessionInfo.java
@@ -18,15 +18,19 @@
 package org.apache.carbondata.core.util;
 
 import java.io.Serializable;
+import java.util.Map;
+
+import org.apache.carbondata.core.exception.InvalidConfigurationException;
 
 /**
  * This class maintains carbon session information details
  */
-public class CarbonSessionInfo implements Serializable {
+public class CarbonSessionInfo implements Serializable, Cloneable {
 
   private static final long serialVersionUID = 7738818814501121256L;
   // contains carbon session param details
   private SessionParams sessionParams;
+  private SessionParams threadParams;
 
   public SessionParams getSessionParams() {
     return sessionParams;
@@ -36,4 +40,36 @@ public class CarbonSessionInfo implements Serializable {
     this.sessionParams = sessionParams;
   }
 
+  public SessionParams getThreadParams() {
+    return threadParams;
+  }
+
+  public void setThreadParams(SessionParams threadParams) {
+    this.threadParams = threadParams;
+  }
+
+  public CarbonSessionInfo() {
+    this.sessionParams = new SessionParams();
+    this.threadParams = new SessionParams();
+  }
+
+  public CarbonSessionInfo clone() throws CloneNotSupportedException {
+    super.clone();
+    CarbonSessionInfo newObj = new CarbonSessionInfo();
+    for (Map.Entry<String, String> entry : sessionParams.getAll().entrySet()) {
+      try {
+        newObj.getSessionParams().addProperty(entry.getKey(), entry.getValue(), false);
+      } catch (InvalidConfigurationException ex) {
+        ex.printStackTrace();
+      }
+    }
+    for (Map.Entry<String, String> entry : threadParams.getAll().entrySet()) {
+      try {
+        newObj.getThreadParams().addProperty(entry.getKey(), entry.getValue(), false);
+      } catch (InvalidConfigurationException ex) {
+        ex.printStackTrace();
+      }
+    }
+    return newObj;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/80195da4/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
index 1878416..1a91272 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -81,6 +81,28 @@ public class SessionParams implements Serializable {
     return this;
   }
 
+  /**
+   * Adds the property if validateKeyValue accepts it; audits it only when doAuditing is true.
+   *
+   * @param key name of the property to add
+   * @return this SessionParams instance
+   */
+  public SessionParams addProperty(String key, String value, Boolean doAuditing)
+      throws InvalidConfigurationException {
+    boolean isValidConf = validateKeyValue(key, value);
+    if (isValidConf) {
+      if (doAuditing) {
+        LOGGER.audit("The key " + key + " with value " + value + " added in the session param");
+      }
+      sProps.put(key, value);
+    }
+    return this;
+  }
+
+  public Map<String, String> getAll() {
+    return sProps;
+  }
+
   public SessionParams addProps(Map<String, String> addedProps) {
     this.addedProps.putAll(addedProps);
     return this;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/80195da4/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 10336eb..3fb65be 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -35,7 +35,6 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.core.scan.expression.logical.AndExpression
-import org.apache.carbondata.core.util.{CarbonSessionInfo, ThreadLocalSessionInfo}
 import org.apache.carbondata.hadoop.CarbonProjection
 import org.apache.carbondata.spark.rdd.CarbonScanRDD
 import org.apache.carbondata.streaming.StreamSinkFactory
@@ -51,9 +50,7 @@ case class CarbonDatasourceHadoopRelation(
   lazy val identifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.fromTablePath(paths.head)
   lazy val databaseName: String = carbonTable.getDatabaseName
   lazy val tableName: String = carbonTable.getFactTableName
-  lazy val carbonSessionInfo : CarbonSessionInfo =
-    CarbonEnv.getInstance(sparkSession).carbonSessionInfo
-  ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
+  CarbonSession.updateSessionInfoToCurrentThread(sparkSession)
 
   @transient lazy val carbonRelation: CarbonRelation =
     CarbonEnv.getInstance(sparkSession).carbonMetastore.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/80195da4/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index b324b10..a37b55b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -53,8 +53,13 @@ class CarbonEnv {
   def init(sparkSession: SparkSession): Unit = {
     sparkSession.udf.register("getTupleId", () => "")
     if (!initialized) {
+      // update carbon session parameters, preserving the current thread's parameters
+      val currentThreadSesssionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
       carbonSessionInfo = new CarbonSessionInfo()
-      sessionParams = new SessionParams()
+      sessionParams = carbonSessionInfo.getSessionParams
+      if (currentThreadSesssionInfo != null) {
+        carbonSessionInfo.setThreadParams(currentThreadSesssionInfo.getThreadParams)
+      }
       carbonSessionInfo.setSessionParams(sessionParams)
       ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
       val config = new CarbonSQLConf(sparkSession)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/80195da4/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index 6020eee..33e34bb 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -18,17 +18,20 @@ package org.apache.spark.sql
 
 import java.io.File
 
+import scala.collection.JavaConverters._
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.SparkSession.Builder
 import org.apache.spark.sql.execution.streaming.CarbonStreamingQueryListener
 import org.apache.spark.sql.hive.CarbonSessionState
+import org.apache.spark.sql.hive.execution.command.CarbonSetCommand
 import org.apache.spark.sql.internal.{SessionState, SharedState}
 import org.apache.spark.util.Utils
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo, ThreadLocalSessionInfo}
 import org.apache.carbondata.spark.util.CommonUtil
 
 /**
@@ -189,4 +192,30 @@ object CarbonSession {
     }
   }
 
+  def threadSet(key: String, value: String): Unit = {
+    var currentThreadSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
+    if (currentThreadSessionInfo == null) {
+      currentThreadSessionInfo = new CarbonSessionInfo()
+    }
+    else {
+      currentThreadSessionInfo = currentThreadSessionInfo.clone()
+    }
+    val threadParams = currentThreadSessionInfo.getThreadParams
+    CarbonSetCommand.validateAndSetValue(threadParams, key, value)
+    ThreadLocalSessionInfo.setCarbonSessionInfo(currentThreadSessionInfo)
+  }
+
+  private[spark] def updateSessionInfoToCurrentThread(sparkSession: SparkSession): Unit = {
+    val carbonSessionInfo = CarbonEnv.getInstance(sparkSession).carbonSessionInfo.clone()
+    val currentThreadSessionInfoOrig = ThreadLocalSessionInfo.getCarbonSessionInfo
+    if (currentThreadSessionInfoOrig != null) {
+      val currentThreadSessionInfo = currentThreadSessionInfoOrig.clone()
+      // copy all the thread parameters to apply to session parameters
+      currentThreadSessionInfo.getThreadParams.getAll.asScala
+        .foreach(entry => carbonSessionInfo.getSessionParams.addProperty(entry._1, entry._2))
+      carbonSessionInfo.setThreadParams(currentThreadSessionInfo.getThreadParams)
+    }
+    // preserve thread parameters across call
+    ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/80195da4/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
index 56560fd..f2c8a0a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.command._
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, SessionParams}
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 case class CarbonDropDatabaseCommand(command: DropDatabaseCommand)
@@ -59,20 +59,7 @@ case class CarbonSetCommand(command: SetCommand)
     val sessionParms = CarbonEnv.getInstance(sparkSession).carbonSessionInfo.getSessionParams
     command.kv match {
       case Some((key, Some(value))) =>
-        val isCarbonProperty: Boolean = CarbonProperties.getInstance().isCarbonProperty(key)
-        if (isCarbonProperty) {
-          sessionParms.addProperty(key, value)
-        }
-        else if (key.startsWith(CarbonCommonConstants.CARBON_INPUT_SEGMENTS)) {
-          if (key.split("\\.").length == 5) {
-            sessionParms.addProperty(key.toLowerCase(), value)
-          }
-          else {
-            throw new MalformedCarbonCommandException(
-              "property should be in \" carbon.input.segments.<database_name>" +
-              ".<table_name>=<seg_id list> \" format.")
-          }
-        }
+        CarbonSetCommand.validateAndSetValue(sessionParms, key, value)
       case _ =>
 
     }
@@ -80,6 +67,26 @@ case class CarbonSetCommand(command: SetCommand)
   }
 }
 
+object CarbonSetCommand {
+  def validateAndSetValue(sessionParams: SessionParams, key: String, value: String): Unit = {
+
+    val isCarbonProperty: Boolean = CarbonProperties.getInstance().isCarbonProperty(key)
+    if (isCarbonProperty) {
+      sessionParams.addProperty(key, value)
+    }
+    else if (key.startsWith(CarbonCommonConstants.CARBON_INPUT_SEGMENTS)) {
+      if (key.split("\\.").length == 5) {
+        sessionParams.addProperty(key.toLowerCase(), value)
+      }
+      else {
+        throw new MalformedCarbonCommandException(
+          "property should be in \" carbon.input.segments.<database_name>" +
+          ".<table_name>=<seg_id list> \" format.")
+      }
+    }
+  }
+}
+
 case class CarbonResetCommand()
   extends RunnableCommand {
   override val output = ResetCommand.output

http://git-wip-us.apache.org/repos/asf/carbondata/blob/80195da4/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 0a918df..a53e71f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.parser
 
 import scala.collection.mutable
 
-import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.{CarbonSession, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.parser.{AbstractSqlParser, ParseException, SqlBaseParser}
 import org.apache.spark.sql.catalyst.parser.ParserUtils._
@@ -29,7 +29,6 @@ import org.apache.spark.sql.execution.command.{BucketFields, CarbonCreateTableCo
 import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
 import org.apache.spark.sql.types.StructField
 
-import org.apache.carbondata.core.util.{CarbonSessionInfo, ThreadLocalSessionInfo}
 import org.apache.carbondata.spark.CarbonOption
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.CommonUtil
@@ -45,8 +44,7 @@ class CarbonSparkSqlParser(conf: SQLConf, sparkSession: SparkSession) extends Ab
   private val substitutor = new VariableSubstitution(conf)
 
   override def parsePlan(sqlText: String): LogicalPlan = {
-    val carbonSessionInfo: CarbonSessionInfo = CarbonEnv.getInstance(sparkSession).carbonSessionInfo
-    ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
+    CarbonSession.updateSessionInfoToCurrentThread(sparkSession)
     try {
       super.parsePlan(sqlText)
     } catch {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/80195da4/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReadingForMultiThreading.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReadingForMultiThreading.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReadingForMultiThreading.scala
new file mode 100644
index 0000000..e24f8b8
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReadingForMultiThreading.scala
@@ -0,0 +1,78 @@
+package org.apache.carbondata.spark.testsuite.segmentreading
+
+import java.util.concurrent.TimeUnit
+
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
+
+import org.apache.spark.sql.{CarbonSession, Row}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+
+/**
+ * Test case for setting segments to read in a multi-threaded environment
+ */
+class TestSegmentReadingForMultiThreading extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll: Unit = {
+    sql("DROP TABLE IF EXISTS carbon_table_MulTI_THread")
+    sql(
+      "CREATE TABLE carbon_table_MulTI_THread (empno int, empname String, designation String, doj " +
+      "Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname " +
+      "String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance " +
+      "int,utilization int,salary int) STORED BY 'org.apache.carbondata.format'")
+    sql(
+      s"LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' INTO TABLE carbon_table_MulTI_THread " +
+      "OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')")
+    sql(
+      s"LOAD DATA LOCAL INPATH '$resourcesPath/data1.csv' INTO TABLE carbon_table_MulTI_THread " +
+      "OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')")
+    sql(
+      s"LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' INTO TABLE carbon_table_MulTI_THread " +
+      "OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')")
+    sql(
+      s"LOAD DATA LOCAL INPATH '$resourcesPath/data1.csv' INTO TABLE carbon_table_MulTI_THread " +
+      "OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')")
+  }
+
+  test("test multithreading for segment reading") {
+
+
+    CarbonSession.threadSet("carbon.input.segments.default.carbon_table_MulTI_THread", "1,2,3")
+    val df = sql("select count(empno) from carbon_table_MulTI_THread")
+    checkAnswer(df, Seq(Row(30)))
+
+    val four = Future {
+      CarbonSession.threadSet("carbon.input.segments.default.carbon_table_MulTI_THread", "1,3")
+      val df = sql("select count(empno) from carbon_table_MulTI_THread")
+      checkAnswer(df, Seq(Row(20)))
+    }
+
+    val three = Future {
+      CarbonSession.threadSet("carbon.input.segments.default.carbon_table_MulTI_THread", "0,1,2")
+      val df = sql("select count(empno) from carbon_table_MulTI_THread")
+      checkAnswer(df, Seq(Row(30)))
+    }
+
+
+    val one = Future {
+      CarbonSession.threadSet("carbon.input.segments.default.carbon_table_MulTI_THread", "0,2")
+      val df = sql("select count(empno) from carbon_table_MulTI_THread")
+      checkAnswer(df, Seq(Row(20)))
+    }
+
+    val two = Future {
+      CarbonSession.threadSet("carbon.input.segments.default.carbon_table_MulTI_THread", "1")
+      val df = sql("select count(empno) from carbon_table_MulTI_THread")
+      checkAnswer(df, Seq(Row(10)))
+    }
+    Await.result(Future.sequence(Seq(one, two, three, four)), Duration(300, TimeUnit.SECONDS))
+  }
+
+  override def afterAll: Unit = {
+    sql("DROP TABLE IF EXISTS carbon_table_MulTI_THread")
+
+  }
+}