You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2015/05/14 01:15:36 UTC

spark git commit: [SQL] Move some classes into packages that are more appropriate.

Repository: spark
Updated Branches:
  refs/heads/master 59250fe51 -> e683182c3


[SQL] Move some classes into packages that are more appropriate.

JavaTypeInference into catalyst
types.DateUtils into catalyst
CacheManager into execution
DefaultParserDialect into catalyst

Author: Reynold Xin <rx...@databricks.com>

Closes #6108 from rxin/sql-rename and squashes the following commits:

3fc9613 [Reynold Xin] Fixed import ordering.
83d9ff4 [Reynold Xin] Fixed codegen tests.
e271e86 [Reynold Xin] mima
f4e24a6 [Reynold Xin] [SQL] Move some classes into packages that are more appropriate.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e683182c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e683182c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e683182c

Branch: refs/heads/master
Commit: e683182c3e6347afdac0e5658487f80e5e054ef4
Parents: 59250fe
Author: Reynold Xin <rx...@databricks.com>
Authored: Wed May 13 16:15:31 2015 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Wed May 13 16:15:31 2015 -0700

----------------------------------------------------------------------
 project/MimaExcludes.scala                      |   5 +-
 .../sql/catalyst/CatalystTypeConverters.scala   |   1 +
 .../spark/sql/catalyst/JavaTypeInference.scala  | 109 ++++++++++++
 .../spark/sql/catalyst/ParserDialect.scala      |  36 ++++
 .../spark/sql/catalyst/expressions/Cast.scala   |   1 +
 .../expressions/codegen/CodeGenerator.scala     |   2 +-
 .../sql/catalyst/expressions/literals.scala     |   1 +
 .../spark/sql/catalyst/util/DateUtils.scala     |  90 ++++++++++
 .../org/apache/spark/sql/types/DateUtils.scala  |  90 ----------
 .../org/apache/spark/sql/types/UTF8String.scala |  17 +-
 .../expressions/ExpressionEvaluationSuite.scala |   1 +
 .../org/apache/spark/sql/CacheManager.scala     | 164 ------------------
 .../scala/org/apache/spark/sql/Column.scala     |   2 +
 .../apache/spark/sql/JavaTypeInference.scala    | 111 -------------
 .../scala/org/apache/spark/sql/SQLContext.scala |  40 +----
 .../spark/sql/execution/CacheManager.scala      | 165 +++++++++++++++++++
 .../apache/spark/sql/execution/pythonUdfs.scala |   5 +-
 .../scala/org/apache/spark/sql/functions.scala  |   1 +
 .../org/apache/spark/sql/jdbc/JDBCRDD.scala     |   2 +-
 .../apache/spark/sql/json/JacksonParser.scala   |   1 +
 .../org/apache/spark/sql/json/JsonRDD.scala     |   1 +
 .../org/apache/spark/sql/SQLQuerySuite.scala    |   2 +-
 .../org/apache/spark/sql/json/JsonSuite.scala   |   1 +
 .../spark/sql/parquet/ParquetIOSuite.scala      |   1 +
 .../apache/spark/sql/hive/HiveInspectors.scala  |   1 +
 .../org/apache/spark/sql/hive/TableReader.scala |   2 +-
 .../sql/hive/execution/SQLQuerySuite.scala      |   3 +-
 27 files changed, 439 insertions(+), 416 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index f31f0e5..fba7290 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -123,7 +123,10 @@ object MimaExcludes {
             ProblemFilters.exclude[MissingClassProblem](
               "org.apache.spark.sql.parquet.ParquetTestData$"),
             ProblemFilters.exclude[MissingClassProblem](
-              "org.apache.spark.sql.parquet.TestGroupWriteSupport")
+              "org.apache.spark.sql.parquet.TestGroupWriteSupport"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.CachedData"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.CachedData$"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.CacheManager")
           ) ++ Seq(
             // SPARK-7530 Added StreamingContext.getState()
             ProblemFilters.exclude[MissingMethodProblem](

http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
index a13e2f3..75a493b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
@@ -23,6 +23,7 @@ import java.util.{Map => JavaMap}
 import scala.collection.mutable.HashMap
 
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.util.DateUtils
 import org.apache.spark.sql.types._
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
new file mode 100644
index 0000000..625c8d3
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst
+
+import java.beans.Introspector
+import java.lang.{Iterable => JIterable}
+import java.util.{Iterator => JIterator, Map => JMap}
+
+import scala.language.existentials
+
+import com.google.common.reflect.TypeToken
+import org.apache.spark.sql.types._
+
+/**
+ * Type-inference utilities for POJOs and Java collections.
+ */
+private [sql] object JavaTypeInference {
+
+  private val iterableType = TypeToken.of(classOf[JIterable[_]])
+  private val mapType = TypeToken.of(classOf[JMap[_, _]])
+  private val iteratorReturnType = classOf[JIterable[_]].getMethod("iterator").getGenericReturnType
+  private val nextReturnType = classOf[JIterator[_]].getMethod("next").getGenericReturnType
+  private val keySetReturnType = classOf[JMap[_, _]].getMethod("keySet").getGenericReturnType
+  private val valuesReturnType = classOf[JMap[_, _]].getMethod("values").getGenericReturnType
+
+  /**
+   * Infers the corresponding SQL data type of a Java type.
+   * @param typeToken Java type
+   * @return (SQL data type, nullable)
+   */
+  private [sql] def inferDataType(typeToken: TypeToken[_]): (DataType, Boolean) = {
+    // TODO: All of this could probably be moved to Catalyst as it is mostly not Spark specific.
+    typeToken.getRawType match {
+      case c: Class[_] if c.isAnnotationPresent(classOf[SQLUserDefinedType]) =>
+        (c.getAnnotation(classOf[SQLUserDefinedType]).udt().newInstance(), true)
+
+      case c: Class[_] if c == classOf[java.lang.String] => (StringType, true)
+      case c: Class[_] if c == java.lang.Short.TYPE => (ShortType, false)
+      case c: Class[_] if c == java.lang.Integer.TYPE => (IntegerType, false)
+      case c: Class[_] if c == java.lang.Long.TYPE => (LongType, false)
+      case c: Class[_] if c == java.lang.Double.TYPE => (DoubleType, false)
+      case c: Class[_] if c == java.lang.Byte.TYPE => (ByteType, false)
+      case c: Class[_] if c == java.lang.Float.TYPE => (FloatType, false)
+      case c: Class[_] if c == java.lang.Boolean.TYPE => (BooleanType, false)
+
+      case c: Class[_] if c == classOf[java.lang.Short] => (ShortType, true)
+      case c: Class[_] if c == classOf[java.lang.Integer] => (IntegerType, true)
+      case c: Class[_] if c == classOf[java.lang.Long] => (LongType, true)
+      case c: Class[_] if c == classOf[java.lang.Double] => (DoubleType, true)
+      case c: Class[_] if c == classOf[java.lang.Byte] => (ByteType, true)
+      case c: Class[_] if c == classOf[java.lang.Float] => (FloatType, true)
+      case c: Class[_] if c == classOf[java.lang.Boolean] => (BooleanType, true)
+
+      case c: Class[_] if c == classOf[java.math.BigDecimal] => (DecimalType(), true)
+      case c: Class[_] if c == classOf[java.sql.Date] => (DateType, true)
+      case c: Class[_] if c == classOf[java.sql.Timestamp] => (TimestampType, true)
+
+      case _ if typeToken.isArray =>
+        val (dataType, nullable) = inferDataType(typeToken.getComponentType)
+        (ArrayType(dataType, nullable), true)
+
+      case _ if iterableType.isAssignableFrom(typeToken) =>
+        val (dataType, nullable) = inferDataType(elementType(typeToken))
+        (ArrayType(dataType, nullable), true)
+
+      case _ if mapType.isAssignableFrom(typeToken) =>
+        val typeToken2 = typeToken.asInstanceOf[TypeToken[_ <: JMap[_, _]]]
+        val mapSupertype = typeToken2.getSupertype(classOf[JMap[_, _]])
+        val keyType = elementType(mapSupertype.resolveType(keySetReturnType))
+        val valueType = elementType(mapSupertype.resolveType(valuesReturnType))
+        val (keyDataType, _) = inferDataType(keyType)
+        val (valueDataType, nullable) = inferDataType(valueType)
+        (MapType(keyDataType, valueDataType, nullable), true)
+
+      case _ =>
+        val beanInfo = Introspector.getBeanInfo(typeToken.getRawType)
+        val properties = beanInfo.getPropertyDescriptors.filterNot(_.getName == "class")
+        val fields = properties.map { property =>
+          val returnType = typeToken.method(property.getReadMethod).getReturnType
+          val (dataType, nullable) = inferDataType(returnType)
+          new StructField(property.getName, dataType, nullable)
+        }
+        (new StructType(fields), true)
+    }
+  }
+
+  private def elementType(typeToken: TypeToken[_]): TypeToken[_] = {
+    val typeToken2 = typeToken.asInstanceOf[TypeToken[_ <: JIterable[_]]]
+    val iterableSupertype = typeToken2.getSupertype(classOf[JIterable[_]])
+    val iteratorType = iterableSupertype.resolveType(iteratorReturnType)
+    val itemType = iteratorType.resolveType(nextReturnType)
+    itemType
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ParserDialect.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ParserDialect.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ParserDialect.scala
index 05a92b0..554fb4e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ParserDialect.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ParserDialect.scala
@@ -31,3 +31,39 @@ abstract class ParserDialect {
   // this is the main function that will be implemented by sql parser.
   def parse(sqlText: String): LogicalPlan
 }
+
+/**
+ * Currently we support the default dialect named "sql", associated with the class
+ * [[DefaultParserDialect]]
+ *
+ * And we can also provide custom SQL Dialect, for example in Spark SQL CLI:
+ * {{{
+ *-- switch to "hiveql" dialect
+ *   spark-sql>SET spark.sql.dialect=hiveql;
+ *   spark-sql>SELECT * FROM src LIMIT 1;
+ *
+ *-- switch to "sql" dialect
+ *   spark-sql>SET spark.sql.dialect=sql;
+ *   spark-sql>SELECT * FROM src LIMIT 1;
+ *
+ *-- register the new SQL dialect
+ *   spark-sql> SET spark.sql.dialect=com.xxx.xxx.SQL99Dialect;
+ *   spark-sql> SELECT * FROM src LIMIT 1;
+ *
+ *-- register the non-exist SQL dialect
+ *   spark-sql> SET spark.sql.dialect=NotExistedClass;
+ *   spark-sql> SELECT * FROM src LIMIT 1;
+ *
+ *-- Exception will be thrown and switch to dialect
+ *-- "sql" (for SQLContext) or
+ *-- "hiveql" (for HiveContext)
+ * }}}
+ */
+private[spark] class DefaultParserDialect extends ParserDialect {
+  @transient
+  protected val sqlParser = new SqlParser
+
+  override def parse(sqlText: String): LogicalPlan = {
+    sqlParser.parse(sqlText)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index adf941a..d8cf2b2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -21,6 +21,7 @@ import java.sql.{Date, Timestamp}
 import java.text.{DateFormat, SimpleDateFormat}
 
 import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.util.DateUtils
 import org.apache.spark.sql.types._
 
 /** Cast the child expression to the target data type. */

http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index d17af0e..ecb4c4b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -250,7 +250,7 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin
       case Cast(child @ DateType(), StringType) =>
         child.castOrNull(c =>
           q"""org.apache.spark.sql.types.UTF8String(
-                org.apache.spark.sql.types.DateUtils.toString($c))""",
+                org.apache.spark.sql.catalyst.util.DateUtils.toString($c))""",
           StringType)
 
       case Cast(child @ NumericType(), IntegerType) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
index 18cba4c..5f8c735 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.expressions
 import java.sql.{Date, Timestamp}
 
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql.catalyst.util.DateUtils
 import org.apache.spark.sql.types._
 
 object Literal {

http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateUtils.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateUtils.scala
new file mode 100644
index 0000000..3f92be4
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateUtils.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import java.sql.Date
+import java.text.SimpleDateFormat
+import java.util.{Calendar, TimeZone}
+
+import org.apache.spark.sql.catalyst.expressions.Cast
+
+/**
+ * helper function to convert between Int value of days since 1970-01-01 and java.sql.Date
+ */
+object DateUtils {
+  private val MILLIS_PER_DAY = 86400000
+
+  // Java TimeZone has no mention of thread safety. Use thread local instance to be safe.
+  private val LOCAL_TIMEZONE = new ThreadLocal[TimeZone] {
+    override protected def initialValue: TimeZone = {
+      Calendar.getInstance.getTimeZone
+    }
+  }
+
+  private def javaDateToDays(d: Date): Int = {
+    millisToDays(d.getTime)
+  }
+
+  // we should use the exact day as Int, for example, (year, month, day) -> day
+  def millisToDays(millisLocal: Long): Int = {
+    ((millisLocal + LOCAL_TIMEZONE.get().getOffset(millisLocal)) / MILLIS_PER_DAY).toInt
+  }
+
+  private def toMillisSinceEpoch(days: Int): Long = {
+    val millisUtc = days.toLong * MILLIS_PER_DAY
+    millisUtc - LOCAL_TIMEZONE.get().getOffset(millisUtc)
+  }
+
+  def fromJavaDate(date: java.sql.Date): Int = {
+    javaDateToDays(date)
+  }
+
+  def toJavaDate(daysSinceEpoch: Int): java.sql.Date = {
+    new java.sql.Date(toMillisSinceEpoch(daysSinceEpoch))
+  }
+
+  def toString(days: Int): String = Cast.threadLocalDateFormat.get.format(toJavaDate(days))
+
+  def stringToTime(s: String): java.util.Date = {
+    if (!s.contains('T')) {
+      // JDBC escape string
+      if (s.contains(' ')) {
+        java.sql.Timestamp.valueOf(s)
+      } else {
+        java.sql.Date.valueOf(s)
+      }
+    } else if (s.endsWith("Z")) {
+      // this is zero timezone of ISO8601
+      stringToTime(s.substring(0, s.length - 1) + "GMT-00:00")
+    } else if (s.indexOf("GMT") == -1) {
+      // timezone with ISO8601
+      val inset = "+00.00".length
+      val s0 = s.substring(0, s.length - inset)
+      val s1 = s.substring(s.length - inset, s.length)
+      if (s0.substring(s0.lastIndexOf(':')).contains('.')) {
+        stringToTime(s0 + "GMT" + s1)
+      } else {
+        stringToTime(s0 + ".0GMT" + s1)
+      }
+    } else {
+      // ISO8601 with GMT insert
+      val ISO8601GMT: SimpleDateFormat = new SimpleDateFormat( "yyyy-MM-dd'T'HH:mm:ss.SSSz" )
+      ISO8601GMT.parse(s)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateUtils.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateUtils.scala
deleted file mode 100644
index d36a491..0000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateUtils.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.types
-
-import java.sql.Date
-import java.text.SimpleDateFormat
-import java.util.{Calendar, TimeZone}
-
-import org.apache.spark.sql.catalyst.expressions.Cast
-
-/**
- * helper function to convert between Int value of days since 1970-01-01 and java.sql.Date
- */
-object DateUtils {
-  private val MILLIS_PER_DAY = 86400000
-
-  // Java TimeZone has no mention of thread safety. Use thread local instance to be safe.
-  private val LOCAL_TIMEZONE = new ThreadLocal[TimeZone] {
-    override protected def initialValue: TimeZone = {
-      Calendar.getInstance.getTimeZone
-    }
-  }
-
-  private def javaDateToDays(d: Date): Int = {
-    millisToDays(d.getTime)
-  }
-
-  // we should use the exact day as Int, for example, (year, month, day) -> day
-  def millisToDays(millisLocal: Long): Int = {
-    ((millisLocal + LOCAL_TIMEZONE.get().getOffset(millisLocal)) / MILLIS_PER_DAY).toInt
-  }
-
-  private def toMillisSinceEpoch(days: Int): Long = {
-    val millisUtc = days.toLong * MILLIS_PER_DAY
-    millisUtc - LOCAL_TIMEZONE.get().getOffset(millisUtc)
-  }
-
-  def fromJavaDate(date: java.sql.Date): Int = {
-    javaDateToDays(date)
-  }
-
-  def toJavaDate(daysSinceEpoch: Int): java.sql.Date = {
-    new java.sql.Date(toMillisSinceEpoch(daysSinceEpoch))
-  }
-
-  def toString(days: Int): String = Cast.threadLocalDateFormat.get.format(toJavaDate(days))
-
-  def stringToTime(s: String): java.util.Date = {
-    if (!s.contains('T')) {
-      // JDBC escape string
-      if (s.contains(' ')) {
-        java.sql.Timestamp.valueOf(s)
-      } else {
-        java.sql.Date.valueOf(s)
-      }
-    } else if (s.endsWith("Z")) {
-      // this is zero timezone of ISO8601
-      stringToTime(s.substring(0, s.length - 1) + "GMT-00:00")
-    } else if (s.indexOf("GMT") == -1) {
-      // timezone with ISO8601
-      val inset = "+00.00".length
-      val s0 = s.substring(0, s.length - inset)
-      val s1 = s.substring(s.length - inset, s.length)
-      if (s0.substring(s0.lastIndexOf(':')).contains('.')) {
-        stringToTime(s0 + "GMT" + s1)
-      } else {
-        stringToTime(s0 + ".0GMT" + s1)
-      }
-    } else {
-      // ISO8601 with GMT insert
-      val ISO8601GMT: SimpleDateFormat = new SimpleDateFormat( "yyyy-MM-dd'T'HH:mm:ss.SSSz" )
-      ISO8601GMT.parse(s)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UTF8String.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UTF8String.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UTF8String.scala
index fc02ba6..bc9c37b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UTF8String.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UTF8String.scala
@@ -19,15 +19,18 @@ package org.apache.spark.sql.types
 
 import java.util.Arrays
 
+import org.apache.spark.annotation.DeveloperApi
+
 /**
- *  A UTF-8 String, as internal representation of StringType in SparkSQL
+ * :: DeveloperApi ::
+ * A UTF-8 String, as internal representation of StringType in SparkSQL
  *
- *  A String encoded in UTF-8 as an Array[Byte], which can be used for comparison,
- *  search, see http://en.wikipedia.org/wiki/UTF-8 for details.
+ * A String encoded in UTF-8 as an Array[Byte], which can be used for comparison,
+ * search, see http://en.wikipedia.org/wiki/UTF-8 for details.
  *
- *  Note: This is not designed for general use cases, should not be used outside SQL.
+ * Note: This is not designed for general use cases, should not be used outside SQL.
  */
-
+@DeveloperApi
 final class UTF8String extends Ordered[UTF8String] with Serializable {
 
   private[this] var bytes: Array[Byte] = _
@@ -180,6 +183,10 @@ final class UTF8String extends Ordered[UTF8String] with Serializable {
   }
 }
 
+/**
+ * :: DeveloperApi ::
+ */
+@DeveloperApi
 object UTF8String {
   // number of tailing bytes in a UTF8 sequence for a code point
   // see http://en.wikipedia.org/wiki/UTF-8, 192-256 of Byte 1

http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala
index 04fd261..5c4a152 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.CatalystTypeConverters
 import org.apache.spark.sql.catalyst.analysis.UnresolvedExtractValue
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions.mathfuncs._
+import org.apache.spark.sql.catalyst.util.DateUtils
 import org.apache.spark.sql.types._
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
deleted file mode 100644
index 18584c2..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import java.util.concurrent.locks.ReentrantReadWriteLock
-
-import org.apache.spark.Logging
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.columnar.InMemoryRelation
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
-
-/** Holds a cached logical plan and its data */
-private case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation)
-
-/**
- * Provides support in a SQLContext for caching query results and automatically using these cached
- * results when subsequent queries are executed.  Data is cached using byte buffers stored in an
- * InMemoryRelation.  This relation is automatically substituted query plans that return the
- * `sameResult` as the originally cached query.
- *
- * Internal to Spark SQL.
- */
-private[sql] class CacheManager(sqlContext: SQLContext) extends Logging {
-
-  @transient
-  private val cachedData = new scala.collection.mutable.ArrayBuffer[CachedData]
-
-  @transient
-  private val cacheLock = new ReentrantReadWriteLock
-
-  /** Returns true if the table is currently cached in-memory. */
-  def isCached(tableName: String): Boolean = lookupCachedData(sqlContext.table(tableName)).nonEmpty
-
-  /** Caches the specified table in-memory. */
-  def cacheTable(tableName: String): Unit = cacheQuery(sqlContext.table(tableName), Some(tableName))
-
-  /** Removes the specified table from the in-memory cache. */
-  def uncacheTable(tableName: String): Unit = uncacheQuery(sqlContext.table(tableName))
-
-  /** Acquires a read lock on the cache for the duration of `f`. */
-  private def readLock[A](f: => A): A = {
-    val lock = cacheLock.readLock()
-    lock.lock()
-    try f finally {
-      lock.unlock()
-    }
-  }
-
-  /** Acquires a write lock on the cache for the duration of `f`. */
-  private def writeLock[A](f: => A): A = {
-    val lock = cacheLock.writeLock()
-    lock.lock()
-    try f finally {
-      lock.unlock()
-    }
-  }
-
-  /** Clears all cached tables. */
-  private[sql] def clearCache(): Unit = writeLock {
-    cachedData.foreach(_.cachedRepresentation.cachedColumnBuffers.unpersist())
-    cachedData.clear()
-  }
-
-  /** Checks if the cache is empty. */
-  private[sql] def isEmpty: Boolean = readLock {
-    cachedData.isEmpty
-  }
-
-  /**
-   * Caches the data produced by the logical representation of the given schema rdd.  Unlike
-   * `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because recomputing
-   * the in-memory columnar representation of the underlying table is expensive.
-   */
-  private[sql] def cacheQuery(
-      query: DataFrame,
-      tableName: Option[String] = None,
-      storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock {
-    val planToCache = query.queryExecution.analyzed
-    if (lookupCachedData(planToCache).nonEmpty) {
-      logWarning("Asked to cache already cached data.")
-    } else {
-      cachedData +=
-        CachedData(
-          planToCache,
-          InMemoryRelation(
-            sqlContext.conf.useCompression,
-            sqlContext.conf.columnBatchSize,
-            storageLevel,
-            query.queryExecution.executedPlan,
-            tableName))
-    }
-  }
-
-  /** Removes the data for the given [[DataFrame]] from the cache */
-  private[sql] def uncacheQuery(query: DataFrame, blocking: Boolean = true): Unit = writeLock {
-    val planToCache = query.queryExecution.analyzed
-    val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan))
-    require(dataIndex >= 0, s"Table $query is not cached.")
-    cachedData(dataIndex).cachedRepresentation.uncache(blocking)
-    cachedData.remove(dataIndex)
-  }
-
-  /** Tries to remove the data for the given [[DataFrame]] from the cache if it's cached */
-  private[sql] def tryUncacheQuery(
-      query: DataFrame,
-      blocking: Boolean = true): Boolean = writeLock {
-    val planToCache = query.queryExecution.analyzed
-    val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan))
-    val found = dataIndex >= 0
-    if (found) {
-      cachedData(dataIndex).cachedRepresentation.cachedColumnBuffers.unpersist(blocking)
-      cachedData.remove(dataIndex)
-    }
-    found
-  }
-
-  /** Optionally returns cached data for the given [[DataFrame]] */
-  private[sql] def lookupCachedData(query: DataFrame): Option[CachedData] = readLock {
-    lookupCachedData(query.queryExecution.analyzed)
-  }
-
-  /** Optionally returns cached data for the given LogicalPlan. */
-  private[sql] def lookupCachedData(plan: LogicalPlan): Option[CachedData] = readLock {
-    cachedData.find(cd => plan.sameResult(cd.plan))
-  }
-
-  /** Replaces segments of the given logical plan with cached versions where possible. */
-  private[sql] def useCachedData(plan: LogicalPlan): LogicalPlan = {
-    plan transformDown {
-      case currentFragment =>
-        lookupCachedData(currentFragment)
-          .map(_.cachedRepresentation.withOutput(currentFragment.output))
-          .getOrElse(currentFragment)
-    }
-  }
-
-  /**
-   * Invalidates the cache of any data that contains `plan`. Note that it is possible that this
-   * function will over invalidate.
-   */
-  private[sql] def invalidateCache(plan: LogicalPlan): Unit = writeLock {
-    cachedData.foreach {
-      case data if data.plan.collect { case p if p.sameResult(plan) => p }.nonEmpty =>
-        data.cachedRepresentation.recache()
-      case _ =>
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
index 42f5bcd..8bf1320 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -346,6 +346,7 @@ class Column(protected[sql] val expr: Expression) extends Logging {
    * }}}
    *
    * @group expr_ops
+   * @since 1.4.0
    */
   def when(condition: Column, value: Any):Column = this.expr match {
     case CaseWhen(branches: Seq[Expression]) =>
@@ -374,6 +375,7 @@ class Column(protected[sql] val expr: Expression) extends Logging {
    * }}}
    *
    * @group expr_ops
+   * @since 1.4.0
    */
   def otherwise(value: Any):Column = this.expr match {
     case CaseWhen(branches: Seq[Expression]) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/core/src/main/scala/org/apache/spark/sql/JavaTypeInference.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/JavaTypeInference.scala b/sql/core/src/main/scala/org/apache/spark/sql/JavaTypeInference.scala
deleted file mode 100644
index 1ec874f..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/JavaTypeInference.scala
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import java.beans.Introspector
-import java.lang.{Iterable => JIterable}
-import java.util.{Iterator => JIterator, Map => JMap}
-
-import scala.language.existentials
-
-import com.google.common.reflect.TypeToken
-
-import org.apache.spark.sql.types._
-
-
-/**
- * Type-inference utilities for POJOs and Java collections.
- */
-private [sql] object JavaTypeInference {
-
-  private val iterableType = TypeToken.of(classOf[JIterable[_]])
-  private val mapType = TypeToken.of(classOf[JMap[_, _]])
-  private val iteratorReturnType = classOf[JIterable[_]].getMethod("iterator").getGenericReturnType
-  private val nextReturnType = classOf[JIterator[_]].getMethod("next").getGenericReturnType
-  private val keySetReturnType = classOf[JMap[_, _]].getMethod("keySet").getGenericReturnType
-  private val valuesReturnType = classOf[JMap[_, _]].getMethod("values").getGenericReturnType
-
-  /**
-   * Infers the corresponding SQL data type of a Java type.
-   * @param typeToken Java type
-   * @return (SQL data type, nullable)
-   */
-  private [sql] def inferDataType(typeToken: TypeToken[_]): (DataType, Boolean) = {
-    // TODO: All of this could probably be moved to Catalyst as it is mostly not Spark specific.
-    typeToken.getRawType match {
-      case c: Class[_] if c.isAnnotationPresent(classOf[SQLUserDefinedType]) =>
-        (c.getAnnotation(classOf[SQLUserDefinedType]).udt().newInstance(), true)
-
-      case c: Class[_] if c == classOf[java.lang.String] => (StringType, true)
-      case c: Class[_] if c == java.lang.Short.TYPE => (ShortType, false)
-      case c: Class[_] if c == java.lang.Integer.TYPE => (IntegerType, false)
-      case c: Class[_] if c == java.lang.Long.TYPE => (LongType, false)
-      case c: Class[_] if c == java.lang.Double.TYPE => (DoubleType, false)
-      case c: Class[_] if c == java.lang.Byte.TYPE => (ByteType, false)
-      case c: Class[_] if c == java.lang.Float.TYPE => (FloatType, false)
-      case c: Class[_] if c == java.lang.Boolean.TYPE => (BooleanType, false)
-
-      case c: Class[_] if c == classOf[java.lang.Short] => (ShortType, true)
-      case c: Class[_] if c == classOf[java.lang.Integer] => (IntegerType, true)
-      case c: Class[_] if c == classOf[java.lang.Long] => (LongType, true)
-      case c: Class[_] if c == classOf[java.lang.Double] => (DoubleType, true)
-      case c: Class[_] if c == classOf[java.lang.Byte] => (ByteType, true)
-      case c: Class[_] if c == classOf[java.lang.Float] => (FloatType, true)
-      case c: Class[_] if c == classOf[java.lang.Boolean] => (BooleanType, true)
-
-      case c: Class[_] if c == classOf[java.math.BigDecimal] => (DecimalType(), true)
-      case c: Class[_] if c == classOf[java.sql.Date] => (DateType, true)
-      case c: Class[_] if c == classOf[java.sql.Timestamp] => (TimestampType, true)
-
-      case _ if typeToken.isArray =>
-        val (dataType, nullable) = inferDataType(typeToken.getComponentType)
-        (ArrayType(dataType, nullable), true)
-
-      case _ if iterableType.isAssignableFrom(typeToken) =>
-        val (dataType, nullable) = inferDataType(elementType(typeToken))
-        (ArrayType(dataType, nullable), true)
-
-      case _ if mapType.isAssignableFrom(typeToken) =>
-        val typeToken2 = typeToken.asInstanceOf[TypeToken[_ <: JMap[_, _]]]
-        val mapSupertype = typeToken2.getSupertype(classOf[JMap[_, _]])
-        val keyType = elementType(mapSupertype.resolveType(keySetReturnType))
-        val valueType = elementType(mapSupertype.resolveType(valuesReturnType))
-        val (keyDataType, _) = inferDataType(keyType)
-        val (valueDataType, nullable) = inferDataType(valueType)
-        (MapType(keyDataType, valueDataType, nullable), true)
-
-      case _ =>
-        val beanInfo = Introspector.getBeanInfo(typeToken.getRawType)
-        val properties = beanInfo.getPropertyDescriptors.filterNot(_.getName == "class")
-        val fields = properties.map { property =>
-          val returnType = typeToken.method(property.getReadMethod).getReturnType
-          val (dataType, nullable) = inferDataType(returnType)
-          new StructField(property.getName, dataType, nullable)
-        }
-        (new StructType(fields), true)
-    }
-  }
-
-  private def elementType(typeToken: TypeToken[_]): TypeToken[_] = {
-    val typeToken2 = typeToken.asInstanceOf[TypeToken[_ <: JIterable[_]]]
-    val iterableSupertype = typeToken2.getSupertype(classOf[JIterable[_]])
-    val iteratorType = iterableSupertype.resolveType(iteratorReturnType)
-    val itemType = iteratorType.resolveType(nextReturnType)
-    itemType
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 0a148c7..521f3dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -33,6 +33,7 @@ import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.errors.DialectException
@@ -40,7 +41,6 @@ import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.catalyst.ParserDialect
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, expressions}
 import org.apache.spark.sql.execution.{Filter, _}
 import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
 import org.apache.spark.sql.json._
@@ -51,42 +51,6 @@ import org.apache.spark.util.Utils
 import org.apache.spark.{Partition, SparkContext}
 
 /**
- * Currently we support the default dialect named "sql", associated with the class
- * [[DefaultParserDialect]]
- *
- * And we can also provide custom SQL Dialect, for example in Spark SQL CLI:
- * {{{
- *-- switch to "hiveql" dialect
- *   spark-sql>SET spark.sql.dialect=hiveql;
- *   spark-sql>SELECT * FROM src LIMIT 1;
- *
- *-- switch to "sql" dialect
- *   spark-sql>SET spark.sql.dialect=sql;
- *   spark-sql>SELECT * FROM src LIMIT 1;
- *
- *-- register the new SQL dialect
- *   spark-sql> SET spark.sql.dialect=com.xxx.xxx.SQL99Dialect;
- *   spark-sql> SELECT * FROM src LIMIT 1;
- *
- *-- register the non-exist SQL dialect
- *   spark-sql> SET spark.sql.dialect=NotExistedClass;
- *   spark-sql> SELECT * FROM src LIMIT 1;
- *
- *-- Exception will be thrown and switch to dialect
- *-- "sql" (for SQLContext) or
- *-- "hiveql" (for HiveContext)
- * }}}
- */
-private[spark] class DefaultParserDialect extends ParserDialect {
-  @transient
-  protected val sqlParser = new catalyst.SqlParser
-
-  override def parse(sqlText: String): LogicalPlan = {
-    sqlParser.parse(sqlText)
-  }
-}
-
-/**
  * The entry point for working with structured data (rows and columns) in Spark.  Allows the
  * creation of [[DataFrame]] objects as well as the execution of SQL queries.
  *
@@ -1276,7 +1240,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
       val projectSet = AttributeSet(projectList.flatMap(_.references))
       val filterSet = AttributeSet(filterPredicates.flatMap(_.references))
       val filterCondition =
-        prunePushedDownFilters(filterPredicates).reduceLeftOption(expressions.And)
+        prunePushedDownFilters(filterPredicates).reduceLeftOption(catalyst.expressions.And)
 
       // Right now we still use a projection even if the only evaluation is applying an alias
       // to a column.  Since this is a no-op, it could be avoided. However, using this

http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
new file mode 100644
index 0000000..5fcc48a
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.concurrent.locks.ReentrantReadWriteLock
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.columnar.InMemoryRelation
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
+
+/** Holds a cached logical plan and its data */
+private[sql] case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation)
+
+/**
+ * Provides support in a SQLContext for caching query results and automatically using these cached
+ * results when subsequent queries are executed.  Data is cached using byte buffers stored in an
+ * InMemoryRelation.  This relation is automatically substituted query plans that return the
+ * `sameResult` as the originally cached query.
+ *
+ * Internal to Spark SQL.
+ */
+private[sql] class CacheManager(sqlContext: SQLContext) extends Logging {
+
+  @transient
+  private val cachedData = new scala.collection.mutable.ArrayBuffer[CachedData]
+
+  @transient
+  private val cacheLock = new ReentrantReadWriteLock
+
+  /** Returns true if the table is currently cached in-memory. */
+  def isCached(tableName: String): Boolean = lookupCachedData(sqlContext.table(tableName)).nonEmpty
+
+  /** Caches the specified table in-memory. */
+  def cacheTable(tableName: String): Unit = cacheQuery(sqlContext.table(tableName), Some(tableName))
+
+  /** Removes the specified table from the in-memory cache. */
+  def uncacheTable(tableName: String): Unit = uncacheQuery(sqlContext.table(tableName))
+
+  /** Acquires a read lock on the cache for the duration of `f`. */
+  private def readLock[A](f: => A): A = {
+    val lock = cacheLock.readLock()
+    lock.lock()
+    try f finally {
+      lock.unlock()
+    }
+  }
+
+  /** Acquires a write lock on the cache for the duration of `f`. */
+  private def writeLock[A](f: => A): A = {
+    val lock = cacheLock.writeLock()
+    lock.lock()
+    try f finally {
+      lock.unlock()
+    }
+  }
+
+  /** Clears all cached tables. */
+  private[sql] def clearCache(): Unit = writeLock {
+    cachedData.foreach(_.cachedRepresentation.cachedColumnBuffers.unpersist())
+    cachedData.clear()
+  }
+
+  /** Checks if the cache is empty. */
+  private[sql] def isEmpty: Boolean = readLock {
+    cachedData.isEmpty
+  }
+
+  /**
+   * Caches the data produced by the logical representation of the given schema rdd.  Unlike
+   * `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because recomputing
+   * the in-memory columnar representation of the underlying table is expensive.
+   */
+  private[sql] def cacheQuery(
+      query: DataFrame,
+      tableName: Option[String] = None,
+      storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock {
+    val planToCache = query.queryExecution.analyzed
+    if (lookupCachedData(planToCache).nonEmpty) {
+      logWarning("Asked to cache already cached data.")
+    } else {
+      cachedData +=
+        CachedData(
+          planToCache,
+          InMemoryRelation(
+            sqlContext.conf.useCompression,
+            sqlContext.conf.columnBatchSize,
+            storageLevel,
+            query.queryExecution.executedPlan,
+            tableName))
+    }
+  }
+
+  /** Removes the data for the given [[DataFrame]] from the cache */
+  private[sql] def uncacheQuery(query: DataFrame, blocking: Boolean = true): Unit = writeLock {
+    val planToCache = query.queryExecution.analyzed
+    val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan))
+    require(dataIndex >= 0, s"Table $query is not cached.")
+    cachedData(dataIndex).cachedRepresentation.uncache(blocking)
+    cachedData.remove(dataIndex)
+  }
+
+  /** Tries to remove the data for the given [[DataFrame]] from the cache if it's cached */
+  private[sql] def tryUncacheQuery(
+      query: DataFrame,
+      blocking: Boolean = true): Boolean = writeLock {
+    val planToCache = query.queryExecution.analyzed
+    val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan))
+    val found = dataIndex >= 0
+    if (found) {
+      cachedData(dataIndex).cachedRepresentation.cachedColumnBuffers.unpersist(blocking)
+      cachedData.remove(dataIndex)
+    }
+    found
+  }
+
+  /** Optionally returns cached data for the given [[DataFrame]] */
+  private[sql] def lookupCachedData(query: DataFrame): Option[CachedData] = readLock {
+    lookupCachedData(query.queryExecution.analyzed)
+  }
+
+  /** Optionally returns cached data for the given LogicalPlan. */
+  private[sql] def lookupCachedData(plan: LogicalPlan): Option[CachedData] = readLock {
+    cachedData.find(cd => plan.sameResult(cd.plan))
+  }
+
+  /** Replaces segments of the given logical plan with cached versions where possible. */
+  private[sql] def useCachedData(plan: LogicalPlan): LogicalPlan = {
+    plan transformDown {
+      case currentFragment =>
+        lookupCachedData(currentFragment)
+          .map(_.cachedRepresentation.withOutput(currentFragment.output))
+          .getOrElse(currentFragment)
+    }
+  }
+
+  /**
+   * Invalidates the cache of any data that contains `plan`. Note that it is possible that this
+   * function will over invalidate.
+   */
+  private[sql] def invalidateCache(plan: LogicalPlan): Unit = writeLock {
+    cachedData.foreach {
+      case data if data.plan.collect { case p if p.sameResult(plan) => p }.nonEmpty =>
+        data.cachedRepresentation.recache()
+      case _ =>
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
index 3dbc383..65dd7ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
@@ -19,20 +19,21 @@ package org.apache.spark.sql.execution
 
 import java.util.{List => JList, Map => JMap}
 
-import org.apache.spark.rdd.RDD
-
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 
 import net.razorvine.pickle.{Pickler, Unpickler}
+
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.api.python.{PythonBroadcast, PythonRDD}
 import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions.Row
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.util.DateUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.{Accumulator, Logging => SparkLogging}
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 099e1d8..4404ad8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -438,6 +438,7 @@ object functions {
    * }}}
    *
    * @group normal_funcs
+   * @since 1.4.0
    */
   def when(condition: Column, value: Any): Column = {
     CaseWhen(Seq(condition.expr, lit(value).expr))

http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
index a03ade3..40483d3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
@@ -25,9 +25,9 @@ import org.apache.commons.lang3.StringUtils
 import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions.{Row, SpecificMutableRow}
+import org.apache.spark.sql.catalyst.util.DateUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.sources._
-import org.apache.spark.util.Utils
 
 private[sql] object JDBCRDD extends Logging {
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala
index a8e69ae..8161151 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala
@@ -26,6 +26,7 @@ import com.fasterxml.jackson.core._
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.util.DateUtils
 import org.apache.spark.sql.json.JacksonUtils.nextUntil
 import org.apache.spark.sql.types._
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
index f62973d..4c32710 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -29,6 +29,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.catalyst.util.DateUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.Logging
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index ec0e76c..8cdbe07 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -19,10 +19,10 @@ package org.apache.spark.sql
 
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.spark.sql.catalyst.DefaultParserDialect
 import org.apache.spark.sql.catalyst.errors.DialectException
 import org.apache.spark.sql.execution.GeneratedAggregate
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.catalyst.CatalystConf
 import org.apache.spark.sql.TestData._
 import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.test.TestSQLContext.{udf => _, _}

http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
index 263fafb..b06e338 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.core.JsonFactory
 import org.scalactic.Tolerance._
 
 import org.apache.spark.sql.TestData._
+import org.apache.spark.sql.catalyst.util.DateUtils
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.json.InferSchema.compatibleType
 import org.apache.spark.sql.sources.LogicalRelation

http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
index 7c371db..008443d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
@@ -35,6 +35,7 @@ import parquet.schema.{MessageType, MessageTypeParser}
 
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.catalyst.util.DateUtils
 import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.test.TestSQLContext._
 import org.apache.spark.sql.test.TestSQLContext.implicits._

http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index 74ae984..7c7666f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.serde2.{io => hiveIo}
 import org.apache.hadoop.{io => hadoopIo}
 
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.util.DateUtils
 import org.apache.spark.sql.types
 import org.apache.spark.sql.types._
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index b69312f..0b6f7a3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -35,7 +35,7 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.Logging
 import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD}
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.DateUtils
+import org.apache.spark.sql.catalyst.util.DateUtils
 import org.apache.spark.util.Utils
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/e683182c/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 1d6393a..eaa9d6a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -17,8 +17,10 @@
 
 package org.apache.spark.sql.hive.execution
 
+import org.apache.spark.sql.catalyst.DefaultParserDialect
 import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
 import org.apache.spark.sql.catalyst.errors.DialectException
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SQLConf}
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
@@ -26,7 +28,6 @@ import org.apache.spark.sql.hive.{HiveQLDialect, HiveShim, MetastoreRelation}
 import org.apache.spark.sql.parquet.FSBasedParquetRelation
 import org.apache.spark.sql.sources.LogicalRelation
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{AnalysisException, DefaultParserDialect, QueryTest, Row, SQLConf}
 
 case class Nested1(f1: Nested2)
 case class Nested2(f2: Nested3)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org