Posted to commits@spark.apache.org by ma...@apache.org on 2015/05/14 01:15:36 UTC
spark git commit: [SQL] Move some classes into packages that are more appropriate.
Repository: spark
Updated Branches:
refs/heads/master 59250fe51 -> e683182c3
[SQL] Move some classes into packages that are more appropriate.
JavaTypeInference into catalyst
types.DateUtils into catalyst
CacheManager into execution
DefaultParserDialect into catalyst
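For code that referenced the old locations, these moves amount to import-path changes; the before/after paths, as reflected in the diff below:

    // org.apache.spark.sql.JavaTypeInference    -> org.apache.spark.sql.catalyst.JavaTypeInference
    // org.apache.spark.sql.types.DateUtils      -> org.apache.spark.sql.catalyst.util.DateUtils
    // org.apache.spark.sql.CacheManager         -> org.apache.spark.sql.execution.CacheManager
    // org.apache.spark.sql.DefaultParserDialect -> org.apache.spark.sql.catalyst.DefaultParserDialect
    import org.apache.spark.sql.catalyst.JavaTypeInference
    import org.apache.spark.sql.catalyst.util.DateUtils
    import org.apache.spark.sql.execution.CacheManager
    import org.apache.spark.sql.catalyst.DefaultParserDialect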
Author: Reynold Xin <rx...@databricks.com>
Closes #6108 from rxin/sql-rename and squashes the following commits:
3fc9613 [Reynold Xin] Fixed import ordering.
83d9ff4 [Reynold Xin] Fixed codegen tests.
e271e86 [Reynold Xin] mima
f4e24a6 [Reynold Xin] [SQL] Move some classes into packages that are more appropriate.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e683182c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e683182c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e683182c
Branch: refs/heads/master
Commit: e683182c3e6347afdac0e5658487f80e5e054ef4
Parents: 59250fe
Author: Reynold Xin <rx...@databricks.com>
Authored: Wed May 13 16:15:31 2015 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Wed May 13 16:15:31 2015 -0700
----------------------------------------------------------------------
project/MimaExcludes.scala | 5 +-
.../sql/catalyst/CatalystTypeConverters.scala | 1 +
.../spark/sql/catalyst/JavaTypeInference.scala | 109 ++++++++++++
.../spark/sql/catalyst/ParserDialect.scala | 36 ++++
.../spark/sql/catalyst/expressions/Cast.scala | 1 +
.../expressions/codegen/CodeGenerator.scala | 2 +-
.../sql/catalyst/expressions/literals.scala | 1 +
.../spark/sql/catalyst/util/DateUtils.scala | 90 ++++++++++
.../org/apache/spark/sql/types/DateUtils.scala | 90 ----------
.../org/apache/spark/sql/types/UTF8String.scala | 17 +-
.../expressions/ExpressionEvaluationSuite.scala | 1 +
.../org/apache/spark/sql/CacheManager.scala | 164 ------------------
.../scala/org/apache/spark/sql/Column.scala | 2 +
.../apache/spark/sql/JavaTypeInference.scala | 111 -------------
.../scala/org/apache/spark/sql/SQLContext.scala | 40 +----
.../spark/sql/execution/CacheManager.scala | 165 +++++++++++++++++++
.../apache/spark/sql/execution/pythonUdfs.scala | 5 +-
.../scala/org/apache/spark/sql/functions.scala | 1 +
.../org/apache/spark/sql/jdbc/JDBCRDD.scala | 2 +-
.../apache/spark/sql/json/JacksonParser.scala | 1 +
.../org/apache/spark/sql/json/JsonRDD.scala | 1 +
.../org/apache/spark/sql/SQLQuerySuite.scala | 2 +-
.../org/apache/spark/sql/json/JsonSuite.scala | 1 +
.../spark/sql/parquet/ParquetIOSuite.scala | 1 +
.../apache/spark/sql/hive/HiveInspectors.scala | 1 +
.../org/apache/spark/sql/hive/TableReader.scala | 2 +-
.../sql/hive/execution/SQLQuerySuite.scala | 3 +-
27 files changed, 439 insertions(+), 416 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index f31f0e5..fba7290 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -123,7 +123,10 @@ object MimaExcludes {
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.parquet.ParquetTestData$"),
ProblemFilters.exclude[MissingClassProblem](
- "org.apache.spark.sql.parquet.TestGroupWriteSupport")
+ "org.apache.spark.sql.parquet.TestGroupWriteSupport"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.CachedData"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.CachedData$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.CacheManager")
) ++ Seq(
// SPARK-7530 Added StreamingContext.getState()
ProblemFilters.exclude[MissingMethodProblem](
http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
index a13e2f3..75a493b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
@@ -23,6 +23,7 @@ import java.util.{Map => JavaMap}
import scala.collection.mutable.HashMap
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.types._
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
new file mode 100644
index 0000000..625c8d3
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst
+
+import java.beans.Introspector
+import java.lang.{Iterable => JIterable}
+import java.util.{Iterator => JIterator, Map => JMap}
+
+import scala.language.existentials
+
+import com.google.common.reflect.TypeToken
+import org.apache.spark.sql.types._
+
+/**
+ * Type-inference utilities for POJOs and Java collections.
+ */
+private [sql] object JavaTypeInference {
+
+ private val iterableType = TypeToken.of(classOf[JIterable[_]])
+ private val mapType = TypeToken.of(classOf[JMap[_, _]])
+ private val iteratorReturnType = classOf[JIterable[_]].getMethod("iterator").getGenericReturnType
+ private val nextReturnType = classOf[JIterator[_]].getMethod("next").getGenericReturnType
+ private val keySetReturnType = classOf[JMap[_, _]].getMethod("keySet").getGenericReturnType
+ private val valuesReturnType = classOf[JMap[_, _]].getMethod("values").getGenericReturnType
+
+ /**
+ * Infers the corresponding SQL data type of a Java type.
+ * @param typeToken Java type
+ * @return (SQL data type, nullable)
+ */
+ private [sql] def inferDataType(typeToken: TypeToken[_]): (DataType, Boolean) = {
+ // TODO: All of this could probably be moved to Catalyst as it is mostly not Spark specific.
+ typeToken.getRawType match {
+ case c: Class[_] if c.isAnnotationPresent(classOf[SQLUserDefinedType]) =>
+ (c.getAnnotation(classOf[SQLUserDefinedType]).udt().newInstance(), true)
+
+ case c: Class[_] if c == classOf[java.lang.String] => (StringType, true)
+ case c: Class[_] if c == java.lang.Short.TYPE => (ShortType, false)
+ case c: Class[_] if c == java.lang.Integer.TYPE => (IntegerType, false)
+ case c: Class[_] if c == java.lang.Long.TYPE => (LongType, false)
+ case c: Class[_] if c == java.lang.Double.TYPE => (DoubleType, false)
+ case c: Class[_] if c == java.lang.Byte.TYPE => (ByteType, false)
+ case c: Class[_] if c == java.lang.Float.TYPE => (FloatType, false)
+ case c: Class[_] if c == java.lang.Boolean.TYPE => (BooleanType, false)
+
+ case c: Class[_] if c == classOf[java.lang.Short] => (ShortType, true)
+ case c: Class[_] if c == classOf[java.lang.Integer] => (IntegerType, true)
+ case c: Class[_] if c == classOf[java.lang.Long] => (LongType, true)
+ case c: Class[_] if c == classOf[java.lang.Double] => (DoubleType, true)
+ case c: Class[_] if c == classOf[java.lang.Byte] => (ByteType, true)
+ case c: Class[_] if c == classOf[java.lang.Float] => (FloatType, true)
+ case c: Class[_] if c == classOf[java.lang.Boolean] => (BooleanType, true)
+
+ case c: Class[_] if c == classOf[java.math.BigDecimal] => (DecimalType(), true)
+ case c: Class[_] if c == classOf[java.sql.Date] => (DateType, true)
+ case c: Class[_] if c == classOf[java.sql.Timestamp] => (TimestampType, true)
+
+ case _ if typeToken.isArray =>
+ val (dataType, nullable) = inferDataType(typeToken.getComponentType)
+ (ArrayType(dataType, nullable), true)
+
+ case _ if iterableType.isAssignableFrom(typeToken) =>
+ val (dataType, nullable) = inferDataType(elementType(typeToken))
+ (ArrayType(dataType, nullable), true)
+
+ case _ if mapType.isAssignableFrom(typeToken) =>
+ val typeToken2 = typeToken.asInstanceOf[TypeToken[_ <: JMap[_, _]]]
+ val mapSupertype = typeToken2.getSupertype(classOf[JMap[_, _]])
+ val keyType = elementType(mapSupertype.resolveType(keySetReturnType))
+ val valueType = elementType(mapSupertype.resolveType(valuesReturnType))
+ val (keyDataType, _) = inferDataType(keyType)
+ val (valueDataType, nullable) = inferDataType(valueType)
+ (MapType(keyDataType, valueDataType, nullable), true)
+
+ case _ =>
+ val beanInfo = Introspector.getBeanInfo(typeToken.getRawType)
+ val properties = beanInfo.getPropertyDescriptors.filterNot(_.getName == "class")
+ val fields = properties.map { property =>
+ val returnType = typeToken.method(property.getReadMethod).getReturnType
+ val (dataType, nullable) = inferDataType(returnType)
+ new StructField(property.getName, dataType, nullable)
+ }
+ (new StructType(fields), true)
+ }
+ }
+
+ private def elementType(typeToken: TypeToken[_]): TypeToken[_] = {
+ val typeToken2 = typeToken.asInstanceOf[TypeToken[_ <: JIterable[_]]]
+ val iterableSupertype = typeToken2.getSupertype(classOf[JIterable[_]])
+ val iteratorType = iterableSupertype.resolveType(iteratorReturnType)
+ val itemType = iteratorType.resolveType(nextReturnType)
+ itemType
+ }
+}
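As a rough illustration of how inferDataType maps Java types to Catalyst schemas: the object is private[sql], so this sketch assumes it is compiled inside the org.apache.spark.sql package, and the example object and its name are hypothetical:

    package org.apache.spark.sql

    import com.google.common.reflect.TypeToken
    import org.apache.spark.sql.catalyst.JavaTypeInference

    // Hypothetical example, not part of this commit.
    object JavaTypeInferenceExample {
      def main(args: Array[String]): Unit = {
        // Boxed java.lang.Integer maps to a nullable IntegerType...
        val (boxedType, boxedNullable) = JavaTypeInference.inferDataType(TypeToken.of(classOf[java.lang.Integer]))
        // ...while the primitive int type maps to a non-nullable IntegerType.
        val (primType, primNullable) = JavaTypeInference.inferDataType(TypeToken.of(java.lang.Integer.TYPE))
        println(s"$boxedType nullable=$boxedNullable, $primType nullable=$primNullable")
      }
    }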
http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ParserDialect.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ParserDialect.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ParserDialect.scala
index 05a92b0..554fb4e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ParserDialect.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ParserDialect.scala
@@ -31,3 +31,39 @@ abstract class ParserDialect {
// this is the main function that will be implemented by sql parser.
def parse(sqlText: String): LogicalPlan
}
+
+/**
+ * Currently we support the default dialect named "sql", associated with the class
+ * [[DefaultParserDialect]].
+ *
+ * A custom SQL dialect can also be provided, for example in the Spark SQL CLI:
+ * {{{
+ *-- switch to "hiveql" dialect
+ * spark-sql>SET spark.sql.dialect=hiveql;
+ * spark-sql>SELECT * FROM src LIMIT 1;
+ *
+ *-- switch to "sql" dialect
+ * spark-sql>SET spark.sql.dialect=sql;
+ * spark-sql>SELECT * FROM src LIMIT 1;
+ *
+ *-- register the new SQL dialect
+ * spark-sql> SET spark.sql.dialect=com.xxx.xxx.SQL99Dialect;
+ * spark-sql> SELECT * FROM src LIMIT 1;
+ *
+ *-- register a non-existent SQL dialect
+ * spark-sql> SET spark.sql.dialect=NotExistedClass;
+ * spark-sql> SELECT * FROM src LIMIT 1;
+ *
+ *-- An exception will be thrown and the dialect will fall back to
+ *-- "sql" (for SQLContext) or
+ *-- "hiveql" (for HiveContext)
+ * }}}
+ */
+private[spark] class DefaultParserDialect extends ParserDialect {
+ @transient
+ protected val sqlParser = new SqlParser
+
+ override def parse(sqlText: String): LogicalPlan = {
+ sqlParser.parse(sqlText)
+ }
+}
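Since the class is private[spark], a usage sketch has to live somewhere under the org.apache.spark package; the example below is hypothetical and only shows that parse turns a SQL string into an (unresolved) LogicalPlan:

    package org.apache.spark.sql.catalyst

    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    // Hypothetical example, not part of this commit.
    object DialectExample {
      def main(args: Array[String]): Unit = {
        val dialect = new DefaultParserDialect
        // Parsing does not resolve tables, so "src" need not exist yet.
        val plan: LogicalPlan = dialect.parse("SELECT * FROM src LIMIT 1")
        println(plan)
      }
    }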
http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index adf941a..d8cf2b2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -21,6 +21,7 @@ import java.sql.{Date, Timestamp}
import java.text.{DateFormat, SimpleDateFormat}
import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.types._
/** Cast the child expression to the target data type. */
http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index d17af0e..ecb4c4b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -250,7 +250,7 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin
case Cast(child @ DateType(), StringType) =>
child.castOrNull(c =>
q"""org.apache.spark.sql.types.UTF8String(
- org.apache.spark.sql.types.DateUtils.toString($c))""",
+ org.apache.spark.sql.catalyst.util.DateUtils.toString($c))""",
StringType)
case Cast(child @ NumericType(), IntegerType) =>
http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
index 18cba4c..5f8c735 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.expressions
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.types._
object Literal {
http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateUtils.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateUtils.scala
new file mode 100644
index 0000000..3f92be4
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateUtils.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import java.sql.Date
+import java.text.SimpleDateFormat
+import java.util.{Calendar, TimeZone}
+
+import org.apache.spark.sql.catalyst.expressions.Cast
+
+/**
+ * Helper functions to convert between an Int value of days since 1970-01-01 and java.sql.Date.
+ */
+object DateUtils {
+ private val MILLIS_PER_DAY = 86400000
+
+ // Java TimeZone has no mention of thread safety. Use thread local instance to be safe.
+ private val LOCAL_TIMEZONE = new ThreadLocal[TimeZone] {
+ override protected def initialValue: TimeZone = {
+ Calendar.getInstance.getTimeZone
+ }
+ }
+
+ private def javaDateToDays(d: Date): Int = {
+ millisToDays(d.getTime)
+ }
+
+ // Use the exact local calendar day as the Int value, i.e. (year, month, day) -> days since 1970-01-01
+ def millisToDays(millisLocal: Long): Int = {
+ ((millisLocal + LOCAL_TIMEZONE.get().getOffset(millisLocal)) / MILLIS_PER_DAY).toInt
+ }
+
+ private def toMillisSinceEpoch(days: Int): Long = {
+ val millisUtc = days.toLong * MILLIS_PER_DAY
+ millisUtc - LOCAL_TIMEZONE.get().getOffset(millisUtc)
+ }
+
+ def fromJavaDate(date: java.sql.Date): Int = {
+ javaDateToDays(date)
+ }
+
+ def toJavaDate(daysSinceEpoch: Int): java.sql.Date = {
+ new java.sql.Date(toMillisSinceEpoch(daysSinceEpoch))
+ }
+
+ def toString(days: Int): String = Cast.threadLocalDateFormat.get.format(toJavaDate(days))
+
+ def stringToTime(s: String): java.util.Date = {
+ if (!s.contains('T')) {
+ // JDBC escape string
+ if (s.contains(' ')) {
+ java.sql.Timestamp.valueOf(s)
+ } else {
+ java.sql.Date.valueOf(s)
+ }
+ } else if (s.endsWith("Z")) {
+ // this is zero timezone of ISO8601
+ stringToTime(s.substring(0, s.length - 1) + "GMT-00:00")
+ } else if (s.indexOf("GMT") == -1) {
+ // timezone with ISO8601
+ val inset = "+00.00".length
+ val s0 = s.substring(0, s.length - inset)
+ val s1 = s.substring(s.length - inset, s.length)
+ if (s0.substring(s0.lastIndexOf(':')).contains('.')) {
+ stringToTime(s0 + "GMT" + s1)
+ } else {
+ stringToTime(s0 + ".0GMT" + s1)
+ }
+ } else {
+ // ISO8601 with GMT insert
+ val ISO8601GMT: SimpleDateFormat = new SimpleDateFormat( "yyyy-MM-dd'T'HH:mm:ss.SSSz" )
+ ISO8601GMT.parse(s)
+ }
+ }
+}
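DateUtils is a plain public object, so a small round-trip sketch is straightforward (the example object and its name are mine):

    import java.sql.Date
    import org.apache.spark.sql.catalyst.util.DateUtils

    // Hypothetical example, not part of this commit.
    object DateUtilsExample {
      def main(args: Array[String]): Unit = {
        // Dates are stored internally as the number of days since 1970-01-01 (local time).
        val days: Int = DateUtils.fromJavaDate(Date.valueOf("2015-05-13"))
        // Converting back recovers the original calendar date.
        val roundTripped: Date = DateUtils.toJavaDate(days)
        // stringToTime also handles JDBC escape strings and ISO 8601 timestamps.
        val parsed = DateUtils.stringToTime("2015-05-13T16:15:31.000Z")
        println(s"days=$days date=$roundTripped parsed=$parsed")
      }
    }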
http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateUtils.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateUtils.scala
deleted file mode 100644
index d36a491..0000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateUtils.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.types
-
-import java.sql.Date
-import java.text.SimpleDateFormat
-import java.util.{Calendar, TimeZone}
-
-import org.apache.spark.sql.catalyst.expressions.Cast
-
-/**
- * helper function to convert between Int value of days since 1970-01-01 and java.sql.Date
- */
-object DateUtils {
- private val MILLIS_PER_DAY = 86400000
-
- // Java TimeZone has no mention of thread safety. Use thread local instance to be safe.
- private val LOCAL_TIMEZONE = new ThreadLocal[TimeZone] {
- override protected def initialValue: TimeZone = {
- Calendar.getInstance.getTimeZone
- }
- }
-
- private def javaDateToDays(d: Date): Int = {
- millisToDays(d.getTime)
- }
-
- // we should use the exact day as Int, for example, (year, month, day) -> day
- def millisToDays(millisLocal: Long): Int = {
- ((millisLocal + LOCAL_TIMEZONE.get().getOffset(millisLocal)) / MILLIS_PER_DAY).toInt
- }
-
- private def toMillisSinceEpoch(days: Int): Long = {
- val millisUtc = days.toLong * MILLIS_PER_DAY
- millisUtc - LOCAL_TIMEZONE.get().getOffset(millisUtc)
- }
-
- def fromJavaDate(date: java.sql.Date): Int = {
- javaDateToDays(date)
- }
-
- def toJavaDate(daysSinceEpoch: Int): java.sql.Date = {
- new java.sql.Date(toMillisSinceEpoch(daysSinceEpoch))
- }
-
- def toString(days: Int): String = Cast.threadLocalDateFormat.get.format(toJavaDate(days))
-
- def stringToTime(s: String): java.util.Date = {
- if (!s.contains('T')) {
- // JDBC escape string
- if (s.contains(' ')) {
- java.sql.Timestamp.valueOf(s)
- } else {
- java.sql.Date.valueOf(s)
- }
- } else if (s.endsWith("Z")) {
- // this is zero timezone of ISO8601
- stringToTime(s.substring(0, s.length - 1) + "GMT-00:00")
- } else if (s.indexOf("GMT") == -1) {
- // timezone with ISO8601
- val inset = "+00.00".length
- val s0 = s.substring(0, s.length - inset)
- val s1 = s.substring(s.length - inset, s.length)
- if (s0.substring(s0.lastIndexOf(':')).contains('.')) {
- stringToTime(s0 + "GMT" + s1)
- } else {
- stringToTime(s0 + ".0GMT" + s1)
- }
- } else {
- // ISO8601 with GMT insert
- val ISO8601GMT: SimpleDateFormat = new SimpleDateFormat( "yyyy-MM-dd'T'HH:mm:ss.SSSz" )
- ISO8601GMT.parse(s)
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UTF8String.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UTF8String.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UTF8String.scala
index fc02ba6..bc9c37b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UTF8String.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UTF8String.scala
@@ -19,15 +19,18 @@ package org.apache.spark.sql.types
import java.util.Arrays
+import org.apache.spark.annotation.DeveloperApi
+
/**
- * A UTF-8 String, as internal representation of StringType in SparkSQL
+ * :: DeveloperApi ::
+ * A UTF-8 String, used as the internal representation of StringType in Spark SQL
*
- * A String encoded in UTF-8 as an Array[Byte], which can be used for comparison,
- * search, see http://en.wikipedia.org/wiki/UTF-8 for details.
+ * A String encoded in UTF-8 as an Array[Byte], which can be used for comparison and
+ * search. See http://en.wikipedia.org/wiki/UTF-8 for details.
*
- * Note: This is not designed for general use cases, should not be used outside SQL.
+ * Note: This is not designed for general use and should not be used outside SQL.
*/
-
+@DeveloperApi
final class UTF8String extends Ordered[UTF8String] with Serializable {
private[this] var bytes: Array[Byte] = _
@@ -180,6 +183,10 @@ final class UTF8String extends Ordered[UTF8String] with Serializable {
}
}
+/**
+ * :: DeveloperApi ::
+ */
+@DeveloperApi
object UTF8String {
// number of tailing bytes in a UTF8 sequence for a code point
// see http://en.wikipedia.org/wiki/UTF-8, 192-256 of Byte 1
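For reference, the UTF8String companion object provides an apply(String) factory (it is what the generated cast code above calls); a tiny hypothetical sketch:

    import org.apache.spark.sql.types.UTF8String

    // Hypothetical example, not part of this commit.
    object UTF8StringExample {
      def main(args: Array[String]): Unit = {
        val a = UTF8String("Spark")
        val b = UTF8String("Spark SQL")
        // UTF8String extends Ordered, so the usual comparison operators work.
        println(a < b)      // true: "Spark" sorts before "Spark SQL"
        println(b.toString) // decodes back to a regular String
      }
    }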
http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala
index 04fd261..5c4a152 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.analysis.UnresolvedExtractValue
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.mathfuncs._
+import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.types._
http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
deleted file mode 100644
index 18584c2..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import java.util.concurrent.locks.ReentrantReadWriteLock
-
-import org.apache.spark.Logging
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.columnar.InMemoryRelation
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
-
-/** Holds a cached logical plan and its data */
-private case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation)
-
-/**
- * Provides support in a SQLContext for caching query results and automatically using these cached
- * results when subsequent queries are executed. Data is cached using byte buffers stored in an
- * InMemoryRelation. This relation is automatically substituted query plans that return the
- * `sameResult` as the originally cached query.
- *
- * Internal to Spark SQL.
- */
-private[sql] class CacheManager(sqlContext: SQLContext) extends Logging {
-
- @transient
- private val cachedData = new scala.collection.mutable.ArrayBuffer[CachedData]
-
- @transient
- private val cacheLock = new ReentrantReadWriteLock
-
- /** Returns true if the table is currently cached in-memory. */
- def isCached(tableName: String): Boolean = lookupCachedData(sqlContext.table(tableName)).nonEmpty
-
- /** Caches the specified table in-memory. */
- def cacheTable(tableName: String): Unit = cacheQuery(sqlContext.table(tableName), Some(tableName))
-
- /** Removes the specified table from the in-memory cache. */
- def uncacheTable(tableName: String): Unit = uncacheQuery(sqlContext.table(tableName))
-
- /** Acquires a read lock on the cache for the duration of `f`. */
- private def readLock[A](f: => A): A = {
- val lock = cacheLock.readLock()
- lock.lock()
- try f finally {
- lock.unlock()
- }
- }
-
- /** Acquires a write lock on the cache for the duration of `f`. */
- private def writeLock[A](f: => A): A = {
- val lock = cacheLock.writeLock()
- lock.lock()
- try f finally {
- lock.unlock()
- }
- }
-
- /** Clears all cached tables. */
- private[sql] def clearCache(): Unit = writeLock {
- cachedData.foreach(_.cachedRepresentation.cachedColumnBuffers.unpersist())
- cachedData.clear()
- }
-
- /** Checks if the cache is empty. */
- private[sql] def isEmpty: Boolean = readLock {
- cachedData.isEmpty
- }
-
- /**
- * Caches the data produced by the logical representation of the given schema rdd. Unlike
- * `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because recomputing
- * the in-memory columnar representation of the underlying table is expensive.
- */
- private[sql] def cacheQuery(
- query: DataFrame,
- tableName: Option[String] = None,
- storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock {
- val planToCache = query.queryExecution.analyzed
- if (lookupCachedData(planToCache).nonEmpty) {
- logWarning("Asked to cache already cached data.")
- } else {
- cachedData +=
- CachedData(
- planToCache,
- InMemoryRelation(
- sqlContext.conf.useCompression,
- sqlContext.conf.columnBatchSize,
- storageLevel,
- query.queryExecution.executedPlan,
- tableName))
- }
- }
-
- /** Removes the data for the given [[DataFrame]] from the cache */
- private[sql] def uncacheQuery(query: DataFrame, blocking: Boolean = true): Unit = writeLock {
- val planToCache = query.queryExecution.analyzed
- val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan))
- require(dataIndex >= 0, s"Table $query is not cached.")
- cachedData(dataIndex).cachedRepresentation.uncache(blocking)
- cachedData.remove(dataIndex)
- }
-
- /** Tries to remove the data for the given [[DataFrame]] from the cache if it's cached */
- private[sql] def tryUncacheQuery(
- query: DataFrame,
- blocking: Boolean = true): Boolean = writeLock {
- val planToCache = query.queryExecution.analyzed
- val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan))
- val found = dataIndex >= 0
- if (found) {
- cachedData(dataIndex).cachedRepresentation.cachedColumnBuffers.unpersist(blocking)
- cachedData.remove(dataIndex)
- }
- found
- }
-
- /** Optionally returns cached data for the given [[DataFrame]] */
- private[sql] def lookupCachedData(query: DataFrame): Option[CachedData] = readLock {
- lookupCachedData(query.queryExecution.analyzed)
- }
-
- /** Optionally returns cached data for the given LogicalPlan. */
- private[sql] def lookupCachedData(plan: LogicalPlan): Option[CachedData] = readLock {
- cachedData.find(cd => plan.sameResult(cd.plan))
- }
-
- /** Replaces segments of the given logical plan with cached versions where possible. */
- private[sql] def useCachedData(plan: LogicalPlan): LogicalPlan = {
- plan transformDown {
- case currentFragment =>
- lookupCachedData(currentFragment)
- .map(_.cachedRepresentation.withOutput(currentFragment.output))
- .getOrElse(currentFragment)
- }
- }
-
- /**
- * Invalidates the cache of any data that contains `plan`. Note that it is possible that this
- * function will over invalidate.
- */
- private[sql] def invalidateCache(plan: LogicalPlan): Unit = writeLock {
- cachedData.foreach {
- case data if data.plan.collect { case p if p.sameResult(plan) => p }.nonEmpty =>
- data.cachedRepresentation.recache()
- case _ =>
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
index 42f5bcd..8bf1320 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -346,6 +346,7 @@ class Column(protected[sql] val expr: Expression) extends Logging {
* }}}
*
* @group expr_ops
+ * @since 1.4.0
*/
def when(condition: Column, value: Any):Column = this.expr match {
case CaseWhen(branches: Seq[Expression]) =>
@@ -374,6 +375,7 @@ class Column(protected[sql] val expr: Expression) extends Logging {
* }}}
*
* @group expr_ops
+ * @since 1.4.0
*/
def otherwise(value: Any):Column = this.expr match {
case CaseWhen(branches: Seq[Expression]) =>
http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/core/src/main/scala/org/apache/spark/sql/JavaTypeInference.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/JavaTypeInference.scala b/sql/core/src/main/scala/org/apache/spark/sql/JavaTypeInference.scala
deleted file mode 100644
index 1ec874f..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/JavaTypeInference.scala
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import java.beans.Introspector
-import java.lang.{Iterable => JIterable}
-import java.util.{Iterator => JIterator, Map => JMap}
-
-import scala.language.existentials
-
-import com.google.common.reflect.TypeToken
-
-import org.apache.spark.sql.types._
-
-
-/**
- * Type-inference utilities for POJOs and Java collections.
- */
-private [sql] object JavaTypeInference {
-
- private val iterableType = TypeToken.of(classOf[JIterable[_]])
- private val mapType = TypeToken.of(classOf[JMap[_, _]])
- private val iteratorReturnType = classOf[JIterable[_]].getMethod("iterator").getGenericReturnType
- private val nextReturnType = classOf[JIterator[_]].getMethod("next").getGenericReturnType
- private val keySetReturnType = classOf[JMap[_, _]].getMethod("keySet").getGenericReturnType
- private val valuesReturnType = classOf[JMap[_, _]].getMethod("values").getGenericReturnType
-
- /**
- * Infers the corresponding SQL data type of a Java type.
- * @param typeToken Java type
- * @return (SQL data type, nullable)
- */
- private [sql] def inferDataType(typeToken: TypeToken[_]): (DataType, Boolean) = {
- // TODO: All of this could probably be moved to Catalyst as it is mostly not Spark specific.
- typeToken.getRawType match {
- case c: Class[_] if c.isAnnotationPresent(classOf[SQLUserDefinedType]) =>
- (c.getAnnotation(classOf[SQLUserDefinedType]).udt().newInstance(), true)
-
- case c: Class[_] if c == classOf[java.lang.String] => (StringType, true)
- case c: Class[_] if c == java.lang.Short.TYPE => (ShortType, false)
- case c: Class[_] if c == java.lang.Integer.TYPE => (IntegerType, false)
- case c: Class[_] if c == java.lang.Long.TYPE => (LongType, false)
- case c: Class[_] if c == java.lang.Double.TYPE => (DoubleType, false)
- case c: Class[_] if c == java.lang.Byte.TYPE => (ByteType, false)
- case c: Class[_] if c == java.lang.Float.TYPE => (FloatType, false)
- case c: Class[_] if c == java.lang.Boolean.TYPE => (BooleanType, false)
-
- case c: Class[_] if c == classOf[java.lang.Short] => (ShortType, true)
- case c: Class[_] if c == classOf[java.lang.Integer] => (IntegerType, true)
- case c: Class[_] if c == classOf[java.lang.Long] => (LongType, true)
- case c: Class[_] if c == classOf[java.lang.Double] => (DoubleType, true)
- case c: Class[_] if c == classOf[java.lang.Byte] => (ByteType, true)
- case c: Class[_] if c == classOf[java.lang.Float] => (FloatType, true)
- case c: Class[_] if c == classOf[java.lang.Boolean] => (BooleanType, true)
-
- case c: Class[_] if c == classOf[java.math.BigDecimal] => (DecimalType(), true)
- case c: Class[_] if c == classOf[java.sql.Date] => (DateType, true)
- case c: Class[_] if c == classOf[java.sql.Timestamp] => (TimestampType, true)
-
- case _ if typeToken.isArray =>
- val (dataType, nullable) = inferDataType(typeToken.getComponentType)
- (ArrayType(dataType, nullable), true)
-
- case _ if iterableType.isAssignableFrom(typeToken) =>
- val (dataType, nullable) = inferDataType(elementType(typeToken))
- (ArrayType(dataType, nullable), true)
-
- case _ if mapType.isAssignableFrom(typeToken) =>
- val typeToken2 = typeToken.asInstanceOf[TypeToken[_ <: JMap[_, _]]]
- val mapSupertype = typeToken2.getSupertype(classOf[JMap[_, _]])
- val keyType = elementType(mapSupertype.resolveType(keySetReturnType))
- val valueType = elementType(mapSupertype.resolveType(valuesReturnType))
- val (keyDataType, _) = inferDataType(keyType)
- val (valueDataType, nullable) = inferDataType(valueType)
- (MapType(keyDataType, valueDataType, nullable), true)
-
- case _ =>
- val beanInfo = Introspector.getBeanInfo(typeToken.getRawType)
- val properties = beanInfo.getPropertyDescriptors.filterNot(_.getName == "class")
- val fields = properties.map { property =>
- val returnType = typeToken.method(property.getReadMethod).getReturnType
- val (dataType, nullable) = inferDataType(returnType)
- new StructField(property.getName, dataType, nullable)
- }
- (new StructType(fields), true)
- }
- }
-
- private def elementType(typeToken: TypeToken[_]): TypeToken[_] = {
- val typeToken2 = typeToken.asInstanceOf[TypeToken[_ <: JIterable[_]]]
- val iterableSupertype = typeToken2.getSupertype(classOf[JIterable[_]])
- val iteratorType = iterableSupertype.resolveType(iteratorReturnType)
- val itemType = iteratorType.resolveType(nextReturnType)
- itemType
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 0a148c7..521f3dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -33,6 +33,7 @@ import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst._
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.errors.DialectException
@@ -40,7 +41,6 @@ import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.catalyst.ParserDialect
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, expressions}
import org.apache.spark.sql.execution.{Filter, _}
import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
import org.apache.spark.sql.json._
@@ -51,42 +51,6 @@ import org.apache.spark.util.Utils
import org.apache.spark.{Partition, SparkContext}
/**
- * Currently we support the default dialect named "sql", associated with the class
- * [[DefaultParserDialect]]
- *
- * And we can also provide custom SQL Dialect, for example in Spark SQL CLI:
- * {{{
- *-- switch to "hiveql" dialect
- * spark-sql>SET spark.sql.dialect=hiveql;
- * spark-sql>SELECT * FROM src LIMIT 1;
- *
- *-- switch to "sql" dialect
- * spark-sql>SET spark.sql.dialect=sql;
- * spark-sql>SELECT * FROM src LIMIT 1;
- *
- *-- register the new SQL dialect
- * spark-sql> SET spark.sql.dialect=com.xxx.xxx.SQL99Dialect;
- * spark-sql> SELECT * FROM src LIMIT 1;
- *
- *-- register the non-exist SQL dialect
- * spark-sql> SET spark.sql.dialect=NotExistedClass;
- * spark-sql> SELECT * FROM src LIMIT 1;
- *
- *-- Exception will be thrown and switch to dialect
- *-- "sql" (for SQLContext) or
- *-- "hiveql" (for HiveContext)
- * }}}
- */
-private[spark] class DefaultParserDialect extends ParserDialect {
- @transient
- protected val sqlParser = new catalyst.SqlParser
-
- override def parse(sqlText: String): LogicalPlan = {
- sqlParser.parse(sqlText)
- }
-}
-
-/**
* The entry point for working with structured data (rows and columns) in Spark. Allows the
* creation of [[DataFrame]] objects as well as the execution of SQL queries.
*
@@ -1276,7 +1240,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
val projectSet = AttributeSet(projectList.flatMap(_.references))
val filterSet = AttributeSet(filterPredicates.flatMap(_.references))
val filterCondition =
- prunePushedDownFilters(filterPredicates).reduceLeftOption(expressions.And)
+ prunePushedDownFilters(filterPredicates).reduceLeftOption(catalyst.expressions.And)
// Right now we still use a projection even if the only evaluation is applying an alias
// to a column. Since this is a no-op, it could be avoided. However, using this
http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
new file mode 100644
index 0000000..5fcc48a
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.concurrent.locks.ReentrantReadWriteLock
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.columnar.InMemoryRelation
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
+
+/** Holds a cached logical plan and its data */
+private[sql] case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation)
+
+/**
+ * Provides support in a SQLContext for caching query results and automatically using these cached
+ * results when subsequent queries are executed. Data is cached using byte buffers stored in an
+ * InMemoryRelation. This relation is automatically substituted for query plans that return the
+ * `sameResult` as the originally cached query.
+ *
+ * Internal to Spark SQL.
+ */
+private[sql] class CacheManager(sqlContext: SQLContext) extends Logging {
+
+ @transient
+ private val cachedData = new scala.collection.mutable.ArrayBuffer[CachedData]
+
+ @transient
+ private val cacheLock = new ReentrantReadWriteLock
+
+ /** Returns true if the table is currently cached in-memory. */
+ def isCached(tableName: String): Boolean = lookupCachedData(sqlContext.table(tableName)).nonEmpty
+
+ /** Caches the specified table in-memory. */
+ def cacheTable(tableName: String): Unit = cacheQuery(sqlContext.table(tableName), Some(tableName))
+
+ /** Removes the specified table from the in-memory cache. */
+ def uncacheTable(tableName: String): Unit = uncacheQuery(sqlContext.table(tableName))
+
+ /** Acquires a read lock on the cache for the duration of `f`. */
+ private def readLock[A](f: => A): A = {
+ val lock = cacheLock.readLock()
+ lock.lock()
+ try f finally {
+ lock.unlock()
+ }
+ }
+
+ /** Acquires a write lock on the cache for the duration of `f`. */
+ private def writeLock[A](f: => A): A = {
+ val lock = cacheLock.writeLock()
+ lock.lock()
+ try f finally {
+ lock.unlock()
+ }
+ }
+
+ /** Clears all cached tables. */
+ private[sql] def clearCache(): Unit = writeLock {
+ cachedData.foreach(_.cachedRepresentation.cachedColumnBuffers.unpersist())
+ cachedData.clear()
+ }
+
+ /** Checks if the cache is empty. */
+ private[sql] def isEmpty: Boolean = readLock {
+ cachedData.isEmpty
+ }
+
+ /**
+ * Caches the data produced by the logical representation of the given schema rdd. Unlike
+ * `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because recomputing
+ * the in-memory columnar representation of the underlying table is expensive.
+ */
+ private[sql] def cacheQuery(
+ query: DataFrame,
+ tableName: Option[String] = None,
+ storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock {
+ val planToCache = query.queryExecution.analyzed
+ if (lookupCachedData(planToCache).nonEmpty) {
+ logWarning("Asked to cache already cached data.")
+ } else {
+ cachedData +=
+ CachedData(
+ planToCache,
+ InMemoryRelation(
+ sqlContext.conf.useCompression,
+ sqlContext.conf.columnBatchSize,
+ storageLevel,
+ query.queryExecution.executedPlan,
+ tableName))
+ }
+ }
+
+ /** Removes the data for the given [[DataFrame]] from the cache */
+ private[sql] def uncacheQuery(query: DataFrame, blocking: Boolean = true): Unit = writeLock {
+ val planToCache = query.queryExecution.analyzed
+ val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan))
+ require(dataIndex >= 0, s"Table $query is not cached.")
+ cachedData(dataIndex).cachedRepresentation.uncache(blocking)
+ cachedData.remove(dataIndex)
+ }
+
+ /** Tries to remove the data for the given [[DataFrame]] from the cache if it's cached */
+ private[sql] def tryUncacheQuery(
+ query: DataFrame,
+ blocking: Boolean = true): Boolean = writeLock {
+ val planToCache = query.queryExecution.analyzed
+ val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan))
+ val found = dataIndex >= 0
+ if (found) {
+ cachedData(dataIndex).cachedRepresentation.cachedColumnBuffers.unpersist(blocking)
+ cachedData.remove(dataIndex)
+ }
+ found
+ }
+
+ /** Optionally returns cached data for the given [[DataFrame]] */
+ private[sql] def lookupCachedData(query: DataFrame): Option[CachedData] = readLock {
+ lookupCachedData(query.queryExecution.analyzed)
+ }
+
+ /** Optionally returns cached data for the given LogicalPlan. */
+ private[sql] def lookupCachedData(plan: LogicalPlan): Option[CachedData] = readLock {
+ cachedData.find(cd => plan.sameResult(cd.plan))
+ }
+
+ /** Replaces segments of the given logical plan with cached versions where possible. */
+ private[sql] def useCachedData(plan: LogicalPlan): LogicalPlan = {
+ plan transformDown {
+ case currentFragment =>
+ lookupCachedData(currentFragment)
+ .map(_.cachedRepresentation.withOutput(currentFragment.output))
+ .getOrElse(currentFragment)
+ }
+ }
+
+ /**
+ * Invalidates the cache of any data that contains `plan`. Note that it is possible that this
+ * function will over invalidate.
+ */
+ private[sql] def invalidateCache(plan: LogicalPlan): Unit = writeLock {
+ cachedData.foreach {
+ case data if data.plan.collect { case p if p.sameResult(plan) => p }.nonEmpty =>
+ data.cachedRepresentation.recache()
+ case _ =>
+ }
+ }
+}
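CacheManager itself stays private[sql] after the move; applications keep reaching it through the SQLContext caching API. A hedged end-to-end sketch (the app name, table name, and data are made up):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    // Hypothetical example, not part of this commit.
    object CacheManagerExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("cache-example").setMaster("local[2]"))
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._

        val people = sc.parallelize(1 to 100).map(i => (i, s"name_$i")).toDF("id", "name")
        people.registerTempTable("people")

        // cacheTable/uncacheTable delegate to CacheManager, which stores the data in an
        // InMemoryRelation at the MEMORY_AND_DISK storage level by default.
        sqlContext.cacheTable("people")
        sqlContext.sql("SELECT count(*) FROM people").show()
        sqlContext.uncacheTable("people")

        sc.stop()
      }
    }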
http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
index 3dbc383..65dd7ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
@@ -19,20 +19,21 @@ package org.apache.spark.sql.execution
import java.util.{List => JList, Map => JMap}
-import org.apache.spark.rdd.RDD
-
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import net.razorvine.pickle.{Pickler, Unpickler}
+
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.python.{PythonBroadcast, PythonRDD}
import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.types._
import org.apache.spark.{Accumulator, Logging => SparkLogging}
http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 099e1d8..4404ad8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -438,6 +438,7 @@ object functions {
* }}}
*
* @group normal_funcs
+ * @since 1.4.0
*/
def when(condition: Column, value: Any): Column = {
CaseWhen(Seq(condition.expr, lit(value).expr))
http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
index a03ade3..40483d3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
@@ -25,9 +25,9 @@ import org.apache.commons.lang3.StringUtils
import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.{Row, SpecificMutableRow}
+import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.types._
import org.apache.spark.sql.sources._
-import org.apache.spark.util.Utils
private[sql] object JDBCRDD extends Logging {
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala
index a8e69ae..8161151 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala
@@ -26,6 +26,7 @@ import com.fasterxml.jackson.core._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.json.JacksonUtils.nextUntil
import org.apache.spark.sql.types._
http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
index f62973d..4c32710 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -29,6 +29,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.types._
import org.apache.spark.Logging
http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index ec0e76c..8cdbe07 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -19,10 +19,10 @@ package org.apache.spark.sql
import org.scalatest.BeforeAndAfterAll
+import org.apache.spark.sql.catalyst.DefaultParserDialect
import org.apache.spark.sql.catalyst.errors.DialectException
import org.apache.spark.sql.execution.GeneratedAggregate
import org.apache.spark.sql.functions._
-import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.TestData._
import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.test.TestSQLContext.{udf => _, _}
http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
index 263fafb..b06e338 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.core.JsonFactory
import org.scalactic.Tolerance._
import org.apache.spark.sql.TestData._
+import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.functions._
import org.apache.spark.sql.json.InferSchema.compatibleType
import org.apache.spark.sql.sources.LogicalRelation
http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
index 7c371db..008443d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
@@ -35,6 +35,7 @@ import parquet.schema.{MessageType, MessageTypeParser}
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.test.TestSQLContext._
import org.apache.spark.sql.test.TestSQLContext.implicits._
http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index 74ae984..7c7666f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.serde2.{io => hiveIo}
import org.apache.hadoop.{io => hadoopIo}
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.types
import org.apache.spark.sql.types._
http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index b69312f..0b6f7a3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -35,7 +35,7 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.Logging
import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD}
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.DateUtils
+import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.util.Utils
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 1d6393a..eaa9d6a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -17,8 +17,10 @@
package org.apache.spark.sql.hive.execution
+import org.apache.spark.sql.catalyst.DefaultParserDialect
import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
import org.apache.spark.sql.catalyst.errors.DialectException
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SQLConf}
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
@@ -26,7 +28,6 @@ import org.apache.spark.sql.hive.{HiveQLDialect, HiveShim, MetastoreRelation}
import org.apache.spark.sql.parquet.FSBasedParquetRelation
import org.apache.spark.sql.sources.LogicalRelation
import org.apache.spark.sql.types._
-import org.apache.spark.sql.{AnalysisException, DefaultParserDialect, QueryTest, Row, SQLConf}
case class Nested1(f1: Nested2)
case class Nested2(f2: Nested3)