Posted to commits@spark.apache.org by an...@apache.org on 2015/09/05 00:17:45 UTC
[4/4] spark git commit: [SPARK-10176] [SQL] Show partially analyzed plans when checkAnswer fails to analyze
This PR takes over https://github.com/apache/spark/pull/8389.
This PR improves `checkAnswer` to print the partially analyzed plan in addition to the user-friendly error message, in order to aid in debugging failing tests.
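In brief, here is a condensed sketch of the QueryTest.scala change in the diff below (the comments are mine): `df` becomes a by-name parameter so that eager analysis runs inside `checkAnswer`, where an `AnalysisException` can be caught and the partially analyzed plan recovered:

    protected def checkAnswer(df: => DataFrame, expectedAnswer: Seq[Row]): Unit = {
      val analyzedDF = try df catch {
        case ae: AnalysisException =>
          // Temporarily disable eager analysis so the partially analyzed plan
          // can still be built, then restore the previous setting.
          val currentValue = sqlContext.conf.dataFrameEagerAnalysis
          sqlContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, false)
          val partiallyAnalyzedPlan = df.queryExecution.analyzed
          sqlContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, currentValue)
          fail(s"Failed to analyze query: $ae\n$partiallyAnalyzedPlan\n\n${stackTraceToString(ae)}")
      }
      QueryTest.checkAnswer(analyzedDF, expectedAnswer).foreach(fail(_))
    }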
In doing so, I ran into a conflict with the various ways we bring a SQLContext into the tests. Depending on the trait, the current context is referred to as `sqlContext`, `_sqlContext`, `ctx`, or `hiveContext`, with `public`, `protected`, or `private` access depending on the defining class.
I propose we refactor as follows:
1. All tests should refer only to a `protected sqlContext` when testing general features, and to a `protected hiveContext` only for methods that exist solely on `HiveContext`.
2. All tests should import only `testImplicits._` (i.e., don't import `TestHive.implicits._`); a minimal sketch of this convention follows.
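As a hypothetical illustration of the proposed convention (the suite, table, and column names are made up; `SharedSQLContext`, `testImplicits`, and the protected `sqlContext`/`sparkContext` accessors are the ones this patch standardizes on):

    class MyFeatureSuite extends QueryTest with SharedSQLContext {
      import testImplicits._  // suite-local implicits, not TestHive.implicits._

      test("general features go through the protected sqlContext") {
        sparkContext.parallelize(1 to 3).toDF("n").registerTempTable("nums")
        checkAnswer(
          sqlContext.table("nums").select($"n" + 1),
          Seq(Row(2), Row(3), Row(4)))
        sqlContext.dropTempTable("nums")
      }
      // A Hive-specific suite would analogously use a protected hiveContext
      // for methods that exist only on HiveContext.
    }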
Author: Wenchen Fan <cl...@outlook.com>
Closes #8584 from cloud-fan/cleanupTests.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c3c0e431
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c3c0e431
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c3c0e431
Branch: refs/heads/master
Commit: c3c0e431a6280fbcf726ac9bc4db0e1b5a862be8
Parents: 804a012
Author: Wenchen Fan <cl...@outlook.com>
Authored: Fri Sep 4 15:17:37 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Fri Sep 4 15:17:37 2015 -0700
----------------------------------------------------------------------
.../spark/sql/catalyst/plans/PlanTest.scala | 1 -
.../org/apache/spark/sql/CachedTableSuite.scala | 156 +++++++-------
.../spark/sql/ColumnExpressionSuite.scala | 16 +-
.../spark/sql/DataFrameAggregateSuite.scala | 4 +-
.../spark/sql/DataFrameComplexTypeSuite.scala | 6 +-
.../spark/sql/DataFrameImplicitsSuite.scala | 8 +-
.../apache/spark/sql/DataFrameStatSuite.scala | 10 +-
.../org/apache/spark/sql/DataFrameSuite.scala | 14 +-
.../spark/sql/DataFrameTungstenSuite.scala | 6 +-
.../apache/spark/sql/ExtraStrategiesSuite.scala | 2 +-
.../scala/org/apache/spark/sql/JoinSuite.scala | 12 +-
.../org/apache/spark/sql/ListTablesSuite.scala | 20 +-
.../scala/org/apache/spark/sql/QueryTest.scala | 27 ++-
.../scala/org/apache/spark/sql/RowSuite.scala | 2 +-
.../org/apache/spark/sql/SQLConfSuite.scala | 44 ++--
.../org/apache/spark/sql/SQLContextSuite.scala | 12 +-
.../org/apache/spark/sql/SQLQuerySuite.scala | 40 ++--
.../apache/spark/sql/SerializationSuite.scala | 2 +-
.../apache/spark/sql/StringFunctionsSuite.scala | 47 ++--
.../scala/org/apache/spark/sql/UDFSuite.scala | 42 ++--
.../apache/spark/sql/UserDefinedTypeSuite.scala | 6 +-
.../columnar/InMemoryColumnarQuerySuite.scala | 41 ++--
.../columnar/PartitionBatchPruningSuite.scala | 20 +-
.../spark/sql/execution/ExchangeSuite.scala | 2 +
.../spark/sql/execution/PlannerSuite.scala | 99 +++++----
.../execution/RowFormatConvertersSuite.scala | 16 +-
.../apache/spark/sql/execution/SortSuite.scala | 1 +
.../spark/sql/execution/SparkPlanTest.scala | 27 +--
.../spark/sql/execution/TungstenSortSuite.scala | 12 +-
.../TungstenAggregationIteratorSuite.scala | 2 +-
.../execution/datasources/json/JsonSuite.scala | 214 +++++++++----------
.../datasources/json/TestJsonData.scala | 34 +--
.../parquet/ParquetCompatibilityTest.scala | 5 +-
.../datasources/parquet/ParquetIOSuite.scala | 52 ++---
.../ParquetPartitionDiscoverySuite.scala | 2 +-
.../datasources/parquet/ParquetQuerySuite.scala | 42 ++--
.../datasources/parquet/ParquetTest.scala | 9 +-
.../execution/joins/BroadcastJoinSuite.scala | 10 +-
.../execution/joins/HashedRelationSuite.scala | 6 +-
.../sql/execution/joins/InnerJoinSuite.scala | 9 +-
.../sql/execution/joins/OuterJoinSuite.scala | 8 +-
.../sql/execution/joins/SemiJoinSuite.scala | 8 +-
.../sql/execution/metric/SQLMetricsSuite.scala | 24 +--
.../sql/execution/ui/SQLListenerSuite.scala | 8 +-
.../org/apache/spark/sql/jdbc/JDBCSuite.scala | 20 +-
.../apache/spark/sql/jdbc/JDBCWriteSuite.scala | 52 +++--
.../sql/sources/CreateTableAsSelectSuite.scala | 2 -
.../spark/sql/sources/DataSourceTest.scala | 4 +-
.../apache/spark/sql/sources/InsertSuite.scala | 3 +-
.../sql/sources/PartitionedWriteSuite.scala | 8 +-
.../spark/sql/sources/SaveLoadSuite.scala | 1 -
.../org/apache/spark/sql/test/SQLTestData.scala | 52 ++---
.../apache/spark/sql/test/SQLTestUtils.scala | 41 ++--
.../spark/sql/test/SharedSQLContext.scala | 17 +-
.../apache/spark/sql/test/TestSQLContext.scala | 2 +-
.../apache/spark/sql/hive/test/TestHive.scala | 7 +-
.../spark/sql/hive/CachedTableSuite.scala | 20 +-
.../spark/sql/hive/ErrorPositionSuite.scala | 8 +-
.../sql/hive/HiveDataFrameAnalyticsSuite.scala | 13 +-
.../spark/sql/hive/HiveDataFrameJoinSuite.scala | 6 +-
.../sql/hive/HiveDataFrameWindowSuite.scala | 7 +-
.../sql/hive/HiveMetastoreCatalogSuite.scala | 20 +-
.../spark/sql/hive/HiveParquetSuite.scala | 12 +-
.../spark/sql/hive/HiveSparkSubmitSuite.scala | 11 +-
.../sql/hive/InsertIntoHiveTableSuite.scala | 35 ++-
.../apache/spark/sql/hive/ListTablesSuite.scala | 12 +-
.../sql/hive/MetastoreDataSourcesSuite.scala | 13 +-
.../spark/sql/hive/MultiDatabaseSuite.scala | 20 +-
.../hive/ParquetHiveCompatibilitySuite.scala | 24 +--
.../spark/sql/hive/QueryPartitionSuite.scala | 18 +-
.../apache/spark/sql/hive/StatisticsSuite.scala | 42 ++--
.../org/apache/spark/sql/hive/UDFSuite.scala | 16 +-
.../hive/execution/AggregationQuerySuite.scala | 19 +-
.../sql/hive/execution/HiveComparisonTest.scala | 4 +-
.../sql/hive/execution/HiveExplainSuite.scala | 11 +-
.../execution/HiveOperatorQueryableSuite.scala | 8 +-
.../spark/sql/hive/execution/HivePlanTest.scala | 8 +-
.../spark/sql/hive/execution/HiveUDFSuite.scala | 54 ++---
.../sql/hive/execution/SQLQuerySuite.scala | 39 ++--
.../execution/ScriptTransformationSuite.scala | 17 +-
.../sql/hive/orc/OrcHadoopFsRelationSuite.scala | 7 +-
.../hive/orc/OrcPartitionDiscoverySuite.scala | 23 +-
.../spark/sql/hive/orc/OrcSourceSuite.scala | 10 +-
.../org/apache/spark/sql/hive/orc/OrcTest.scala | 9 +-
.../apache/spark/sql/hive/parquetSuites.scala | 16 +-
.../CommitFailureTestRelationSuite.scala | 9 +-
.../sql/sources/JsonHadoopFsRelationSuite.scala | 12 +-
.../sources/ParquetHadoopFsRelationSuite.scala | 15 +-
.../SimpleTextHadoopFsRelationSuite.scala | 4 +-
.../sql/sources/hadoopFsRelationSuites.scala | 28 ++-
90 files changed, 908 insertions(+), 999 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
index 765c1e2..f76a903 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
@@ -26,7 +26,6 @@ import org.apache.spark.sql.catalyst.util._
* Provides helper methods for comparing plans.
*/
class PlanTest extends SparkFunSuite {
-
/**
* Since attribute references are given globally unique ids during analysis,
* we must normalize them to check if two different queries are identical.
http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index af7590c..3a3541a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -34,7 +34,7 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {
import testImplicits._
def rddIdOf(tableName: String): Int = {
- val executedPlan = ctx.table(tableName).queryExecution.executedPlan
+ val executedPlan = sqlContext.table(tableName).queryExecution.executedPlan
executedPlan.collect {
case InMemoryColumnarTableScan(_, _, relation) =>
relation.cachedColumnBuffers.id
@@ -44,7 +44,7 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {
}
def isMaterialized(rddId: Int): Boolean = {
- ctx.sparkContext.env.blockManager.get(RDDBlockId(rddId, 0)).nonEmpty
+ sparkContext.env.blockManager.get(RDDBlockId(rddId, 0)).nonEmpty
}
test("withColumn doesn't invalidate cached dataframe") {
@@ -69,41 +69,41 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {
test("cache temp table") {
testData.select('key).registerTempTable("tempTable")
assertCached(sql("SELECT COUNT(*) FROM tempTable"), 0)
- ctx.cacheTable("tempTable")
+ sqlContext.cacheTable("tempTable")
assertCached(sql("SELECT COUNT(*) FROM tempTable"))
- ctx.uncacheTable("tempTable")
+ sqlContext.uncacheTable("tempTable")
}
test("unpersist an uncached table will not raise exception") {
- assert(None == ctx.cacheManager.lookupCachedData(testData))
+ assert(None == sqlContext.cacheManager.lookupCachedData(testData))
testData.unpersist(blocking = true)
- assert(None == ctx.cacheManager.lookupCachedData(testData))
+ assert(None == sqlContext.cacheManager.lookupCachedData(testData))
testData.unpersist(blocking = false)
- assert(None == ctx.cacheManager.lookupCachedData(testData))
+ assert(None == sqlContext.cacheManager.lookupCachedData(testData))
testData.persist()
- assert(None != ctx.cacheManager.lookupCachedData(testData))
+ assert(None != sqlContext.cacheManager.lookupCachedData(testData))
testData.unpersist(blocking = true)
- assert(None == ctx.cacheManager.lookupCachedData(testData))
+ assert(None == sqlContext.cacheManager.lookupCachedData(testData))
testData.unpersist(blocking = false)
- assert(None == ctx.cacheManager.lookupCachedData(testData))
+ assert(None == sqlContext.cacheManager.lookupCachedData(testData))
}
test("cache table as select") {
sql("CACHE TABLE tempTable AS SELECT key FROM testData")
assertCached(sql("SELECT COUNT(*) FROM tempTable"))
- ctx.uncacheTable("tempTable")
+ sqlContext.uncacheTable("tempTable")
}
test("uncaching temp table") {
testData.select('key).registerTempTable("tempTable1")
testData.select('key).registerTempTable("tempTable2")
- ctx.cacheTable("tempTable1")
+ sqlContext.cacheTable("tempTable1")
assertCached(sql("SELECT COUNT(*) FROM tempTable1"))
assertCached(sql("SELECT COUNT(*) FROM tempTable2"))
// Is this valid?
- ctx.uncacheTable("tempTable2")
+ sqlContext.uncacheTable("tempTable2")
// Should this be cached?
assertCached(sql("SELECT COUNT(*) FROM tempTable1"), 0)
@@ -111,103 +111,103 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {
test("too big for memory") {
val data = "*" * 1000
- ctx.sparkContext.parallelize(1 to 200000, 1).map(_ => BigData(data)).toDF()
+ sparkContext.parallelize(1 to 200000, 1).map(_ => BigData(data)).toDF()
.registerTempTable("bigData")
- ctx.table("bigData").persist(StorageLevel.MEMORY_AND_DISK)
- assert(ctx.table("bigData").count() === 200000L)
- ctx.table("bigData").unpersist(blocking = true)
+ sqlContext.table("bigData").persist(StorageLevel.MEMORY_AND_DISK)
+ assert(sqlContext.table("bigData").count() === 200000L)
+ sqlContext.table("bigData").unpersist(blocking = true)
}
test("calling .cache() should use in-memory columnar caching") {
- ctx.table("testData").cache()
- assertCached(ctx.table("testData"))
- ctx.table("testData").unpersist(blocking = true)
+ sqlContext.table("testData").cache()
+ assertCached(sqlContext.table("testData"))
+ sqlContext.table("testData").unpersist(blocking = true)
}
test("calling .unpersist() should drop in-memory columnar cache") {
- ctx.table("testData").cache()
- ctx.table("testData").count()
- ctx.table("testData").unpersist(blocking = true)
- assertCached(ctx.table("testData"), 0)
+ sqlContext.table("testData").cache()
+ sqlContext.table("testData").count()
+ sqlContext.table("testData").unpersist(blocking = true)
+ assertCached(sqlContext.table("testData"), 0)
}
test("isCached") {
- ctx.cacheTable("testData")
+ sqlContext.cacheTable("testData")
- assertCached(ctx.table("testData"))
- assert(ctx.table("testData").queryExecution.withCachedData match {
+ assertCached(sqlContext.table("testData"))
+ assert(sqlContext.table("testData").queryExecution.withCachedData match {
case _: InMemoryRelation => true
case _ => false
})
- ctx.uncacheTable("testData")
- assert(!ctx.isCached("testData"))
- assert(ctx.table("testData").queryExecution.withCachedData match {
+ sqlContext.uncacheTable("testData")
+ assert(!sqlContext.isCached("testData"))
+ assert(sqlContext.table("testData").queryExecution.withCachedData match {
case _: InMemoryRelation => false
case _ => true
})
}
test("SPARK-1669: cacheTable should be idempotent") {
- assume(!ctx.table("testData").logicalPlan.isInstanceOf[InMemoryRelation])
+ assume(!sqlContext.table("testData").logicalPlan.isInstanceOf[InMemoryRelation])
- ctx.cacheTable("testData")
- assertCached(ctx.table("testData"))
+ sqlContext.cacheTable("testData")
+ assertCached(sqlContext.table("testData"))
assertResult(1, "InMemoryRelation not found, testData should have been cached") {
- ctx.table("testData").queryExecution.withCachedData.collect {
+ sqlContext.table("testData").queryExecution.withCachedData.collect {
case r: InMemoryRelation => r
}.size
}
- ctx.cacheTable("testData")
+ sqlContext.cacheTable("testData")
assertResult(0, "Double InMemoryRelations found, cacheTable() is not idempotent") {
- ctx.table("testData").queryExecution.withCachedData.collect {
+ sqlContext.table("testData").queryExecution.withCachedData.collect {
case r @ InMemoryRelation(_, _, _, _, _: InMemoryColumnarTableScan, _) => r
}.size
}
- ctx.uncacheTable("testData")
+ sqlContext.uncacheTable("testData")
}
test("read from cached table and uncache") {
- ctx.cacheTable("testData")
- checkAnswer(ctx.table("testData"), testData.collect().toSeq)
- assertCached(ctx.table("testData"))
+ sqlContext.cacheTable("testData")
+ checkAnswer(sqlContext.table("testData"), testData.collect().toSeq)
+ assertCached(sqlContext.table("testData"))
- ctx.uncacheTable("testData")
- checkAnswer(ctx.table("testData"), testData.collect().toSeq)
- assertCached(ctx.table("testData"), 0)
+ sqlContext.uncacheTable("testData")
+ checkAnswer(sqlContext.table("testData"), testData.collect().toSeq)
+ assertCached(sqlContext.table("testData"), 0)
}
test("correct error on uncache of non-cached table") {
intercept[IllegalArgumentException] {
- ctx.uncacheTable("testData")
+ sqlContext.uncacheTable("testData")
}
}
test("SELECT star from cached table") {
sql("SELECT * FROM testData").registerTempTable("selectStar")
- ctx.cacheTable("selectStar")
+ sqlContext.cacheTable("selectStar")
checkAnswer(
sql("SELECT * FROM selectStar WHERE key = 1"),
Seq(Row(1, "1")))
- ctx.uncacheTable("selectStar")
+ sqlContext.uncacheTable("selectStar")
}
test("Self-join cached") {
val unCachedAnswer =
sql("SELECT * FROM testData a JOIN testData b ON a.key = b.key").collect()
- ctx.cacheTable("testData")
+ sqlContext.cacheTable("testData")
checkAnswer(
sql("SELECT * FROM testData a JOIN testData b ON a.key = b.key"),
unCachedAnswer.toSeq)
- ctx.uncacheTable("testData")
+ sqlContext.uncacheTable("testData")
}
test("'CACHE TABLE' and 'UNCACHE TABLE' SQL statement") {
sql("CACHE TABLE testData")
- assertCached(ctx.table("testData"))
+ assertCached(sqlContext.table("testData"))
val rddId = rddIdOf("testData")
assert(
@@ -215,7 +215,7 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {
"Eagerly cached in-memory table should have already been materialized")
sql("UNCACHE TABLE testData")
- assert(!ctx.isCached("testData"), "Table 'testData' should not be cached")
+ assert(!sqlContext.isCached("testData"), "Table 'testData' should not be cached")
eventually(timeout(10 seconds)) {
assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
@@ -224,14 +224,14 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {
test("CACHE TABLE tableName AS SELECT * FROM anotherTable") {
sql("CACHE TABLE testCacheTable AS SELECT * FROM testData")
- assertCached(ctx.table("testCacheTable"))
+ assertCached(sqlContext.table("testCacheTable"))
val rddId = rddIdOf("testCacheTable")
assert(
isMaterialized(rddId),
"Eagerly cached in-memory table should have already been materialized")
- ctx.uncacheTable("testCacheTable")
+ sqlContext.uncacheTable("testCacheTable")
eventually(timeout(10 seconds)) {
assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
}
@@ -239,14 +239,14 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {
test("CACHE TABLE tableName AS SELECT ...") {
sql("CACHE TABLE testCacheTable AS SELECT key FROM testData LIMIT 10")
- assertCached(ctx.table("testCacheTable"))
+ assertCached(sqlContext.table("testCacheTable"))
val rddId = rddIdOf("testCacheTable")
assert(
isMaterialized(rddId),
"Eagerly cached in-memory table should have already been materialized")
- ctx.uncacheTable("testCacheTable")
+ sqlContext.uncacheTable("testCacheTable")
eventually(timeout(10 seconds)) {
assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
}
@@ -254,7 +254,7 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {
test("CACHE LAZY TABLE tableName") {
sql("CACHE LAZY TABLE testData")
- assertCached(ctx.table("testData"))
+ assertCached(sqlContext.table("testData"))
val rddId = rddIdOf("testData")
assert(
@@ -266,7 +266,7 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {
isMaterialized(rddId),
"Lazily cached in-memory table should have been materialized")
- ctx.uncacheTable("testData")
+ sqlContext.uncacheTable("testData")
eventually(timeout(10 seconds)) {
assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
}
@@ -274,7 +274,7 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {
test("InMemoryRelation statistics") {
sql("CACHE TABLE testData")
- ctx.table("testData").queryExecution.withCachedData.collect {
+ sqlContext.table("testData").queryExecution.withCachedData.collect {
case cached: InMemoryRelation =>
val actualSizeInBytes = (1 to 100).map(i => INT.defaultSize + i.toString.length + 4).sum
assert(cached.statistics.sizeInBytes === actualSizeInBytes)
@@ -283,46 +283,48 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {
test("Drops temporary table") {
testData.select('key).registerTempTable("t1")
- ctx.table("t1")
- ctx.dropTempTable("t1")
- assert(intercept[RuntimeException](ctx.table("t1")).getMessage.startsWith("Table Not Found"))
+ sqlContext.table("t1")
+ sqlContext.dropTempTable("t1")
+ assert(
+ intercept[RuntimeException](sqlContext.table("t1")).getMessage.startsWith("Table Not Found"))
}
test("Drops cached temporary table") {
testData.select('key).registerTempTable("t1")
testData.select('key).registerTempTable("t2")
- ctx.cacheTable("t1")
+ sqlContext.cacheTable("t1")
- assert(ctx.isCached("t1"))
- assert(ctx.isCached("t2"))
+ assert(sqlContext.isCached("t1"))
+ assert(sqlContext.isCached("t2"))
- ctx.dropTempTable("t1")
- assert(intercept[RuntimeException](ctx.table("t1")).getMessage.startsWith("Table Not Found"))
- assert(!ctx.isCached("t2"))
+ sqlContext.dropTempTable("t1")
+ assert(
+ intercept[RuntimeException](sqlContext.table("t1")).getMessage.startsWith("Table Not Found"))
+ assert(!sqlContext.isCached("t2"))
}
test("Clear all cache") {
sql("SELECT key FROM testData LIMIT 10").registerTempTable("t1")
sql("SELECT key FROM testData LIMIT 5").registerTempTable("t2")
- ctx.cacheTable("t1")
- ctx.cacheTable("t2")
- ctx.clearCache()
- assert(ctx.cacheManager.isEmpty)
+ sqlContext.cacheTable("t1")
+ sqlContext.cacheTable("t2")
+ sqlContext.clearCache()
+ assert(sqlContext.cacheManager.isEmpty)
sql("SELECT key FROM testData LIMIT 10").registerTempTable("t1")
sql("SELECT key FROM testData LIMIT 5").registerTempTable("t2")
- ctx.cacheTable("t1")
- ctx.cacheTable("t2")
+ sqlContext.cacheTable("t1")
+ sqlContext.cacheTable("t2")
sql("Clear CACHE")
- assert(ctx.cacheManager.isEmpty)
+ assert(sqlContext.cacheManager.isEmpty)
}
test("Clear accumulators when uncacheTable to prevent memory leaking") {
sql("SELECT key FROM testData LIMIT 10").registerTempTable("t1")
sql("SELECT key FROM testData LIMIT 5").registerTempTable("t2")
- ctx.cacheTable("t1")
- ctx.cacheTable("t2")
+ sqlContext.cacheTable("t1")
+ sqlContext.cacheTable("t2")
sql("SELECT * FROM t1").count()
sql("SELECT * FROM t2").count()
@@ -331,8 +333,8 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {
Accumulators.synchronized {
val accsSize = Accumulators.originals.size
- ctx.uncacheTable("t1")
- ctx.uncacheTable("t2")
+ sqlContext.uncacheTable("t1")
+ sqlContext.uncacheTable("t2")
assert((accsSize - 2) == Accumulators.originals.size)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
index 37738ec..4e988f0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
@@ -29,7 +29,7 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext {
import testImplicits._
private lazy val booleanData = {
- ctx.createDataFrame(ctx.sparkContext.parallelize(
+ sqlContext.createDataFrame(sparkContext.parallelize(
Row(false, false) ::
Row(false, true) ::
Row(true, false) ::
@@ -286,7 +286,7 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext {
}
test("isNaN") {
- val testData = ctx.createDataFrame(ctx.sparkContext.parallelize(
+ val testData = sqlContext.createDataFrame(sparkContext.parallelize(
Row(Double.NaN, Float.NaN) ::
Row(math.log(-1), math.log(-3).toFloat) ::
Row(null, null) ::
@@ -307,7 +307,7 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext {
}
test("nanvl") {
- val testData = ctx.createDataFrame(ctx.sparkContext.parallelize(
+ val testData = sqlContext.createDataFrame(sparkContext.parallelize(
Row(null, 3.0, Double.NaN, Double.PositiveInfinity, 1.0f, 4) :: Nil),
StructType(Seq(StructField("a", DoubleType), StructField("b", DoubleType),
StructField("c", DoubleType), StructField("d", DoubleType),
@@ -350,7 +350,7 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext {
}
test("!==") {
- val nullData = ctx.createDataFrame(ctx.sparkContext.parallelize(
+ val nullData = sqlContext.createDataFrame(sparkContext.parallelize(
Row(1, 1) ::
Row(1, 2) ::
Row(1, null) ::
@@ -411,7 +411,7 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext {
}
test("between") {
- val testData = ctx.sparkContext.parallelize(
+ val testData = sparkContext.parallelize(
(0, 1, 2) ::
(1, 2, 3) ::
(2, 1, 0) ::
@@ -556,7 +556,7 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext {
test("monotonicallyIncreasingId") {
// Make sure we have 2 partitions, each with 2 records.
- val df = ctx.sparkContext.parallelize(Seq[Int](), 2).mapPartitions { _ =>
+ val df = sparkContext.parallelize(Seq[Int](), 2).mapPartitions { _ =>
Iterator(Tuple1(1), Tuple1(2))
}.toDF("a")
checkAnswer(
@@ -567,7 +567,7 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext {
test("sparkPartitionId") {
// Make sure we have 2 partitions, each with 2 records.
- val df = ctx.sparkContext.parallelize(Seq[Int](), 2).mapPartitions { _ =>
+ val df = sparkContext.parallelize(Seq[Int](), 2).mapPartitions { _ =>
Iterator(Tuple1(1), Tuple1(2))
}.toDF("a")
checkAnswer(
@@ -578,7 +578,7 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext {
test("InputFileName") {
withTempPath { dir =>
- val data = sqlContext.sparkContext.parallelize(0 to 10).toDF("id")
+ val data = sparkContext.parallelize(0 to 10).toDF("id")
data.write.parquet(dir.getCanonicalPath)
val answer = sqlContext.read.parquet(dir.getCanonicalPath).select(inputFileName())
.head.getString(0)
http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index 72cf7aa..c0950b0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -66,12 +66,12 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext {
Seq(Row(1, 3), Row(2, 3), Row(3, 3))
)
- ctx.conf.setConf(SQLConf.DATAFRAME_RETAIN_GROUP_COLUMNS, false)
+ sqlContext.conf.setConf(SQLConf.DATAFRAME_RETAIN_GROUP_COLUMNS, false)
checkAnswer(
testData2.groupBy("a").agg(sum($"b")),
Seq(Row(3), Row(3), Row(3))
)
- ctx.conf.setConf(SQLConf.DATAFRAME_RETAIN_GROUP_COLUMNS, true)
+ sqlContext.conf.setConf(SQLConf.DATAFRAME_RETAIN_GROUP_COLUMNS, true)
}
test("agg without groups") {
http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/DataFrameComplexTypeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameComplexTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameComplexTypeSuite.scala
index 3c359dd..09f7b50 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameComplexTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameComplexTypeSuite.scala
@@ -28,19 +28,19 @@ class DataFrameComplexTypeSuite extends QueryTest with SharedSQLContext {
test("UDF on struct") {
val f = udf((a: String) => a)
- val df = sqlContext.sparkContext.parallelize(Seq((1, 1))).toDF("a", "b")
+ val df = sparkContext.parallelize(Seq((1, 1))).toDF("a", "b")
df.select(struct($"a").as("s")).select(f($"s.a")).collect()
}
test("UDF on named_struct") {
val f = udf((a: String) => a)
- val df = sqlContext.sparkContext.parallelize(Seq((1, 1))).toDF("a", "b")
+ val df = sparkContext.parallelize(Seq((1, 1))).toDF("a", "b")
df.selectExpr("named_struct('a', a) s").select(f($"s.a")).collect()
}
test("UDF on array") {
val f = udf((a: String) => a)
- val df = sqlContext.sparkContext.parallelize(Seq((1, 1))).toDF("a", "b")
+ val df = sparkContext.parallelize(Seq((1, 1))).toDF("a", "b")
df.select(array($"a").as("s")).select(f(expr("s[0]"))).collect()
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/DataFrameImplicitsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameImplicitsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameImplicitsSuite.scala
index e5d7d63..094efba 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameImplicitsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameImplicitsSuite.scala
@@ -24,7 +24,7 @@ class DataFrameImplicitsSuite extends QueryTest with SharedSQLContext {
test("RDD of tuples") {
checkAnswer(
- ctx.sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("intCol", "strCol"),
+ sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("intCol", "strCol"),
(1 to 10).map(i => Row(i, i.toString)))
}
@@ -36,19 +36,19 @@ class DataFrameImplicitsSuite extends QueryTest with SharedSQLContext {
test("RDD[Int]") {
checkAnswer(
- ctx.sparkContext.parallelize(1 to 10).toDF("intCol"),
+ sparkContext.parallelize(1 to 10).toDF("intCol"),
(1 to 10).map(i => Row(i)))
}
test("RDD[Long]") {
checkAnswer(
- ctx.sparkContext.parallelize(1L to 10L).toDF("longCol"),
+ sparkContext.parallelize(1L to 10L).toDF("longCol"),
(1L to 10L).map(i => Row(i)))
}
test("RDD[String]") {
checkAnswer(
- ctx.sparkContext.parallelize(1 to 10).map(_.toString).toDF("stringCol"),
+ sparkContext.parallelize(1 to 10).map(_.toString).toDF("stringCol"),
(1 to 10).map(i => Row(i.toString)))
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
index 28bdd6f..6524abc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
@@ -29,7 +29,7 @@ class DataFrameStatSuite extends QueryTest with SharedSQLContext {
test("sample with replacement") {
val n = 100
- val data = ctx.sparkContext.parallelize(1 to n, 2).toDF("id")
+ val data = sparkContext.parallelize(1 to n, 2).toDF("id")
checkAnswer(
data.sample(withReplacement = true, 0.05, seed = 13),
Seq(5, 10, 52, 73).map(Row(_))
@@ -38,7 +38,7 @@ class DataFrameStatSuite extends QueryTest with SharedSQLContext {
test("sample without replacement") {
val n = 100
- val data = ctx.sparkContext.parallelize(1 to n, 2).toDF("id")
+ val data = sparkContext.parallelize(1 to n, 2).toDF("id")
checkAnswer(
data.sample(withReplacement = false, 0.05, seed = 13),
Seq(16, 23, 88, 100).map(Row(_))
@@ -47,7 +47,7 @@ class DataFrameStatSuite extends QueryTest with SharedSQLContext {
test("randomSplit") {
val n = 600
- val data = ctx.sparkContext.parallelize(1 to n, 2).toDF("id")
+ val data = sparkContext.parallelize(1 to n, 2).toDF("id")
for (seed <- 1 to 5) {
val splits = data.randomSplit(Array[Double](1, 2, 3), seed)
assert(splits.length == 3, "wrong number of splits")
@@ -164,7 +164,7 @@ class DataFrameStatSuite extends QueryTest with SharedSQLContext {
}
test("Frequent Items 2") {
- val rows = ctx.sparkContext.parallelize(Seq.empty[Int], 4)
+ val rows = sparkContext.parallelize(Seq.empty[Int], 4)
// this is a regression test, where when merging partitions, we omitted values with higher
// counts than those that existed in the map when the map was full. This test should also fail
// if anything like SPARK-9614 is observed once again
@@ -182,7 +182,7 @@ class DataFrameStatSuite extends QueryTest with SharedSQLContext {
}
test("sampleBy") {
- val df = ctx.range(0, 100).select((col("id") % 3).as("key"))
+ val df = sqlContext.range(0, 100).select((col("id") % 3).as("key"))
val sampled = df.stat.sampleBy("key", Map(0 -> 0.1, 1 -> 0.2), 0L)
checkAnswer(
sampled.groupBy("key").count().orderBy("key"),
http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index a4871e2..b5b9f11 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -345,7 +345,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
}
test("replace column using withColumn") {
- val df2 = sqlContext.sparkContext.parallelize(Array(1, 2, 3)).toDF("x")
+ val df2 = sparkContext.parallelize(Array(1, 2, 3)).toDF("x")
val df3 = df2.withColumn("x", df2("x") + 1)
checkAnswer(
df3.select("x"),
@@ -506,7 +506,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
test("showString: truncate = [true, false]") {
val longString = Array.fill(21)("1").mkString
- val df = sqlContext.sparkContext.parallelize(Seq("1", longString)).toDF()
+ val df = sparkContext.parallelize(Seq("1", longString)).toDF()
val expectedAnswerForFalse = """+---------------------+
||_1 |
|+---------------------+
@@ -596,7 +596,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
}
test("createDataFrame(RDD[Row], StructType) should convert UDTs (SPARK-6672)") {
- val rowRDD = sqlContext.sparkContext.parallelize(Seq(Row(new ExamplePoint(1.0, 2.0))))
+ val rowRDD = sparkContext.parallelize(Seq(Row(new ExamplePoint(1.0, 2.0))))
val schema = StructType(Array(StructField("point", new ExamplePointUDT(), false)))
val df = sqlContext.createDataFrame(rowRDD, schema)
df.rdd.collect()
@@ -619,14 +619,14 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
}
test("SPARK-7551: support backticks for DataFrame attribute resolution") {
- val df = sqlContext.read.json(sqlContext.sparkContext.makeRDD(
+ val df = sqlContext.read.json(sparkContext.makeRDD(
"""{"a.b": {"c": {"d..e": {"f": 1}}}}""" :: Nil))
checkAnswer(
df.select(df("`a.b`.c.`d..e`.`f`")),
Row(1)
)
- val df2 = sqlContext.read.json(sqlContext.sparkContext.makeRDD(
+ val df2 = sqlContext.read.json(sparkContext.makeRDD(
"""{"a b": {"c": {"d e": {"f": 1}}}}""" :: Nil))
checkAnswer(
df2.select(df2("`a b`.c.d e.f")),
@@ -646,7 +646,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
}
test("SPARK-7324 dropDuplicates") {
- val testData = sqlContext.sparkContext.parallelize(
+ val testData = sparkContext.parallelize(
(2, 1, 2) :: (1, 1, 1) ::
(1, 2, 1) :: (2, 1, 2) ::
(2, 2, 2) :: (2, 2, 1) ::
@@ -869,7 +869,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
}
test("SPARK-9323: DataFrame.orderBy should support nested column name") {
- val df = sqlContext.read.json(sqlContext.sparkContext.makeRDD(
+ val df = sqlContext.read.json(sparkContext.makeRDD(
"""{"a": {"b": 1}}""" :: Nil))
checkAnswer(df.orderBy("a.b"), Row(Row(1)))
}
http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTungstenSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTungstenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTungstenSuite.scala
index 77907e9..7ae12a7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTungstenSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTungstenSuite.scala
@@ -32,7 +32,7 @@ class DataFrameTungstenSuite extends QueryTest with SharedSQLContext {
test("test simple types") {
withSQLConf(SQLConf.UNSAFE_ENABLED.key -> "true") {
- val df = sqlContext.sparkContext.parallelize(Seq((1, 2))).toDF("a", "b")
+ val df = sparkContext.parallelize(Seq((1, 2))).toDF("a", "b")
assert(df.select(struct("a", "b")).first().getStruct(0) === Row(1, 2))
}
}
@@ -40,7 +40,7 @@ class DataFrameTungstenSuite extends QueryTest with SharedSQLContext {
test("test struct type") {
withSQLConf(SQLConf.UNSAFE_ENABLED.key -> "true") {
val struct = Row(1, 2L, 3.0F, 3.0)
- val data = sqlContext.sparkContext.parallelize(Seq(Row(1, struct)))
+ val data = sparkContext.parallelize(Seq(Row(1, struct)))
val schema = new StructType()
.add("a", IntegerType)
@@ -60,7 +60,7 @@ class DataFrameTungstenSuite extends QueryTest with SharedSQLContext {
withSQLConf(SQLConf.UNSAFE_ENABLED.key -> "true") {
val innerStruct = Row(1, "abcd")
val outerStruct = Row(1, 2L, 3.0F, 3.0, innerStruct, "efg")
- val data = sqlContext.sparkContext.parallelize(Seq(Row(1, outerStruct)))
+ val data = sparkContext.parallelize(Seq(Row(1, outerStruct)))
val schema = new StructType()
.add("a", IntegerType)
http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/ExtraStrategiesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExtraStrategiesSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExtraStrategiesSuite.scala
index 8d2f45d..78a9879 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ExtraStrategiesSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ExtraStrategiesSuite.scala
@@ -52,7 +52,7 @@ class ExtraStrategiesSuite extends QueryTest with SharedSQLContext {
try {
sqlContext.experimental.extraStrategies = TestStrategy :: Nil
- val df = sqlContext.sparkContext.parallelize(Seq(("so slow", 1))).toDF("a", "b")
+ val df = sparkContext.parallelize(Seq(("so slow", 1))).toDF("a", "b")
checkAnswer(
df.select("a"),
Row("so fast"))
http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index f5c5046..b05435b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -31,7 +31,7 @@ class JoinSuite extends QueryTest with SharedSQLContext {
val x = testData2.as("x")
val y = testData2.as("y")
val join = x.join(y, $"x.a" === $"y.a", "inner").queryExecution.optimizedPlan
- val planned = ctx.planner.EquiJoinSelection(join)
+ val planned = sqlContext.planner.EquiJoinSelection(join)
assert(planned.size === 1)
}
@@ -59,7 +59,7 @@ class JoinSuite extends QueryTest with SharedSQLContext {
}
test("join operator selection") {
- ctx.cacheManager.clearCache()
+ sqlContext.cacheManager.clearCache()
Seq(
("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a", classOf[LeftSemiJoinHash]),
@@ -118,7 +118,7 @@ class JoinSuite extends QueryTest with SharedSQLContext {
}
test("broadcasted hash join operator selection") {
- ctx.cacheManager.clearCache()
+ sqlContext.cacheManager.clearCache()
sql("CACHE TABLE testData")
for (sortMergeJoinEnabled <- Seq(true, false)) {
withClue(s"sortMergeJoinEnabled=$sortMergeJoinEnabled") {
@@ -138,7 +138,7 @@ class JoinSuite extends QueryTest with SharedSQLContext {
}
test("broadcasted hash outer join operator selection") {
- ctx.cacheManager.clearCache()
+ sqlContext.cacheManager.clearCache()
sql("CACHE TABLE testData")
withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "true") {
Seq(
@@ -167,7 +167,7 @@ class JoinSuite extends QueryTest with SharedSQLContext {
val x = testData2.as("x")
val y = testData2.as("y")
val join = x.join(y, ($"x.a" === $"y.a") && ($"x.b" === $"y.b")).queryExecution.optimizedPlan
- val planned = ctx.planner.EquiJoinSelection(join)
+ val planned = sqlContext.planner.EquiJoinSelection(join)
assert(planned.size === 1)
}
@@ -442,7 +442,7 @@ class JoinSuite extends QueryTest with SharedSQLContext {
}
test("broadcasted left semi join operator selection") {
- ctx.cacheManager.clearCache()
+ sqlContext.cacheManager.clearCache()
sql("CACHE TABLE testData")
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1000000000") {
http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala
index babf883..eab0fbb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala
@@ -32,33 +32,33 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContex
}
after {
- ctx.catalog.unregisterTable(Seq("ListTablesSuiteTable"))
+ sqlContext.catalog.unregisterTable(Seq("ListTablesSuiteTable"))
}
test("get all tables") {
checkAnswer(
- ctx.tables().filter("tableName = 'ListTablesSuiteTable'"),
+ sqlContext.tables().filter("tableName = 'ListTablesSuiteTable'"),
Row("ListTablesSuiteTable", true))
checkAnswer(
sql("SHOW tables").filter("tableName = 'ListTablesSuiteTable'"),
Row("ListTablesSuiteTable", true))
- ctx.catalog.unregisterTable(Seq("ListTablesSuiteTable"))
- assert(ctx.tables().filter("tableName = 'ListTablesSuiteTable'").count() === 0)
+ sqlContext.catalog.unregisterTable(Seq("ListTablesSuiteTable"))
+ assert(sqlContext.tables().filter("tableName = 'ListTablesSuiteTable'").count() === 0)
}
test("getting all Tables with a database name has no impact on returned table names") {
checkAnswer(
- ctx.tables("DB").filter("tableName = 'ListTablesSuiteTable'"),
+ sqlContext.tables("DB").filter("tableName = 'ListTablesSuiteTable'"),
Row("ListTablesSuiteTable", true))
checkAnswer(
sql("show TABLES in DB").filter("tableName = 'ListTablesSuiteTable'"),
Row("ListTablesSuiteTable", true))
- ctx.catalog.unregisterTable(Seq("ListTablesSuiteTable"))
- assert(ctx.tables().filter("tableName = 'ListTablesSuiteTable'").count() === 0)
+ sqlContext.catalog.unregisterTable(Seq("ListTablesSuiteTable"))
+ assert(sqlContext.tables().filter("tableName = 'ListTablesSuiteTable'").count() === 0)
}
test("query the returned DataFrame of tables") {
@@ -66,7 +66,7 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContex
StructField("tableName", StringType, false) ::
StructField("isTemporary", BooleanType, false) :: Nil)
- Seq(ctx.tables(), sql("SHOW TABLes")).foreach {
+ Seq(sqlContext.tables(), sql("SHOW TABLes")).foreach {
case tableDF =>
assert(expectedSchema === tableDF.schema)
@@ -77,9 +77,9 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContex
Row(true, "ListTablesSuiteTable")
)
checkAnswer(
- ctx.tables().filter("tableName = 'tables'").select("tableName", "isTemporary"),
+ sqlContext.tables().filter("tableName = 'tables'").select("tableName", "isTemporary"),
Row("tables", true))
- ctx.dropTempTable("tables")
+ sqlContext.dropTempTable("tables")
}
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index 3649c2a..cada03e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -25,7 +25,9 @@ import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.columnar.InMemoryRelation
-class QueryTest extends PlanTest {
+abstract class QueryTest extends PlanTest {
+
+ protected def sqlContext: SQLContext
// Timezone is fixed to America/Los_Angeles for those timezone sensitive tests (timestamp_*)
TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))
@@ -56,18 +58,33 @@ class QueryTest extends PlanTest {
* @param df the [[DataFrame]] to be executed
* @param expectedAnswer the expected result in a [[Seq]] of [[Row]]s.
*/
- protected def checkAnswer(df: DataFrame, expectedAnswer: Seq[Row]): Unit = {
- QueryTest.checkAnswer(df, expectedAnswer) match {
+ protected def checkAnswer(df: => DataFrame, expectedAnswer: Seq[Row]): Unit = {
+ val analyzedDF = try df catch {
+ case ae: AnalysisException =>
+ val currentValue = sqlContext.conf.dataFrameEagerAnalysis
+ sqlContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, false)
+ val partiallyAnalyzedPlan = df.queryExecution.analyzed
+ sqlContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, currentValue)
+ fail(
+ s"""
+ |Failed to analyze query: $ae
+ |$partiallyAnalyzedPlan
+ |
+ |${stackTraceToString(ae)}
+ |""".stripMargin)
+ }
+
+ QueryTest.checkAnswer(analyzedDF, expectedAnswer) match {
case Some(errorMessage) => fail(errorMessage)
case None =>
}
}
- protected def checkAnswer(df: DataFrame, expectedAnswer: Row): Unit = {
+ protected def checkAnswer(df: => DataFrame, expectedAnswer: Row): Unit = {
checkAnswer(df, Seq(expectedAnswer))
}
- protected def checkAnswer(df: DataFrame, expectedAnswer: DataFrame): Unit = {
+ protected def checkAnswer(df: => DataFrame, expectedAnswer: DataFrame): Unit = {
checkAnswer(df, expectedAnswer.collect())
}
http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala
index 77ccd6f..3ba14d7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala
@@ -57,7 +57,7 @@ class RowSuite extends SparkFunSuite with SharedSQLContext {
test("serialize w/ kryo") {
val row = Seq((1, Seq(1), Map(1 -> 1), BigDecimal(1))).toDF().first()
- val serializer = new SparkSqlSerializer(ctx.sparkContext.getConf)
+ val serializer = new SparkSqlSerializer(sparkContext.getConf)
val instance = serializer.newInstance()
val ser = instance.serialize(row)
val de = instance.deserialize(ser).asInstanceOf[Row]
http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala
index 7699ada..c35b31c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala
@@ -27,58 +27,58 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {
test("propagate from spark conf") {
// We create a new context here to avoid order dependence with other tests that might call
// clear().
- val newContext = new SQLContext(ctx.sparkContext)
+ val newContext = new SQLContext(sparkContext)
assert(newContext.getConf("spark.sql.testkey", "false") === "true")
}
test("programmatic ways of basic setting and getting") {
- ctx.conf.clear()
- assert(ctx.getAllConfs.size === 0)
+ sqlContext.conf.clear()
+ assert(sqlContext.getAllConfs.size === 0)
- ctx.setConf(testKey, testVal)
- assert(ctx.getConf(testKey) === testVal)
- assert(ctx.getConf(testKey, testVal + "_") === testVal)
- assert(ctx.getAllConfs.contains(testKey))
+ sqlContext.setConf(testKey, testVal)
+ assert(sqlContext.getConf(testKey) === testVal)
+ assert(sqlContext.getConf(testKey, testVal + "_") === testVal)
+ assert(sqlContext.getAllConfs.contains(testKey))
// Tests SQLConf as accessed from a SQLContext is mutable after
// the latter is initialized, unlike SparkConf inside a SparkContext.
- assert(ctx.getConf(testKey) == testVal)
- assert(ctx.getConf(testKey, testVal + "_") === testVal)
- assert(ctx.getAllConfs.contains(testKey))
+ assert(sqlContext.getConf(testKey) == testVal)
+ assert(sqlContext.getConf(testKey, testVal + "_") === testVal)
+ assert(sqlContext.getAllConfs.contains(testKey))
- ctx.conf.clear()
+ sqlContext.conf.clear()
}
test("parse SQL set commands") {
- ctx.conf.clear()
+ sqlContext.conf.clear()
sql(s"set $testKey=$testVal")
- assert(ctx.getConf(testKey, testVal + "_") === testVal)
- assert(ctx.getConf(testKey, testVal + "_") === testVal)
+ assert(sqlContext.getConf(testKey, testVal + "_") === testVal)
+ assert(sqlContext.getConf(testKey, testVal + "_") === testVal)
sql("set some.property=20")
- assert(ctx.getConf("some.property", "0") === "20")
+ assert(sqlContext.getConf("some.property", "0") === "20")
sql("set some.property = 40")
- assert(ctx.getConf("some.property", "0") === "40")
+ assert(sqlContext.getConf("some.property", "0") === "40")
val key = "spark.sql.key"
val vs = "val0,val_1,val2.3,my_table"
sql(s"set $key=$vs")
- assert(ctx.getConf(key, "0") === vs)
+ assert(sqlContext.getConf(key, "0") === vs)
sql(s"set $key=")
- assert(ctx.getConf(key, "0") === "")
+ assert(sqlContext.getConf(key, "0") === "")
- ctx.conf.clear()
+ sqlContext.conf.clear()
}
test("deprecated property") {
- ctx.conf.clear()
+ sqlContext.conf.clear()
sql(s"set ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS}=10")
- assert(ctx.conf.numShufflePartitions === 10)
+ assert(sqlContext.conf.numShufflePartitions === 10)
}
test("invalid conf value") {
- ctx.conf.clear()
+ sqlContext.conf.clear()
val e = intercept[IllegalArgumentException] {
sql(s"set ${SQLConf.CASE_SENSITIVE.key}=10")
}
http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
index 007be12..dd88ae3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
@@ -24,7 +24,7 @@ class SQLContextSuite extends SparkFunSuite with SharedSQLContext {
override def afterAll(): Unit = {
try {
- SQLContext.setLastInstantiatedContext(ctx)
+ SQLContext.setLastInstantiatedContext(sqlContext)
} finally {
super.afterAll()
}
@@ -32,18 +32,18 @@ class SQLContextSuite extends SparkFunSuite with SharedSQLContext {
test("getOrCreate instantiates SQLContext") {
SQLContext.clearLastInstantiatedContext()
- val sqlContext = SQLContext.getOrCreate(ctx.sparkContext)
+ val sqlContext = SQLContext.getOrCreate(sparkContext)
assert(sqlContext != null, "SQLContext.getOrCreate returned null")
- assert(SQLContext.getOrCreate(ctx.sparkContext).eq(sqlContext),
+ assert(SQLContext.getOrCreate(sparkContext).eq(sqlContext),
"SQLContext created by SQLContext.getOrCreate not returned by SQLContext.getOrCreate")
}
test("getOrCreate gets last explicitly instantiated SQLContext") {
SQLContext.clearLastInstantiatedContext()
- val sqlContext = new SQLContext(ctx.sparkContext)
- assert(SQLContext.getOrCreate(ctx.sparkContext) != null,
+ val sqlContext = new SQLContext(sparkContext)
+ assert(SQLContext.getOrCreate(sparkContext) != null,
"SQLContext.getOrCreate after explicitly created SQLContext returned null")
- assert(SQLContext.getOrCreate(ctx.sparkContext).eq(sqlContext),
+ assert(SQLContext.getOrCreate(sparkContext).eq(sqlContext),
"SQLContext.getOrCreate after explicitly created SQLContext did not return the context")
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 0ef25fe..05f2000 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -147,14 +147,14 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
test("SQL Dialect Switching to a new SQL parser") {
- val newContext = new SQLContext(sqlContext.sparkContext)
+ val newContext = new SQLContext(sparkContext)
newContext.setConf("spark.sql.dialect", classOf[MyDialect].getCanonicalName())
assert(newContext.getSQLDialect().getClass === classOf[MyDialect])
assert(newContext.sql("SELECT 1").collect() === Array(Row(1)))
}
test("SQL Dialect Switch to an invalid parser with alias") {
- val newContext = new SQLContext(sqlContext.sparkContext)
+ val newContext = new SQLContext(sparkContext)
newContext.sql("SET spark.sql.dialect=MyTestClass")
intercept[DialectException] {
newContext.sql("SELECT 1")
@@ -196,7 +196,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
test("grouping on nested fields") {
- sqlContext.read.json(sqlContext.sparkContext.parallelize(
+ sqlContext.read.json(sparkContext.parallelize(
"""{"nested": {"attribute": 1}, "value": 2}""" :: Nil))
.registerTempTable("rows")
@@ -215,7 +215,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
test("SPARK-6201 IN type conversion") {
sqlContext.read.json(
- sqlContext.sparkContext.parallelize(
+ sparkContext.parallelize(
Seq("{\"a\": \"1\"}}", "{\"a\": \"2\"}}", "{\"a\": \"3\"}}")))
.registerTempTable("d")
@@ -1342,7 +1342,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
test("SPARK-3483 Special chars in column names") {
- val data = sqlContext.sparkContext.parallelize(
+ val data = sparkContext.parallelize(
Seq("""{"key?number1": "value1", "key.number2": "value2"}"""))
sqlContext.read.json(data).registerTempTable("records")
sql("SELECT `key?number1`, `key.number2` FROM records")
@@ -1385,13 +1385,13 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
test("SPARK-4322 Grouping field with struct field as sub expression") {
- sqlContext.read.json(sqlContext.sparkContext.makeRDD("""{"a": {"b": [{"c": 1}]}}""" :: Nil))
+ sqlContext.read.json(sparkContext.makeRDD("""{"a": {"b": [{"c": 1}]}}""" :: Nil))
.registerTempTable("data")
checkAnswer(sql("SELECT a.b[0].c FROM data GROUP BY a.b[0].c"), Row(1))
sqlContext.dropTempTable("data")
sqlContext.read.json(
- sqlContext.sparkContext.makeRDD("""{"a": {"b": 1}}""" :: Nil)).registerTempTable("data")
+ sparkContext.makeRDD("""{"a": {"b": 1}}""" :: Nil)).registerTempTable("data")
checkAnswer(sql("SELECT a.b + 1 FROM data GROUP BY a.b + 1"), Row(2))
sqlContext.dropTempTable("data")
}
@@ -1412,10 +1412,10 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
test("Supporting relational operator '<=>' in Spark SQL") {
val nullCheckData1 = TestData(1, "1") :: TestData(2, null) :: Nil
- val rdd1 = sqlContext.sparkContext.parallelize((0 to 1).map(i => nullCheckData1(i)))
+ val rdd1 = sparkContext.parallelize((0 to 1).map(i => nullCheckData1(i)))
rdd1.toDF().registerTempTable("nulldata1")
val nullCheckData2 = TestData(1, "1") :: TestData(2, null) :: Nil
- val rdd2 = sqlContext.sparkContext.parallelize((0 to 1).map(i => nullCheckData2(i)))
+ val rdd2 = sparkContext.parallelize((0 to 1).map(i => nullCheckData2(i)))
rdd2.toDF().registerTempTable("nulldata2")
checkAnswer(sql("SELECT nulldata1.key FROM nulldata1 join " +
"nulldata2 on nulldata1.value <=> nulldata2.value"),
@@ -1424,7 +1424,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
test("Multi-column COUNT(DISTINCT ...)") {
val data = TestData(1, "val_1") :: TestData(2, "val_2") :: Nil
- val rdd = sqlContext.sparkContext.parallelize((0 to 1).map(i => data(i)))
+ val rdd = sparkContext.parallelize((0 to 1).map(i => data(i)))
rdd.toDF().registerTempTable("distinctData")
checkAnswer(sql("SELECT COUNT(DISTINCT key,value) FROM distinctData"), Row(2))
}
@@ -1432,14 +1432,14 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
test("SPARK-4699 case sensitivity SQL query") {
sqlContext.setConf(SQLConf.CASE_SENSITIVE, false)
val data = TestData(1, "val_1") :: TestData(2, "val_2") :: Nil
- val rdd = sqlContext.sparkContext.parallelize((0 to 1).map(i => data(i)))
+ val rdd = sparkContext.parallelize((0 to 1).map(i => data(i)))
rdd.toDF().registerTempTable("testTable1")
checkAnswer(sql("SELECT VALUE FROM TESTTABLE1 where KEY = 1"), Row("val_1"))
sqlContext.setConf(SQLConf.CASE_SENSITIVE, true)
}
test("SPARK-6145: ORDER BY test for nested fields") {
- sqlContext.read.json(sqlContext.sparkContext.makeRDD(
+ sqlContext.read.json(sparkContext.makeRDD(
"""{"a": {"b": 1, "a": {"a": 1}}, "c": [{"d": 1}]}""" :: Nil))
.registerTempTable("nestedOrder")
@@ -1452,14 +1452,14 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
test("SPARK-6145: special cases") {
- sqlContext.read.json(sqlContext.sparkContext.makeRDD(
+ sqlContext.read.json(sparkContext.makeRDD(
"""{"a": {"b": [1]}, "b": [{"a": 1}], "_c0": {"a": 1}}""" :: Nil)).registerTempTable("t")
checkAnswer(sql("SELECT a.b[0] FROM t ORDER BY _c0.a"), Row(1))
checkAnswer(sql("SELECT b[0].a FROM t ORDER BY _c0.a"), Row(1))
}
test("SPARK-6898: complete support for special chars in column names") {
- sqlContext.read.json(sqlContext.sparkContext.makeRDD(
+ sqlContext.read.json(sparkContext.makeRDD(
"""{"a": {"c.b": 1}, "b.$q": [{"a@!.q": 1}], "q.w": {"w.i&": [1]}}""" :: Nil))
.registerTempTable("t")
@@ -1543,7 +1543,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
test("SPARK-7067: order by queries for complex ExtractValue chain") {
withTempTable("t") {
- sqlContext.read.json(sqlContext.sparkContext.makeRDD(
+ sqlContext.read.json(sparkContext.makeRDD(
"""{"a": {"b": [{"c": 1}]}, "b": [{"d": 1}]}""" :: Nil)).registerTempTable("t")
checkAnswer(sql("SELECT a.b FROM t ORDER BY b[0].d"), Row(Seq(Row(1))))
}
@@ -1610,8 +1610,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
test("aggregation with codegen updates peak execution memory") {
withSQLConf((SQLConf.CODEGEN_ENABLED.key, "true")) {
- val sc = sqlContext.sparkContext
- AccumulatorSuite.verifyPeakExecutionMemorySet(sc, "aggregation with codegen") {
+ AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "aggregation with codegen") {
testCodeGen(
"SELECT key, count(value) FROM testData GROUP BY key",
(1 to 100).map(i => Row(i, 1)))
@@ -1670,8 +1669,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
test("external sorting updates peak execution memory") {
withSQLConf((SQLConf.EXTERNAL_SORT.key, "true")) {
- val sc = sqlContext.sparkContext
- AccumulatorSuite.verifyPeakExecutionMemorySet(sc, "external sort") {
+ AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "external sort") {
sortTest()
}
}
@@ -1679,7 +1677,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
test("SPARK-9511: error with table starting with number") {
withTempTable("1one") {
- sqlContext.sparkContext.parallelize(1 to 10).map(i => (i, i.toString))
+ sparkContext.parallelize(1 to 10).map(i => (i, i.toString))
.toDF("num", "str")
.registerTempTable("1one")
checkAnswer(sql("select count(num) from 1one"), Row(10))
@@ -1690,7 +1688,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
withTempPath { dir =>
val path = dir.getCanonicalPath
val df =
- sqlContext.sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str")
+ sparkContext.parallelize(1 to 10).map(i => (i, i.toString)).toDF("num", "str")
df
.write
.format("parquet")
http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/SerializationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SerializationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SerializationSuite.scala
index 45d0ee4..ddab918 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SerializationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SerializationSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.test.SharedSQLContext
class SerializationSuite extends SparkFunSuite with SharedSQLContext {
test("[SPARK-5235] SQLContext should be serializable") {
- val _sqlContext = new SQLContext(sqlContext.sparkContext)
+ val _sqlContext = new SQLContext(sparkContext)
new JavaSerializer(new SparkConf()).newInstance().serialize(_sqlContext)
}
}
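What the test exercises, as a hedged sketch (the helper name below is ours, not Spark's): Java-serializing a freshly built SQLContext must not throw, which is the guarantee SPARK-5235 introduced.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.serializer.JavaSerializer
    import org.apache.spark.sql.SQLContext

    // Hypothetical helper: any non-transient, non-serializable field in
    // SQLContext would make this serialize call throw.
    def assertSQLContextSerializable(sc: SparkContext): Unit = {
      val ctx = new SQLContext(sc)
      new JavaSerializer(new SparkConf()).newInstance().serialize(ctx)
    }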
http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
index b91438b..e12e6be 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StringFunctionsSuite.scala
@@ -268,9 +268,7 @@ class StringFunctionsSuite extends QueryTest with SharedSQLContext {
Row(3, 4))
intercept[AnalysisException] {
- checkAnswer(
- df.selectExpr("length(c)"), // int type of the argument is unacceptable
- Row("5.0000"))
+ df.selectExpr("length(c)") // int type of the argument is unacceptable
}
}
@@ -284,63 +282,46 @@ class StringFunctionsSuite extends QueryTest with SharedSQLContext {
}
test("number format function") {
- val tuple =
- ("aa", 1.asInstanceOf[Byte], 2.asInstanceOf[Short],
- 3.13223f, 4, 5L, 6.48173d, Decimal(7.128381))
- val df =
- Seq(tuple)
- .toDF(
- "a", // string "aa"
- "b", // byte 1
- "c", // short 2
- "d", // float 3.13223f
- "e", // integer 4
- "f", // long 5L
- "g", // double 6.48173d
- "h") // decimal 7.128381
-
- checkAnswer(
- df.select(format_number($"f", 4)),
+ val df = sqlContext.range(1)
+
+ checkAnswer(
+ df.select(format_number(lit(5L), 4)),
Row("5.0000"))
checkAnswer(
- df.selectExpr("format_number(b, e)"), // convert the 1st argument to integer
+ df.select(format_number(lit(1.toByte), 4)), // convert the 1st argument to integer
Row("1.0000"))
checkAnswer(
- df.selectExpr("format_number(c, e)"), // convert the 1st argument to integer
+ df.select(format_number(lit(2.toShort), 4)), // convert the 1st argument to integer
Row("2.0000"))
checkAnswer(
- df.selectExpr("format_number(d, e)"), // convert the 1st argument to double
+ df.select(format_number(lit(3.1322.toFloat), 4)), // convert the 1st argument to double
Row("3.1322"))
checkAnswer(
- df.selectExpr("format_number(e, e)"), // not convert anything
+ df.select(format_number(lit(4), 4)), // not convert anything
Row("4.0000"))
checkAnswer(
- df.selectExpr("format_number(f, e)"), // not convert anything
+ df.select(format_number(lit(5L), 4)), // not convert anything
Row("5.0000"))
checkAnswer(
- df.selectExpr("format_number(g, e)"), // not convert anything
+ df.select(format_number(lit(6.48173), 4)), // not convert anything
Row("6.4817"))
checkAnswer(
- df.selectExpr("format_number(h, e)"), // not convert anything
+ df.select(format_number(lit(BigDecimal(7.128381)), 4)), // not convert anything
Row("7.1284"))
intercept[AnalysisException] {
- checkAnswer(
- df.selectExpr("format_number(a, e)"), // string type of the 1st argument is unacceptable
- Row("5.0000"))
+ df.select(format_number(lit("aa"), 4)) // string type of the 1st argument is unacceptable
}
intercept[AnalysisException] {
- checkAnswer(
- df.selectExpr("format_number(e, g)"), // decimal type of the 2nd argument is unacceptable
- Row("5.0000"))
+ df.selectExpr("format_number(4, 6.48173)") // non-integral type 2nd argument is unacceptable
}
// for testing the mutable state of the expression in code gen.
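The rewrite above replaces column references with lit(...) literals, so each case states its input type inline. A short sketch of the function under test, assuming the lit and format_number helpers from org.apache.spark.sql.functions (the grouping-separator example is ours):

    import org.apache.spark.sql.functions.{format_number, lit}

    // format_number(x, d) rounds x to d decimal places and inserts
    // grouping separators; any one-row DataFrame drives the expressions.
    val df = sqlContext.range(1)
    df.select(
      format_number(lit(6.48173), 4),     // "6.4817"
      format_number(lit(1234567.891), 2)  // "1,234,567.89"
    ).show()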
http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
index eb275af..e0435a0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
@@ -26,7 +26,7 @@ class UDFSuite extends QueryTest with SharedSQLContext {
import testImplicits._
test("built-in fixed arity expressions") {
- val df = ctx.emptyDataFrame
+ val df = sqlContext.emptyDataFrame
df.selectExpr("rand()", "randn()", "rand(5)", "randn(50)")
}
@@ -55,23 +55,23 @@ class UDFSuite extends QueryTest with SharedSQLContext {
val df = Seq((1, "Tearing down the walls that divide us")).toDF("id", "saying")
df.registerTempTable("tmp_table")
checkAnswer(sql("select spark_partition_id() from tmp_table").toDF(), Row(0))
- ctx.dropTempTable("tmp_table")
+ sqlContext.dropTempTable("tmp_table")
}
test("SPARK-8005 input_file_name") {
withTempPath { dir =>
- val data = ctx.sparkContext.parallelize(0 to 10, 2).toDF("id")
+ val data = sparkContext.parallelize(0 to 10, 2).toDF("id")
data.write.parquet(dir.getCanonicalPath)
- ctx.read.parquet(dir.getCanonicalPath).registerTempTable("test_table")
+ sqlContext.read.parquet(dir.getCanonicalPath).registerTempTable("test_table")
val answer = sql("select input_file_name() from test_table").head().getString(0)
assert(answer.contains(dir.getCanonicalPath))
assert(sql("select input_file_name() from test_table").distinct().collect().length >= 2)
- ctx.dropTempTable("test_table")
+ sqlContext.dropTempTable("test_table")
}
}
test("error reporting for incorrect number of arguments") {
- val df = ctx.emptyDataFrame
+ val df = sqlContext.emptyDataFrame
val e = intercept[AnalysisException] {
df.selectExpr("substr('abcd', 2, 3, 4)")
}
@@ -79,7 +79,7 @@ class UDFSuite extends QueryTest with SharedSQLContext {
}
test("error reporting for undefined functions") {
- val df = ctx.emptyDataFrame
+ val df = sqlContext.emptyDataFrame
val e = intercept[AnalysisException] {
df.selectExpr("a_function_that_does_not_exist()")
}
@@ -87,24 +87,24 @@ class UDFSuite extends QueryTest with SharedSQLContext {
}
test("Simple UDF") {
- ctx.udf.register("strLenScala", (_: String).length)
+ sqlContext.udf.register("strLenScala", (_: String).length)
assert(sql("SELECT strLenScala('test')").head().getInt(0) === 4)
}
test("ZeroArgument UDF") {
- ctx.udf.register("random0", () => { Math.random()})
+ sqlContext.udf.register("random0", () => { Math.random()})
assert(sql("SELECT random0()").head().getDouble(0) >= 0.0)
}
test("TwoArgument UDF") {
- ctx.udf.register("strLenScala", (_: String).length + (_: Int))
+ sqlContext.udf.register("strLenScala", (_: String).length + (_: Int))
assert(sql("SELECT strLenScala('test', 1)").head().getInt(0) === 5)
}
test("UDF in a WHERE") {
- ctx.udf.register("oneArgFilter", (n: Int) => { n > 80 })
+ sqlContext.udf.register("oneArgFilter", (n: Int) => { n > 80 })
- val df = ctx.sparkContext.parallelize(
+ val df = sparkContext.parallelize(
(1 to 100).map(i => TestData(i, i.toString))).toDF()
df.registerTempTable("integerData")
@@ -114,7 +114,7 @@ class UDFSuite extends QueryTest with SharedSQLContext {
}
test("UDF in a HAVING") {
- ctx.udf.register("havingFilter", (n: Long) => { n > 5 })
+ sqlContext.udf.register("havingFilter", (n: Long) => { n > 5 })
val df = Seq(("red", 1), ("red", 2), ("blue", 10),
("green", 100), ("green", 200)).toDF("g", "v")
@@ -133,7 +133,7 @@ class UDFSuite extends QueryTest with SharedSQLContext {
}
test("UDF in a GROUP BY") {
- ctx.udf.register("groupFunction", (n: Int) => { n > 10 })
+ sqlContext.udf.register("groupFunction", (n: Int) => { n > 10 })
val df = Seq(("red", 1), ("red", 2), ("blue", 10),
("green", 100), ("green", 200)).toDF("g", "v")
@@ -150,10 +150,10 @@ class UDFSuite extends QueryTest with SharedSQLContext {
}
test("UDFs everywhere") {
- ctx.udf.register("groupFunction", (n: Int) => { n > 10 })
- ctx.udf.register("havingFilter", (n: Long) => { n > 2000 })
- ctx.udf.register("whereFilter", (n: Int) => { n < 150 })
- ctx.udf.register("timesHundred", (n: Long) => { n * 100 })
+ sqlContext.udf.register("groupFunction", (n: Int) => { n > 10 })
+ sqlContext.udf.register("havingFilter", (n: Long) => { n > 2000 })
+ sqlContext.udf.register("whereFilter", (n: Int) => { n < 150 })
+ sqlContext.udf.register("timesHundred", (n: Long) => { n * 100 })
val df = Seq(("red", 1), ("red", 2), ("blue", 10),
("green", 100), ("green", 200)).toDF("g", "v")
@@ -172,7 +172,7 @@ class UDFSuite extends QueryTest with SharedSQLContext {
}
test("struct UDF") {
- ctx.udf.register("returnStruct", (f1: String, f2: String) => FunctionResult(f1, f2))
+ sqlContext.udf.register("returnStruct", (f1: String, f2: String) => FunctionResult(f1, f2))
val result =
sql("SELECT returnStruct('test', 'test2') as ret")
@@ -181,13 +181,13 @@ class UDFSuite extends QueryTest with SharedSQLContext {
}
test("udf that is transformed") {
- ctx.udf.register("makeStruct", (x: Int, y: Int) => (x, y))
+ sqlContext.udf.register("makeStruct", (x: Int, y: Int) => (x, y))
// 1 + 1 is constant folded causing a transformation.
assert(sql("SELECT makeStruct(1 + 1, 2)").first().getAs[Row](0) === Row(2, 2))
}
test("type coercion for udf inputs") {
- ctx.udf.register("intExpected", (x: Int) => x)
+ sqlContext.udf.register("intExpected", (x: Int) => x)
// pass a decimal to intExpected.
assert(sql("SELECT intExpected(1.0)").head().getInt(0) === 1)
}
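Every hunk in this file is a mechanical ctx-to-sqlContext rename; the registration pattern itself is untouched. As a minimal sketch of that pattern (function name ours):

    // Register a Scala closure under a SQL-callable name; the return type
    // is inferred from the closure's signature.
    sqlContext.udf.register("strLen", (s: String) => s.length)
    assert(sql("SELECT strLen('test')").head().getInt(0) == 4)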
http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
index b6d279a..fa8f9c8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
@@ -90,7 +90,7 @@ class UserDefinedTypeSuite extends QueryTest with SharedSQLContext {
}
test("UDTs and UDFs") {
- ctx.udf.register("testType", (d: MyDenseVector) => d.isInstanceOf[MyDenseVector])
+ sqlContext.udf.register("testType", (d: MyDenseVector) => d.isInstanceOf[MyDenseVector])
pointsRDD.registerTempTable("points")
checkAnswer(
sql("SELECT testType(features) from points"),
@@ -148,8 +148,8 @@ class UserDefinedTypeSuite extends QueryTest with SharedSQLContext {
StructField("vec", new MyDenseVectorUDT, false)
))
- val stringRDD = ctx.sparkContext.parallelize(data)
- val jsonRDD = ctx.read.schema(schema).json(stringRDD)
+ val stringRDD = sparkContext.parallelize(data)
+ val jsonRDD = sqlContext.read.schema(schema).json(stringRDD)
checkAnswer(
jsonRDD,
Row(1, new MyDenseVector(Array(1.1, 2.2, 3.3, 4.4))) ::
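A hedged sketch of the schema-first JSON read used above, with a plain IntegerType standing in for the UDT (assuming the shared sqlContext and sparkContext handles):

    import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

    // An explicit schema skips inference, which is what lets a custom type
    // such as MyDenseVectorUDT participate in the read.
    val schema = StructType(Seq(StructField("int", IntegerType, nullable = false)))
    val stringRDD = sparkContext.parallelize("""{"int": 1}""" :: Nil)
    val jsonDF = sqlContext.read.schema(schema).json(stringRDD)
    jsonDF.collect() // Array(Row(1))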
http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
index 83db9b6..cd3644e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
@@ -31,7 +31,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
setupTestData()
test("simple columnar query") {
- val plan = ctx.executePlan(testData.logicalPlan).executedPlan
+ val plan = sqlContext.executePlan(testData.logicalPlan).executedPlan
val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None)
checkAnswer(scan, testData.collect().toSeq)
@@ -39,16 +39,16 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
test("default size avoids broadcast") {
// TODO: Improve this test when we have better statistics
- ctx.sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString))
+ sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString))
.toDF().registerTempTable("sizeTst")
- ctx.cacheTable("sizeTst")
+ sqlContext.cacheTable("sizeTst")
assert(
- ctx.table("sizeTst").queryExecution.analyzed.statistics.sizeInBytes >
- ctx.conf.autoBroadcastJoinThreshold)
+ sqlContext.table("sizeTst").queryExecution.analyzed.statistics.sizeInBytes >
+ sqlContext.conf.autoBroadcastJoinThreshold)
}
test("projection") {
- val plan = ctx.executePlan(testData.select('value, 'key).logicalPlan).executedPlan
+ val plan = sqlContext.executePlan(testData.select('value, 'key).logicalPlan).executedPlan
val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None)
checkAnswer(scan, testData.collect().map {
@@ -57,7 +57,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
}
test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") {
- val plan = ctx.executePlan(testData.logicalPlan).executedPlan
+ val plan = sqlContext.executePlan(testData.logicalPlan).executedPlan
val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None)
checkAnswer(scan, testData.collect().toSeq)
@@ -69,7 +69,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
sql("SELECT * FROM repeatedData"),
repeatedData.collect().toSeq.map(Row.fromTuple))
- ctx.cacheTable("repeatedData")
+ sqlContext.cacheTable("repeatedData")
checkAnswer(
sql("SELECT * FROM repeatedData"),
@@ -81,7 +81,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
sql("SELECT * FROM nullableRepeatedData"),
nullableRepeatedData.collect().toSeq.map(Row.fromTuple))
- ctx.cacheTable("nullableRepeatedData")
+ sqlContext.cacheTable("nullableRepeatedData")
checkAnswer(
sql("SELECT * FROM nullableRepeatedData"),
@@ -96,7 +96,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
sql("SELECT time FROM timestamps"),
timestamps.collect().toSeq)
- ctx.cacheTable("timestamps")
+ sqlContext.cacheTable("timestamps")
checkAnswer(
sql("SELECT time FROM timestamps"),
@@ -108,7 +108,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
sql("SELECT * FROM withEmptyParts"),
withEmptyParts.collect().toSeq.map(Row.fromTuple))
- ctx.cacheTable("withEmptyParts")
+ sqlContext.cacheTable("withEmptyParts")
checkAnswer(
sql("SELECT * FROM withEmptyParts"),
@@ -157,7 +157,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
// Create a RDD for the schema
val rdd =
- ctx.sparkContext.parallelize((1 to 100), 10).map { i =>
+ sparkContext.parallelize((1 to 100), 10).map { i =>
Row(
s"str${i}: test cache.",
s"binary${i}: test cache.".getBytes("UTF-8"),
@@ -177,24 +177,24 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
(0 to i).map(j => s"map_key_$j" -> (Long.MaxValue - j)).toMap,
Row((i - 0.25).toFloat, Seq(true, false, null)))
}
- ctx.createDataFrame(rdd, schema).registerTempTable("InMemoryCache_different_data_types")
+ sqlContext.createDataFrame(rdd, schema).registerTempTable("InMemoryCache_different_data_types")
// Cache the table.
sql("cache table InMemoryCache_different_data_types")
// Make sure the table is indeed cached.
- val tableScan = ctx.table("InMemoryCache_different_data_types").queryExecution.executedPlan
+ sqlContext.table("InMemoryCache_different_data_types").queryExecution.executedPlan
assert(
- ctx.isCached("InMemoryCache_different_data_types"),
+ sqlContext.isCached("InMemoryCache_different_data_types"),
"InMemoryCache_different_data_types should be cached.")
// Issue a query and check the results.
checkAnswer(
sql(s"SELECT DISTINCT ${allColumns} FROM InMemoryCache_different_data_types"),
- ctx.table("InMemoryCache_different_data_types").collect())
- ctx.dropTempTable("InMemoryCache_different_data_types")
+ sqlContext.table("InMemoryCache_different_data_types").collect())
+ sqlContext.dropTempTable("InMemoryCache_different_data_types")
}
test("SPARK-10422: String column in InMemoryColumnarCache needs to override clone method") {
- val df =
- ctx.range(1, 100).selectExpr("id % 10 as id").rdd.map(id => Tuple1(s"str_$id")).toDF("i")
+ val df = sqlContext.range(1, 100).selectExpr("id % 10 as id")
+ .rdd.map(id => Tuple1(s"str_$id")).toDF("i")
val cached = df.cache()
// count triggers the caching action. It should not throw.
cached.count()
@@ -205,7 +205,8 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
// Check result.
checkAnswer(
cached,
- ctx.range(1, 100).selectExpr("id % 10 as id").rdd.map(id => Tuple1(s"str_$id")).toDF("i")
+ sqlContext.range(1, 100).selectExpr("id % 10 as id")
+ .rdd.map(id => Tuple1(s"str_$id")).toDF("i")
)
// Drop the cache.
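The suite repeats one cache-then-verify pattern throughout; condensed into a sketch (table name hypothetical):

    // Cache a registered temp table, confirm the cache took effect, query
    // it, then release it so later tests start cold.
    sqlContext.cacheTable("t")
    assert(sqlContext.isCached("t"), "t should be cached")
    checkAnswer(sql("SELECT * FROM t"), sqlContext.table("t").collect())
    sqlContext.uncacheTable("t")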
http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala
index ab2644e..6b74014 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala
@@ -25,32 +25,32 @@ import org.apache.spark.sql.test.SQLTestData._
class PartitionBatchPruningSuite extends SparkFunSuite with SharedSQLContext {
import testImplicits._
- private lazy val originalColumnBatchSize = ctx.conf.columnBatchSize
- private lazy val originalInMemoryPartitionPruning = ctx.conf.inMemoryPartitionPruning
+ private lazy val originalColumnBatchSize = sqlContext.conf.columnBatchSize
+ private lazy val originalInMemoryPartitionPruning = sqlContext.conf.inMemoryPartitionPruning
override protected def beforeAll(): Unit = {
super.beforeAll()
// Make a table with 5 partitions, 2 batches per partition, 10 elements per batch
- ctx.setConf(SQLConf.COLUMN_BATCH_SIZE, 10)
+ sqlContext.setConf(SQLConf.COLUMN_BATCH_SIZE, 10)
- val pruningData = ctx.sparkContext.makeRDD((1 to 100).map { key =>
+ val pruningData = sparkContext.makeRDD((1 to 100).map { key =>
val string = if (((key - 1) / 10) % 2 == 0) null else key.toString
TestData(key, string)
}, 5).toDF()
pruningData.registerTempTable("pruningData")
// Enable in-memory partition pruning
- ctx.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, true)
+ sqlContext.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, true)
// Enable in-memory table scan accumulators
- ctx.setConf("spark.sql.inMemoryTableScanStatistics.enable", "true")
- ctx.cacheTable("pruningData")
+ sqlContext.setConf("spark.sql.inMemoryTableScanStatistics.enable", "true")
+ sqlContext.cacheTable("pruningData")
}
override protected def afterAll(): Unit = {
try {
- ctx.setConf(SQLConf.COLUMN_BATCH_SIZE, originalColumnBatchSize)
- ctx.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning)
- ctx.uncacheTable("pruningData")
+ sqlContext.setConf(SQLConf.COLUMN_BATCH_SIZE, originalColumnBatchSize)
+ sqlContext.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning)
+ sqlContext.uncacheTable("pruningData")
} finally {
super.afterAll()
}
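The hooks above follow a save-set-restore discipline for SQLConf; the same idea inline, with illustrative values:

    // Snapshot the current value, override it for the test body, and
    // restore it in finally so a failure cannot leak configuration.
    val originalBatchSize = sqlContext.conf.columnBatchSize
    sqlContext.setConf(SQLConf.COLUMN_BATCH_SIZE, 10)
    try {
      // ... exercise partition batch pruning against small batches ...
    } finally {
      sqlContext.setConf(SQLConf.COLUMN_BATCH_SIZE, originalBatchSize)
    }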
http://git-wip-us.apache.org/repos/asf/spark/blob/c3c0e431/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala
index 8998f51..911d12e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala
@@ -22,6 +22,8 @@ import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
import org.apache.spark.sql.test.SharedSQLContext
class ExchangeSuite extends SparkPlanTest with SharedSQLContext {
+ import testImplicits.localSeqToDataFrameHolder
+
test("shuffling UnsafeRows in exchange") {
val input = (1 to 1000).map(Tuple1.apply)
checkAnswer(
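The newly imported implicit is what backs the toDF calls in this suite; a short sketch of what it enables (column name ours):

    import testImplicits.localSeqToDataFrameHolder

    // The implicit wraps a local Seq of Products in a DataFrameHolder,
    // which supplies toDF, so inputs can be written as plain collections.
    val input = (1 to 1000).map(Tuple1.apply)
    val df = input.toDF("a")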