Posted to commits@phoenix.apache.org by ra...@apache.org on 2015/04/09 10:34:55 UTC
phoenix git commit: PHOENIX-1071 Get the phoenix-spark integration tests running.
Repository: phoenix
Updated Branches:
refs/heads/4.x-HBase-0.98 67ed4e195 -> b0350c5fd
PHOENIX-1071 Get the phoenix-spark integration tests running.
The tests now use the BaseHBaseManagedTimeIT framework to create the
test cluster and to handle setup/teardown.
Tested with Java 7u75 (i386) on Ubuntu and Java 7u40 (x64) on OS X.
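
For readers skimming the diff below: the core of the change is a small Scala
object that subclasses BaseHBaseManagedTimeIT so its protected static lifecycle
methods become reachable, with the suite's beforeAll/afterAll delegating to it.
The following is a condensed, self-contained sketch of that pattern; the
ExampleIT class and its single smoke test are illustrative stand-ins, not part
of the commit (the real suite is PhoenixSparkIT.scala, added in full below).

import java.sql.{Connection, DriverManager}
import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT
import org.apache.phoenix.query.BaseTest
import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}

// BaseHBaseManagedTimeIT exposes its mini-cluster lifecycle as protected
// static methods; extending it from a Scala object makes them callable here.
object PhoenixSparkITHelper extends BaseHBaseManagedTimeIT {
  def getTestClusterConfig = BaseHBaseManagedTimeIT.getTestClusterConfig
  def doSetup = BaseHBaseManagedTimeIT.doSetup()
  def doTeardown = BaseHBaseManagedTimeIT.doTeardown()
  def getUrl = BaseTest.getUrl
}

// Illustrative suite (not in the commit) showing how the helper is used.
class ExampleIT extends FunSuite with Matchers with BeforeAndAfterAll {
  var conn: Connection = _

  override def beforeAll() {
    PhoenixSparkITHelper.doSetup                            // bring up the shared mini-cluster
    conn = DriverManager.getConnection(PhoenixSparkITHelper.getUrl)
  }

  override def afterAll() {
    conn.close()
    PhoenixSparkITHelper.doTeardown                         // shut the cluster back down
  }

  test("cluster is reachable") {
    conn.isClosed shouldBe false
  }
}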
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b0350c5f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b0350c5f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b0350c5f
Branch: refs/heads/4.x-HBase-0.98
Commit: b0350c5fd81ae2850b3059668933f45cdf84117d
Parents: 67ed4e1
Author: Josh Mahonin <jm...@gmail.com>
Authored: Tue Apr 7 22:33:17 2015 -0400
Committer: ravimagham <ra...@apache.org>
Committed: Thu Apr 9 01:34:02 2015 -0700
----------------------------------------------------------------------
phoenix-spark/pom.xml | 20 +-
phoenix-spark/src/it/resources/log4j.xml | 8 +
.../apache/phoenix/spark/PhoenixRDDTest.scala | 333 -------------------
.../apache/phoenix/spark/PhoenixSparkIT.scala | 326 ++++++++++++++++++
4 files changed, 347 insertions(+), 340 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0350c5f/phoenix-spark/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-spark/pom.xml b/phoenix-spark/pom.xml
index 4b90eb5..fe99968 100644
--- a/phoenix-spark/pom.xml
+++ b/phoenix-spark/pom.xml
@@ -22,7 +22,12 @@
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-core</artifactId>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.phoenix</groupId>
+ <artifactId>phoenix-core</artifactId>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
<!-- Force import of Spark's servlet API for unit tests -->
<dependency>
<groupId>javax.servlet</groupId>
@@ -46,7 +51,7 @@
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
- <version>2.2.2</version>
+ <version>2.2.4</version>
<scope>test</scope>
</dependency>
@@ -447,6 +452,8 @@
</dependencies>
<build>
+ <testSourceDirectory>src/it/scala</testSourceDirectory>
+ <testResources><testResource><directory>src/it/resources</directory></testResource></testResources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
@@ -500,9 +507,7 @@
<goal>test</goal>
</goals>
<configuration>
- <parallel>true</parallel>
- <tagsToExclude>Integration-Test</tagsToExclude>
- <argLine>-Xmx2g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m</argLine>
+ <skipTests>true</skipTests>
</configuration>
</execution>
<execution>
@@ -512,8 +517,9 @@
<goal>test</goal>
</goals>
<configuration>
- <parallel>false</parallel>
- <tagsToInclude>Integration-Test</tagsToInclude>
+ <parallel>true</parallel>
+ <tagsToExclude>Integration-Test</tagsToExclude>
+ <argLine>-Xmx1536m -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m</argLine>
</configuration>
</execution>
</executions>
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0350c5f/phoenix-spark/src/it/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/resources/log4j.xml b/phoenix-spark/src/it/resources/log4j.xml
index d4799da..58abece 100644
--- a/phoenix-spark/src/it/resources/log4j.xml
+++ b/phoenix-spark/src/it/resources/log4j.xml
@@ -26,6 +26,14 @@
<level value="ERROR"/>
</logger>
+ <logger name="org.spark-project.jetty">
+ <level value="ERROR"/>
+ </logger>
+
+ <logger name="akka">
+ <level value="ERROR"/>
+ </logger>
+
<logger name="BlockStateChange">
<level value="ERROR"/>
</logger>
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0350c5f/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixRDDTest.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixRDDTest.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixRDDTest.scala
deleted file mode 100644
index 876f8a4..0000000
--- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixRDDTest.scala
+++ /dev/null
@@ -1,333 +0,0 @@
-/*
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
-package org.apache.phoenix.spark
-
-import java.sql.{Connection, DriverManager}
-import java.util.Date
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hbase.{HConstants, HBaseTestingUtility}
-import org.apache.phoenix.schema.ColumnNotFoundException
-import org.apache.phoenix.schema.types.PVarchar
-import org.apache.phoenix.util.ColumnInfo
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.types.{StringType, StructField}
-import org.apache.spark.{SparkConf, SparkContext}
-import org.joda.time.DateTime
-import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
-import org.apache.phoenix.spark._
-
-import scala.collection.mutable.ListBuffer
-
-class PhoenixRDDTest extends FunSuite with Matchers with BeforeAndAfterAll {
- lazy val hbaseTestingUtility = {
- new HBaseTestingUtility()
- }
-
- lazy val hbaseConfiguration = {
- val conf = hbaseTestingUtility.getConfiguration
-
- val quorum = conf.get("hbase.zookeeper.quorum")
- val clientPort = conf.get("hbase.zookeeper.property.clientPort")
- val znodeParent = conf.get("zookeeper.znode.parent")
-
- // This is an odd one - the Zookeeper Quorum entry in the config is totally wrong. It's
- // just reporting localhost.
- conf.set(org.apache.hadoop.hbase.HConstants.ZOOKEEPER_QUORUM, s"$quorum:$clientPort:$znodeParent")
-
- conf
- }
-
- lazy val quorumAddress = {
- hbaseConfiguration.get("hbase.zookeeper.quorum")
- }
-
- lazy val zookeeperClientPort = {
- hbaseConfiguration.get("hbase.zookeeper.property.clientPort")
- }
-
- lazy val zookeeperZnodeParent = {
- hbaseConfiguration.get("zookeeper.znode.parent")
- }
-
- lazy val hbaseConnectionString = {
- s"$quorumAddress:$zookeeperClientPort:$zookeeperZnodeParent"
- }
-
- var conn: Connection = _
-
- override def beforeAll() {
- hbaseTestingUtility.startMiniCluster()
-
- conn = DriverManager.getConnection(s"jdbc:phoenix:$hbaseConnectionString")
-
- conn.setAutoCommit(true)
-
- // each SQL statement used to set up Phoenix must be on a single line. Yes, that
- // can potentially make large lines.
- val setupSqlSource = getClass.getClassLoader.getResourceAsStream("setup.sql")
-
- val setupSql = scala.io.Source.fromInputStream(setupSqlSource).getLines()
-
- for (sql <- setupSql) {
- val stmt = conn.createStatement()
-
- stmt.execute(sql)
-
- stmt.close()
- }
-
- conn.commit()
- }
-
- override def afterAll() {
- conn.close()
- hbaseTestingUtility.shutdownMiniCluster()
- }
-
- val conf = new SparkConf().set("spark.ui.showConsoleProgress", "false")
-
- val sc = new SparkContext("local[1]", "PhoenixSparkTest", conf)
-
- def buildSql(table: String, columns: Seq[String], predicate: Option[String]): String = {
- val query = "SELECT %s FROM \"%s\"" format(columns.map(f => "\"" + f + "\"").mkString(", "), table)
-
- query + (predicate match {
- case Some(p: String) => " WHERE " + p
- case _ => ""
- })
- }
-
- test("Can create valid SQL") {
- val rdd = new PhoenixRDD(sc, "MyTable", Array("Foo", "Bar"),
- conf = hbaseConfiguration)
-
- rdd.buildSql("MyTable", Array("Foo", "Bar"), None) should
- equal("SELECT \"Foo\", \"Bar\" FROM \"MyTable\"")
- }
-
- test("Can convert Phoenix schema") {
- val phoenixSchema = List(
- new ColumnInfo("varcharColumn", PVarchar.INSTANCE.getSqlType)
- )
-
- val rdd = new PhoenixRDD(sc, "MyTable", Array("Foo", "Bar"),
- conf = hbaseConfiguration)
-
- val catalystSchema = rdd.phoenixSchemaToCatalystSchema(phoenixSchema)
-
- val expected = List(StructField("varcharColumn", StringType, nullable = true))
-
- catalystSchema shouldEqual expected
- }
-
- test("Can create schema RDD and execute query") {
- val sqlContext = new SQLContext(sc)
-
- val df1 = sqlContext.phoenixTableAsDataFrame("TABLE1", Array("ID", "COL1"), conf = hbaseConfiguration)
-
- df1.registerTempTable("sql_table_1")
-
- val df2 = sqlContext.phoenixTableAsDataFrame("TABLE2", Array("ID", "TABLE1_ID"),
- conf = hbaseConfiguration)
-
- df2.registerTempTable("sql_table_2")
-
- val sqlRdd = sqlContext.sql("SELECT t1.ID, t1.COL1, t2.ID, t2.TABLE1_ID FROM sql_table_1 AS t1 INNER JOIN sql_table_2 AS t2 ON (t2.TABLE1_ID = t1.ID)")
-
- val count = sqlRdd.count()
-
- count shouldEqual 6L
- }
-
- test("Can create schema RDD and execute query on case sensitive table (no config)") {
- val sqlContext = new SQLContext(sc)
-
- val df1 = sqlContext.phoenixTableAsDataFrame("table3", Array("id", "col1"), zkUrl = Some(hbaseConnectionString))
-
- df1.registerTempTable("table3")
-
- val sqlRdd = sqlContext.sql("SELECT * FROM table3")
-
- val count = sqlRdd.count()
-
- count shouldEqual 2L
- }
-
- test("Can create schema RDD and execute constrained query") {
- val sqlContext = new SQLContext(sc)
-
- val df1 = sqlContext.phoenixTableAsDataFrame("TABLE1", Array("ID", "COL1"), conf = hbaseConfiguration)
-
- df1.registerTempTable("sql_table_1")
-
- val df2 = sqlContext.phoenixTableAsDataFrame("TABLE2", Array("ID", "TABLE1_ID"),
- predicate = Some("\"ID\" = 1"),
- conf = hbaseConfiguration)
-
- df2.registerTempTable("sql_table_2")
-
- val sqlRdd = sqlContext.sql("SELECT t1.ID, t1.COL1, t2.ID, t2.TABLE1_ID FROM sql_table_1 AS t1 INNER JOIN sql_table_2 AS t2 ON (t2.TABLE1_ID = t1.ID)")
-
- val count = sqlRdd.count()
-
- count shouldEqual 1L
- }
-
- test("Using a predicate referring to a non-existent column should fail") {
- intercept[RuntimeException] {
- val sqlContext = new SQLContext(sc)
-
- val df1 = sqlContext.phoenixTableAsDataFrame("table3", Array("id", "col1"),
- predicate = Some("foo = bar"),
- conf = hbaseConfiguration)
-
- df1.registerTempTable("table3")
-
- val sqlRdd = sqlContext.sql("SELECT * FROM table3")
-
- // we have to execute an action before the predicate failure can occur
- val count = sqlRdd.count()
- }.getCause shouldBe a [ColumnNotFoundException]
- }
-
- test("Can create schema RDD with predicate that will never match") {
- val sqlContext = new SQLContext(sc)
-
- val df1 = sqlContext.phoenixTableAsDataFrame("table3", Array("id", "col1"),
- predicate = Some("\"id\" = -1"),
- conf = hbaseConfiguration)
-
- df1.registerTempTable("table3")
-
- val sqlRdd = sqlContext.sql("SELECT * FROM table3")
-
- val count = sqlRdd.count()
-
- count shouldEqual 0L
- }
-
- test("Can create schema RDD with complex predicate") {
- val sqlContext = new SQLContext(sc)
-
- val df1 = sqlContext.phoenixTableAsDataFrame("DATE_PREDICATE_TEST_TABLE", Array("ID", "TIMESERIES_KEY"),
- predicate = Some("ID > 0 AND TIMESERIES_KEY BETWEEN CAST(TO_DATE('1990-01-01 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP) AND CAST(TO_DATE('1990-01-30 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP)"),
- conf = hbaseConfiguration)
-
- df1.registerTempTable("date_predicate_test_table")
-
- val sqlRdd = df1.sqlContext.sql("SELECT * FROM date_predicate_test_table")
-
- val count = sqlRdd.count()
-
- count shouldEqual 0L
- }
-
- test("Can query an array table") {
- val sqlContext = new SQLContext(sc)
-
- val df1 = sqlContext.phoenixTableAsDataFrame("ARRAY_TEST_TABLE", Array("ID", "VCARRAY"),
- conf = hbaseConfiguration)
-
- df1.registerTempTable("ARRAY_TEST_TABLE")
-
- val sqlRdd = sqlContext.sql("SELECT * FROM ARRAY_TEST_TABLE")
-
- val count = sqlRdd.count()
-
- // get row 0, column 1, which should be "VCARRAY"
- val arrayValues = sqlRdd.collect().apply(0).apply(1)
-
- arrayValues should equal(Array("String1", "String2", "String3"))
-
- count shouldEqual 1L
- }
-
- test("Can read a table as an RDD") {
- val rdd1 = sc.phoenixTableAsRDD("ARRAY_TEST_TABLE", Seq("ID", "VCARRAY"),
- conf = hbaseConfiguration)
-
- val count = rdd1.count()
-
- val arrayValues = rdd1.take(1)(0)("VCARRAY")
-
- arrayValues should equal(Array("String1", "String2", "String3"))
-
- count shouldEqual 1L
- }
-
- test("Can save to phoenix table") {
- val sqlContext = new SQLContext(sc)
-
- val dataSet = List((1L, "1", 1), (2L, "2", 2), (3L, "3", 3))
-
- sc
- .parallelize(dataSet)
- .saveToPhoenix(
- "OUTPUT_TEST_TABLE",
- Seq("ID","COL1","COL2"),
- hbaseConfiguration
- )
-
- // Load the results back
- val stmt = conn.createStatement()
- val rs = stmt.executeQuery("SELECT ID, COL1, COL2 FROM OUTPUT_TEST_TABLE")
- val results = ListBuffer[(Long, String, Int)]()
- while(rs.next()) {
- results.append((rs.getLong(1), rs.getString(2), rs.getInt(3)))
- }
- stmt.close()
-
- // Verify they match
- (0 to results.size - 1).foreach { i =>
- dataSet(i) shouldEqual results(i)
- }
- }
-
- test("Can save Java and Joda dates to Phoenix (no config)") {
- val dt = new DateTime()
- val date = new Date()
-
- val dataSet = List((1L, "1", 1, dt), (2L, "2", 2, date))
- sc
- .parallelize(dataSet)
- .saveToPhoenix(
- "OUTPUT_TEST_TABLE",
- Seq("ID","COL1","COL2","COL3"),
- zkUrl = Some(hbaseConnectionString)
- )
-
- // Load the results back
- val stmt = conn.createStatement()
- val rs = stmt.executeQuery("SELECT COL3 FROM OUTPUT_TEST_TABLE WHERE ID = 1 OR ID = 2 ORDER BY ID ASC")
- val results = ListBuffer[java.sql.Date]()
- while(rs.next()) {
- results.append(rs.getDate(1))
- }
- stmt.close()
-
- // Verify the epochs are equal
- results(0).getTime shouldEqual dt.getMillis
- results(1).getTime shouldEqual date.getTime
- }
-
- test("Not specifying a zkUrl or a config quorum URL should fail") {
- intercept[UnsupportedOperationException] {
- val sqlContext = new SQLContext(sc)
- val badConf = new Configuration(hbaseConfiguration)
- badConf.unset(HConstants.ZOOKEEPER_QUORUM)
- sqlContext.phoenixTableAsDataFrame("TABLE1", Array("ID", "COL1"), conf = badConf)
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0350c5f/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
new file mode 100644
index 0000000..149baec
--- /dev/null
+++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -0,0 +1,326 @@
+/*
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+package org.apache.phoenix.spark
+
+import java.sql.{Connection, DriverManager}
+import java.util.Date
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.{HConstants, HBaseTestingUtility}
+import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT
+import org.apache.phoenix.query.BaseTest
+import org.apache.phoenix.schema.ColumnNotFoundException
+import org.apache.phoenix.schema.types.PVarchar
+import org.apache.phoenix.util.ColumnInfo
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.types.{StringType, StructField}
+import org.apache.spark.{SparkConf, SparkContext}
+import org.joda.time.DateTime
+import org.scalatest._
+import org.apache.phoenix.spark._
+
+import scala.collection.mutable.ListBuffer
+
+/*
+ Note: If running directly from an IDE, these are the recommended VM parameters:
+ -Xmx1536m -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
+ */
+
+// Helper object to access the protected abstract static methods hidden in BaseHBaseManagedTimeIT
+object PhoenixSparkITHelper extends BaseHBaseManagedTimeIT {
+ def getTestClusterConfig = BaseHBaseManagedTimeIT.getTestClusterConfig
+ def doSetup = BaseHBaseManagedTimeIT.doSetup()
+ def doTeardown = BaseHBaseManagedTimeIT.doTeardown()
+ def getUrl = BaseTest.getUrl
+}
+
+class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
+ var conn: Connection = _
+ var sc: SparkContext = _
+
+ lazy val hbaseConfiguration = {
+ val conf = PhoenixSparkITHelper.getTestClusterConfig
+ // The zookeeper quorum address defaults to "localhost" which is incorrect, let's fix it
+ val quorum = conf.get("hbase.zookeeper.quorum")
+ val clientPort = conf.get("hbase.zookeeper.property.clientPort")
+ val znodeParent = conf.get("zookeeper.znode.parent")
+ conf.set(HConstants.ZOOKEEPER_QUORUM, s"$quorum:$clientPort:$znodeParent")
+ conf
+ }
+
+ lazy val quorumAddress = {
+ hbaseConfiguration.get(HConstants.ZOOKEEPER_QUORUM)
+ }
+
+ override def beforeAll() {
+ PhoenixSparkITHelper.doSetup
+
+ conn = DriverManager.getConnection(PhoenixSparkITHelper.getUrl)
+ conn.setAutoCommit(true)
+
+ // each SQL statement used to set up Phoenix must be on a single line. Yes, that
+ // can potentially make large lines.
+ val setupSqlSource = getClass.getClassLoader.getResourceAsStream("setup.sql")
+
+ val setupSql = scala.io.Source.fromInputStream(setupSqlSource).getLines()
+
+ for (sql <- setupSql) {
+ val stmt = conn.createStatement()
+ stmt.execute(sql)
+ }
+ conn.commit()
+
+ val conf = new SparkConf()
+ .setAppName("PhoenixSparkIT")
+ .setMaster("local[2]") // 2 threads, some parallelism
+ .set("spark.ui.showConsoleProgress", "false") // Disable printing stage progress
+
+ sc = new SparkContext(conf)
+ }
+
+ override def afterAll() {
+ conn.close()
+ sc.stop()
+ PhoenixSparkITHelper.doTeardown
+ }
+
+ def buildSql(table: String, columns: Seq[String], predicate: Option[String]): String = {
+ val query = "SELECT %s FROM \"%s\"" format(columns.map(f => "\"" + f + "\"").mkString(", "), table)
+
+ query + (predicate match {
+ case Some(p: String) => " WHERE " + p
+ case _ => ""
+ })
+ }
+
+ test("Can create valid SQL") {
+ val rdd = new PhoenixRDD(sc, "MyTable", Array("Foo", "Bar"),
+ conf = hbaseConfiguration)
+
+ rdd.buildSql("MyTable", Array("Foo", "Bar"), None) should
+ equal("SELECT \"Foo\", \"Bar\" FROM \"MyTable\"")
+ }
+
+ test("Can convert Phoenix schema") {
+ val phoenixSchema = List(
+ new ColumnInfo("varcharColumn", PVarchar.INSTANCE.getSqlType)
+ )
+
+ val rdd = new PhoenixRDD(sc, "MyTable", Array("Foo", "Bar"),
+ conf = hbaseConfiguration)
+
+ val catalystSchema = rdd.phoenixSchemaToCatalystSchema(phoenixSchema)
+
+ val expected = List(StructField("varcharColumn", StringType, nullable = true))
+
+ catalystSchema shouldEqual expected
+ }
+
+ test("Can create schema RDD and execute query") {
+ val sqlContext = new SQLContext(sc)
+
+ val df1 = sqlContext.phoenixTableAsDataFrame("TABLE1", Array("ID", "COL1"), conf = hbaseConfiguration)
+
+ df1.registerTempTable("sql_table_1")
+
+ val df2 = sqlContext.phoenixTableAsDataFrame("TABLE2", Array("ID", "TABLE1_ID"),
+ conf = hbaseConfiguration)
+
+ df2.registerTempTable("sql_table_2")
+
+ val sqlRdd = sqlContext.sql("SELECT t1.ID, t1.COL1, t2.ID, t2.TABLE1_ID FROM sql_table_1 AS t1 INNER JOIN sql_table_2 AS t2 ON (t2.TABLE1_ID = t1.ID)")
+
+ val count = sqlRdd.count()
+
+ count shouldEqual 6L
+ }
+
+ test("Can create schema RDD and execute query on case sensitive table (no config)") {
+ val sqlContext = new SQLContext(sc)
+
+ val df1 = sqlContext.phoenixTableAsDataFrame("table3", Array("id", "col1"), zkUrl = Some(quorumAddress))
+
+ df1.registerTempTable("table3")
+
+ val sqlRdd = sqlContext.sql("SELECT * FROM table3")
+
+ val count = sqlRdd.count()
+
+ count shouldEqual 2L
+ }
+
+ test("Can create schema RDD and execute constrained query") {
+ val sqlContext = new SQLContext(sc)
+
+ val df1 = sqlContext.phoenixTableAsDataFrame("TABLE1", Array("ID", "COL1"), conf = hbaseConfiguration)
+
+ df1.registerTempTable("sql_table_1")
+
+ val df2 = sqlContext.phoenixTableAsDataFrame("TABLE2", Array("ID", "TABLE1_ID"),
+ predicate = Some("\"ID\" = 1"),
+ conf = hbaseConfiguration)
+
+ df2.registerTempTable("sql_table_2")
+
+ val sqlRdd = sqlContext.sql("SELECT t1.ID, t1.COL1, t2.ID, t2.TABLE1_ID FROM sql_table_1 AS t1 INNER JOIN sql_table_2 AS t2 ON (t2.TABLE1_ID = t1.ID)")
+
+ val count = sqlRdd.count()
+
+ count shouldEqual 1L
+ }
+
+ test("Using a predicate referring to a non-existent column should fail") {
+ intercept[RuntimeException] {
+ val sqlContext = new SQLContext(sc)
+
+ val df1 = sqlContext.phoenixTableAsDataFrame("table3", Array("id", "col1"),
+ predicate = Some("foo = bar"),
+ conf = hbaseConfiguration)
+
+ df1.registerTempTable("table3")
+
+ val sqlRdd = sqlContext.sql("SELECT * FROM table3")
+
+ // we have to execute an action before the predicate failure can occur
+ val count = sqlRdd.count()
+ }.getCause shouldBe a [ColumnNotFoundException]
+ }
+
+ test("Can create schema RDD with predicate that will never match") {
+ val sqlContext = new SQLContext(sc)
+
+ val df1 = sqlContext.phoenixTableAsDataFrame("table3", Array("id", "col1"),
+ predicate = Some("\"id\" = -1"),
+ conf = hbaseConfiguration)
+
+ df1.registerTempTable("table3")
+
+ val sqlRdd = sqlContext.sql("SELECT * FROM table3")
+
+ val count = sqlRdd.count()
+
+ count shouldEqual 0L
+ }
+
+ test("Can create schema RDD with complex predicate") {
+ val sqlContext = new SQLContext(sc)
+
+ val df1 = sqlContext.phoenixTableAsDataFrame("DATE_PREDICATE_TEST_TABLE", Array("ID", "TIMESERIES_KEY"),
+ predicate = Some("ID > 0 AND TIMESERIES_KEY BETWEEN CAST(TO_DATE('1990-01-01 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP) AND CAST(TO_DATE('1990-01-30 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP)"),
+ conf = hbaseConfiguration)
+
+ df1.registerTempTable("date_predicate_test_table")
+
+ val sqlRdd = df1.sqlContext.sql("SELECT * FROM date_predicate_test_table")
+
+ val count = sqlRdd.count()
+
+ count shouldEqual 0L
+ }
+
+ test("Can query an array table") {
+ val sqlContext = new SQLContext(sc)
+
+ val df1 = sqlContext.phoenixTableAsDataFrame("ARRAY_TEST_TABLE", Array("ID", "VCARRAY"),
+ conf = hbaseConfiguration)
+
+ df1.registerTempTable("ARRAY_TEST_TABLE")
+
+ val sqlRdd = sqlContext.sql("SELECT * FROM ARRAY_TEST_TABLE")
+
+ val count = sqlRdd.count()
+
+ // get row 0, column 1, which should be "VCARRAY"
+ val arrayValues = sqlRdd.collect().apply(0).apply(1)
+
+ arrayValues should equal(Array("String1", "String2", "String3"))
+
+ count shouldEqual 1L
+ }
+
+ test("Can read a table as an RDD") {
+ val rdd1 = sc.phoenixTableAsRDD("ARRAY_TEST_TABLE", Seq("ID", "VCARRAY"),
+ conf = hbaseConfiguration)
+
+ val count = rdd1.count()
+
+ val arrayValues = rdd1.take(1)(0)("VCARRAY")
+
+ arrayValues should equal(Array("String1", "String2", "String3"))
+
+ count shouldEqual 1L
+ }
+
+ test("Can save to phoenix table") {
+ val sqlContext = new SQLContext(sc)
+
+ val dataSet = List((1L, "1", 1), (2L, "2", 2), (3L, "3", 3))
+
+ sc
+ .parallelize(dataSet)
+ .saveToPhoenix(
+ "OUTPUT_TEST_TABLE",
+ Seq("ID","COL1","COL2"),
+ hbaseConfiguration
+ )
+
+ // Load the results back
+ val stmt = conn.createStatement()
+ val rs = stmt.executeQuery("SELECT ID, COL1, COL2 FROM OUTPUT_TEST_TABLE")
+ val results = ListBuffer[(Long, String, Int)]()
+ while(rs.next()) {
+ results.append((rs.getLong(1), rs.getString(2), rs.getInt(3)))
+ }
+
+ // Verify they match
+ (0 to results.size - 1).foreach { i =>
+ dataSet(i) shouldEqual results(i)
+ }
+ }
+
+ test("Can save Java and Joda dates to Phoenix (no config)") {
+ val dt = new DateTime()
+ val date = new Date()
+
+ val dataSet = List((1L, "1", 1, dt), (2L, "2", 2, date))
+ sc
+ .parallelize(dataSet)
+ .saveToPhoenix(
+ "OUTPUT_TEST_TABLE",
+ Seq("ID","COL1","COL2","COL3"),
+ zkUrl = Some(quorumAddress)
+ )
+
+ // Load the results back
+ val stmt = conn.createStatement()
+ val rs = stmt.executeQuery("SELECT COL3 FROM OUTPUT_TEST_TABLE WHERE ID = 1 OR ID = 2 ORDER BY ID ASC")
+ val results = ListBuffer[java.sql.Date]()
+ while(rs.next()) {
+ results.append(rs.getDate(1))
+ }
+
+ // Verify the epochs are equal
+ results(0).getTime shouldEqual dt.getMillis
+ results(1).getTime shouldEqual date.getTime
+ }
+
+ test("Not specifying a zkUrl or a config quorum URL should fail") {
+ intercept[UnsupportedOperationException] {
+ val sqlContext = new SQLContext(sc)
+ val badConf = new Configuration(hbaseConfiguration)
+ badConf.unset(HConstants.ZOOKEEPER_QUORUM)
+ sqlContext.phoenixTableAsDataFrame("TABLE1", Array("ID", "COL1"), conf = badConf)
+ }
+ }
+}