You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2018/03/08 05:12:43 UTC
carbondata git commit: [CARBONDATA-2237] Removing parsers thread
local objects after parsing of carbon query
Repository: carbondata
Updated Branches:
refs/heads/master 910f26171 -> 3fb406618
[CARBONDATA-2237] Removing parsers thread local objects after parsing of carbon query
This closes #2040
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3fb40661
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3fb40661
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3fb40661
Branch: refs/heads/master
Commit: 3fb406618b3cd68be680ae217f33478d87b74eb8
Parents: 910f261
Author: ravipesala <ra...@gmail.com>
Authored: Wed Mar 7 16:58:30 2018 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Thu Mar 8 10:42:12 2018 +0530
----------------------------------------------------------------------
.../carbondata/spark/util/CarbonScalaUtil.scala | 42 ++++++++++++++++++++
.../sql/parser/CarbonSpark2SqlParser.scala | 21 +++++-----
2 files changed, 54 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3fb40661/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 33263d6..773ea16 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -18,6 +18,7 @@
package org.apache.carbondata.spark.util
import java.{lang, util}
+import java.lang.ref.Reference
import java.nio.charset.Charset
import java.text.SimpleDateFormat
import java.util.Date
@@ -585,4 +586,45 @@ object CarbonScalaUtil {
String.valueOf(Math.pow(10, 5).toInt + taskId) +
String.valueOf(partitionNumber + Math.pow(10, 5).toInt)
}
+
+ /**
+ * Best-effort: drops parser DynamicVariable thread locals on this thread to avoid memory issues.
+ */
+ def cleanParserThreadLocals(): Unit = {
+ try {
+ // Read the current thread's `inheritableThreadLocals` field via reflection (JDK-internal name)
+ val thread = Thread.currentThread
+ val threadLocalsField = classOf[Thread].getDeclaredField("inheritableThreadLocals")
+ threadLocalsField.setAccessible(true)
+ val threadLocalTable = threadLocalsField.get(thread)
+ // Pull the backing `table` array out of that ThreadLocalMap. NOTE(review): when the map is
+ // null (presumably: no inheritable thread locals yet) the get throws and the catch absorbs it
+ val threadLocalMapClass = Class.forName("java.lang.ThreadLocal$ThreadLocalMap")
+ val tableField = threadLocalMapClass.getDeclaredField("table")
+ tableField.setAccessible(true)
+ val table = tableField.get(threadLocalTable)
+ // Entries reference their ThreadLocal key weakly, so the key is read from Reference.referent.
+ // The [Thread] type argument below is erased; it does not restrict which referents are read
+ val referentField = classOf[Reference[Thread]].getDeclaredField("referent")
+ referentField.setAccessible(true)
+ var i = 0
+ while (i < lang.reflect.Array.getLength(table)) {
+ // Slots can be null, hence the check; non-null slots are ThreadLocalMap entries
+ val entry = lang.reflect.Array.get(table, i)
+ if (entry != null) {
+ // Read the entry's key; only DynamicVariable-backed thread locals are removed, others stay
+ val threadLocal = referentField.get(entry).asInstanceOf[ThreadLocal[_]]
+ if (threadLocal != null &&
+ threadLocal.getClass.getName.startsWith("scala.util.DynamicVariable")) {
+ threadLocal.remove() // removes the current thread's value for this variable
+ }
+ }
+ i += 1
+ }
+ table // NOTE(review): result is discarded (method returns Unit); likely leftover
+ } catch {
+ case e: Exception =>
+ // Swallowed on purpose (best-effort); note `e` is unused and nothing is logged
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3fb40661/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 86790ba..3896ce9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -39,7 +39,7 @@ import org.apache.spark.util.CarbonReflectionUtils
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.spark.CarbonOption
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.util.CommonUtil
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil}
/**
* TODO remove the duplicate code and add the common methods to common class.
@@ -52,16 +52,19 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
// Initialize the Keywords.
initLexical
phrase(start)(new lexical.Scanner(input)) match {
- case Success(plan, _) => plan match {
- case x: CarbonLoadDataCommand =>
- x.inputSqlString = input
- x
- case x: CarbonAlterTableCompactionCommand =>
- x.alterTableModel.alterSql = input
- x
- case logicalPlan => logicalPlan
+ case Success(plan, _) =>
+ CarbonScalaUtil.cleanParserThreadLocals()
+ plan match {
+ case x: CarbonLoadDataCommand =>
+ x.inputSqlString = input
+ x
+ case x: CarbonAlterTableCompactionCommand =>
+ x.alterTableModel.alterSql = input
+ x
+ case logicalPlan => logicalPlan
}
case failureOrError =>
+ CarbonScalaUtil.cleanParserThreadLocals()
CarbonException.analysisException(failureOrError.toString)
}
}