You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ra...@apache.org on 2015/04/07 09:20:07 UTC

phoenix git commit: PHOENIX-1818 - Move cluster-required tests to src/it

Repository: phoenix
Updated Branches:
  refs/heads/master 9ddb484aa -> f666baa27


PHOENIX-1818 - Move cluster-required tests to src/it


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/f666baa2
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/f666baa2
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/f666baa2

Branch: refs/heads/master
Commit: f666baa27ed97cb08ba964c53df74907a53ce001
Parents: 9ddb484
Author: ravimagham <ra...@apache.org>
Authored: Tue Apr 7 00:19:21 2015 -0700
Committer: ravimagham <ra...@apache.org>
Committed: Tue Apr 7 00:19:21 2015 -0700

----------------------------------------------------------------------
 phoenix-spark/src/it/resources/log4j.xml        |  41 +++
 phoenix-spark/src/it/resources/setup.sql        |  18 +
 .../apache/phoenix/spark/PhoenixRDDTest.scala   | 333 +++++++++++++++++++
 phoenix-spark/src/test/resources/log4j.xml      |  41 ---
 phoenix-spark/src/test/resources/setup.sql      |  18 -
 .../apache/phoenix/spark/PhoenixRDDTest.scala   | 333 -------------------
 6 files changed, 392 insertions(+), 392 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/f666baa2/phoenix-spark/src/it/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/resources/log4j.xml b/phoenix-spark/src/it/resources/log4j.xml
new file mode 100644
index 0000000..d4799da
--- /dev/null
+++ b/phoenix-spark/src/it/resources/log4j.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+  <appender name="console" class="org.apache.log4j.ConsoleAppender">
+    <param name="Target" value="System.out"/>
+
+    <layout class="org.apache.log4j.PatternLayout">
+      <param name="ConversionPattern" value="%-4r [%t] %-5p %c %x - %m%n"/>
+    </layout>
+  </appender>
+
+  <logger name="org.eclipse">
+    <level value="ERROR"/>
+  </logger>
+
+  <logger name="org.apache">
+    <level value="ERROR"/>
+  </logger>
+
+  <logger name = "org.apache.phoenix.mapreduce">
+    <level value="FATAL"/>
+  </logger>
+
+  <logger name="org.mortbay">
+    <level value="ERROR"/>
+  </logger>
+
+  <logger name="BlockStateChange">
+    <level value="ERROR"/>
+  </logger>
+
+  <logger name="io.netty">
+    <level value="ERROR"/>
+  </logger>
+
+  <root>
+    <priority value="INFO"/>
+    <appender-ref ref="console"/>
+  </root>
+</log4j:configuration>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f666baa2/phoenix-spark/src/it/resources/setup.sql
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/resources/setup.sql b/phoenix-spark/src/it/resources/setup.sql
new file mode 100644
index 0000000..14a7e7e
--- /dev/null
+++ b/phoenix-spark/src/it/resources/setup.sql
@@ -0,0 +1,18 @@
+CREATE TABLE table1 (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR)
+CREATE TABLE table2 (id BIGINT NOT NULL PRIMARY KEY, table1_id BIGINT, "t2col1" VARCHAR)
+UPSERT INTO table1 (id, col1) VALUES (1, 'test_row_1')
+UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (1, 1, 'test_child_1')
+UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (2, 1, 'test_child_2')
+UPSERT INTO table1 (id, col1) VALUES (2, 'test_row_2')
+UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (3, 2, 'test_child_1')
+UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (4, 2, 'test_child_2')
+UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (5, 2, 'test_child_3')
+UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (6, 2, 'test_child_4')
+CREATE TABLE "table3" ("id" BIGINT NOT NULL PRIMARY KEY, "col1" VARCHAR)
+UPSERT INTO "table3" ("id", "col1") VALUES (1, 'foo')
+UPSERT INTO "table3" ("id", "col1") VALUES (2, 'bar')
+CREATE TABLE ARRAY_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, VCARRAY VARCHAR[])
+UPSERT INTO ARRAY_TEST_TABLE (ID, VCARRAY) VALUES (1, ARRAY['String1', 'String2', 'String3'])
+CREATE TABLE DATE_PREDICATE_TEST_TABLE (ID BIGINT NOT NULL, TIMESERIES_KEY TIMESTAMP NOT NULL CONSTRAINT pk PRIMARY KEY (ID, TIMESERIES_KEY))
+UPSERT INTO DATE_PREDICATE_TEST_TABLE (ID, TIMESERIES_KEY) VALUES (1, CAST(CURRENT_TIME() AS TIMESTAMP))
+CREATE TABLE OUTPUT_TEST_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER, col3 DATE)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f666baa2/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixRDDTest.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixRDDTest.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixRDDTest.scala
new file mode 100644
index 0000000..63cb6e4
--- /dev/null
+++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixRDDTest.scala
@@ -0,0 +1,333 @@
+/*
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+ */
+package org.apache.phoenix.spark
+
+import java.sql.{Connection, DriverManager}
+import java.util.Date
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.{HConstants, HBaseTestingUtility}
+import org.apache.phoenix.schema.ColumnNotFoundException
+import org.apache.phoenix.schema.types.PVarchar
+import org.apache.phoenix.util.ColumnInfo
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.types.{StringType, StructField}
+import org.apache.spark.{SparkConf, SparkContext}
+import org.joda.time.DateTime
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
+import org.apache.phoenix.spark._
+
+import scala.collection.mutable.ListBuffer
+
+class PhoenixRDDTest extends FunSuite with Matchers with BeforeAndAfterAll {
+  lazy val hbaseTestingUtility = {
+    new HBaseTestingUtility()
+  }
+
+  lazy val hbaseConfiguration = {
+    val conf = hbaseTestingUtility.getConfiguration
+
+    val quorum = conf.get("hbase.zookeeper.quorum")
+    val clientPort = conf.get("hbase.zookeeper.property.clientPort")
+    val znodeParent = conf.get("zookeeper.znode.parent")
+
+    // This is an odd one - the Zookeeper Quorum entry in the config is totally wrong. It's
+    // just reporting localhost.
+    conf.set(org.apache.hadoop.hbase.HConstants.ZOOKEEPER_QUORUM, s"$quorum:$clientPort:$znodeParent")
+
+    conf
+  }
+
+  lazy val quorumAddress = {
+    hbaseConfiguration.get("hbase.zookeeper.quorum")
+  }
+
+  lazy val zookeeperClientPort = {
+    hbaseConfiguration.get("hbase.zookeeper.property.clientPort")
+  }
+
+  lazy val zookeeperZnodeParent = {
+    hbaseConfiguration.get("zookeeper.znode.parent")
+  }
+
+  lazy val hbaseConnectionString = {
+    s"$quorumAddress:$zookeeperClientPort:$zookeeperZnodeParent"
+  }
+
+  var conn: Connection = _
+
+  override def beforeAll() {
+    hbaseTestingUtility.startMiniCluster()
+
+    conn = DriverManager.getConnection(s"jdbc:phoenix:$hbaseConnectionString")
+
+    conn.setAutoCommit(true)
+
+    // each SQL statement used to set up Phoenix must be on a single line. Yes, that
+    // can potentially make large lines.
+    val setupSqlSource = getClass.getClassLoader.getResourceAsStream("setup.sql")
+
+    val setupSql = scala.io.Source.fromInputStream(setupSqlSource).getLines()
+
+    for (sql <- setupSql) {
+      val stmt = conn.createStatement()
+
+      stmt.execute(sql)
+
+      stmt.close()
+    }
+
+    conn.commit()
+  }
+
+  override def afterAll() {
+    conn.close()
+    hbaseTestingUtility.shutdownMiniCluster()
+  }
+
+  val conf = new SparkConf().set("spark.ui.showConsoleProgress", "false")
+
+  val sc = new SparkContext("local[1]", "PhoenixSparkTest", conf)
+
+  def buildSql(table: String, columns: Seq[String], predicate: Option[String]): String = {
+    val query = "SELECT %s FROM \"%s\"" format(columns.map(f => "\"" + f + "\"").mkString(", "), table)
+
+    query + (predicate match {
+      case Some(p: String) => " WHERE " + p
+      case _ => ""
+    })
+  }
+
+  test("Can create valid SQL") {
+    val rdd = new PhoenixRDD(sc, "MyTable", Array("Foo", "Bar"),
+      conf = hbaseConfiguration)
+
+    rdd.buildSql("MyTable", Array("Foo", "Bar"), None) should
+      equal("SELECT \"Foo\", \"Bar\" FROM \"MyTable\"")
+  }
+
+  test("Can convert Phoenix schema") {
+    val phoenixSchema = List(
+      new ColumnInfo("varcharColumn", PVarchar.INSTANCE.getSqlType)
+    )
+
+    val rdd = new PhoenixRDD(sc, "MyTable", Array("Foo", "Bar"),
+      conf = hbaseConfiguration)
+
+    val catalystSchema = rdd.phoenixSchemaToCatalystSchema(phoenixSchema)
+
+    val expected = List(StructField("varcharColumn", StringType, nullable = true))
+
+    catalystSchema shouldEqual expected
+  }
+
+  test("Can create schema RDD and execute query") {
+    val sqlContext = new SQLContext(sc)
+
+    val df1 = sqlContext.phoenixTableAsDataFrame("TABLE1", Array("ID", "COL1"), conf = hbaseConfiguration)
+
+    df1.registerTempTable("sql_table_1")
+
+    val df2 = sqlContext.phoenixTableAsDataFrame("TABLE2", Array("ID", "TABLE1_ID"),
+      conf = hbaseConfiguration)
+
+    df2.registerTempTable("sql_table_2")
+
+    val sqlRdd = sqlContext.sql("SELECT t1.ID, t1.COL1, t2.ID, t2.TABLE1_ID FROM sql_table_1 AS t1 INNER JOIN sql_table_2 AS t2 ON (t2.TABLE1_ID = t1.ID)")
+
+    val count = sqlRdd.count()
+
+    count shouldEqual 6L
+  }
+
+  test("Can create schema RDD and execute query on case sensitive table (no config)") {
+    val sqlContext = new SQLContext(sc)
+
+    val df1 = sqlContext.phoenixTableAsDataFrame("table3", Array("id", "col1"), zkUrl = Some(hbaseConnectionString))
+
+    df1.registerTempTable("table3")
+
+    val sqlRdd = sqlContext.sql("SELECT * FROM table3")
+
+    val count = sqlRdd.count()
+
+    count shouldEqual 2L
+  }
+
+  test("Can create schema RDD and execute constrained query") {
+    val sqlContext = new SQLContext(sc)
+
+    val df1 = sqlContext.phoenixTableAsDataFrame("TABLE1", Array("ID", "COL1"), conf = hbaseConfiguration)
+
+    df1.registerTempTable("sql_table_1")
+
+    val df2 = sqlContext.phoenixTableAsDataFrame("TABLE2", Array("ID", "TABLE1_ID"),
+      predicate = Some("\"ID\" = 1"),
+      conf = hbaseConfiguration)
+
+    df2.registerTempTable("sql_table_2")
+
+    val sqlRdd = sqlContext.sql("SELECT t1.ID, t1.COL1, t2.ID, t2.TABLE1_ID FROM sql_table_1 AS t1 INNER JOIN sql_table_2 AS t2 ON (t2.TABLE1_ID = t1.ID)")
+
+    val count = sqlRdd.count()
+
+    count shouldEqual 1L
+  }
+
+  test("Using a predicate referring to a non-existent column should fail") {
+    intercept[RuntimeException] {
+      val sqlContext = new SQLContext(sc)
+
+      val df1 = sqlContext.phoenixTableAsDataFrame("table3", Array("id", "col1"),
+        predicate = Some("foo = bar"),
+        conf = hbaseConfiguration)
+
+      df1.registerTempTable("table3")
+
+      val sqlRdd = sqlContext.sql("SELECT * FROM table3")
+
+      // we have to execute an action before the predicate failure can occur
+      val count = sqlRdd.count()
+    }.getCause shouldBe a [ColumnNotFoundException]
+  }
+
+  test("Can create schema RDD with predicate that will never match") {
+    val sqlContext = new SQLContext(sc)
+
+    val df1 = sqlContext.phoenixTableAsDataFrame("table3", Array("id", "col1"),
+      predicate = Some("\"id\" = -1"),
+      conf = hbaseConfiguration)
+
+    df1.registerTempTable("table3")
+
+    val sqlRdd = sqlContext.sql("SELECT * FROM table3")
+
+    val count = sqlRdd.count()
+
+    count shouldEqual 0L
+  }
+
+  test("Can create schema RDD with complex predicate") {
+    val sqlContext = new SQLContext(sc)
+
+    val df1 = sqlContext.phoenixTableAsDataFrame("DATE_PREDICATE_TEST_TABLE", Array("ID", "TIMESERIES_KEY"),
+      predicate = Some("ID > 0 AND TIMESERIES_KEY BETWEEN CAST(TO_DATE('1990-01-01 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP) AND CAST(TO_DATE('1990-01-30 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP)"),
+      conf = hbaseConfiguration)
+    
+    df1.registerTempTable("date_predicate_test_table")
+
+    val sqlRdd = df1.sqlContext.sql("SELECT * FROM date_predicate_test_table")
+
+    val count = sqlRdd.count()
+
+    count shouldEqual 0L
+  }
+
+  test("Can query an array table") {
+    val sqlContext = new SQLContext(sc)
+
+    val df1 = sqlContext.phoenixTableAsDataFrame("ARRAY_TEST_TABLE", Array("ID", "VCARRAY"),
+      conf = hbaseConfiguration)
+
+    df1.registerTempTable("ARRAY_TEST_TABLE")
+
+    val sqlRdd = sqlContext.sql("SELECT * FROM ARRAY_TEST_TABLE")
+
+    val count = sqlRdd.count()
+
+    // get row 0, column 1, which should be "VCARRAY"
+    val arrayValues = sqlRdd.collect().apply(0).apply(1)
+
+    arrayValues should equal(Array("String1", "String2", "String3"))
+
+    count shouldEqual 1L
+  }
+  
+  test("Can read a table as an RDD") {
+    val rdd1 = sc.phoenixTableAsRDD("ARRAY_TEST_TABLE", Seq("ID", "VCARRAY"),
+      conf = hbaseConfiguration)
+
+    val count = rdd1.count()
+
+    val arrayValues = rdd1.take(1)(0)("VCARRAY")
+
+    arrayValues should equal(Array("String1", "String2", "String3"))
+
+    count shouldEqual 1L
+  }
+
+  test("Can save to phoenix table") {
+    val sqlContext = new SQLContext(sc)
+
+    val dataSet = List((1L, "1", 1), (2L, "2", 2), (3L, "3", 3))
+
+    sc
+      .parallelize(dataSet)
+      .saveToPhoenix(
+        "OUTPUT_TEST_TABLE",
+        Seq("ID","COL1","COL2"),
+        hbaseConfiguration
+      )
+
+    // Load the results back
+    val stmt = conn.createStatement()
+    val rs = stmt.executeQuery("SELECT ID, COL1, COL2 FROM OUTPUT_TEST_TABLE")
+    val results = ListBuffer[(Long, String, Int)]()
+    while(rs.next()) {
+      results.append((rs.getLong(1), rs.getString(2), rs.getInt(3)))
+    }
+    stmt.close()
+
+    // Verify they match
+    (0 to results.size - 1).foreach { i =>
+      dataSet(i) shouldEqual results(i)
+    }
+  }
+
+  test("Can save Java and Joda dates to Phoenix (no config)") {
+    val dt = new DateTime()
+    val date = new Date()
+
+    val dataSet = List((1L, "1", 1, dt), (2L, "2", 2, date))
+    sc
+      .parallelize(dataSet)
+      .saveToPhoenix(
+        "OUTPUT_TEST_TABLE",
+        Seq("ID","COL1","COL2","COL3"),
+        zkUrl = Some(hbaseConnectionString)
+      )
+
+    // Load the results back
+    val stmt = conn.createStatement()
+    val rs = stmt.executeQuery("SELECT COL3 FROM OUTPUT_TEST_TABLE WHERE ID = 1 OR ID = 2 ORDER BY ID ASC")
+    val results = ListBuffer[java.sql.Date]()
+    while(rs.next()) {
+      results.append(rs.getDate(1))
+    }
+    stmt.close()
+
+    // Verify the epochs are equal
+    results(0).getTime shouldEqual dt.getMillis
+    results(1).getTime shouldEqual date.getTime
+  }
+
+  test("Not specifying a zkUrl or a config quorum URL should fail") {
+    intercept[UnsupportedOperationException] {
+      val sqlContext = new SQLContext(sc)
+      val badConf = new Configuration(hbaseConfiguration)
+      badConf.unset(HConstants.ZOOKEEPER_QUORUM)
+      sqlContext.phoenixTableAsDataFrame("TABLE1", Array("ID", "COL1"), conf = badConf)
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f666baa2/phoenix-spark/src/test/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/test/resources/log4j.xml b/phoenix-spark/src/test/resources/log4j.xml
deleted file mode 100644
index d4799da..0000000
--- a/phoenix-spark/src/test/resources/log4j.xml
+++ /dev/null
@@ -1,41 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
-
-<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
-  <appender name="console" class="org.apache.log4j.ConsoleAppender">
-    <param name="Target" value="System.out"/>
-
-    <layout class="org.apache.log4j.PatternLayout">
-      <param name="ConversionPattern" value="%-4r [%t] %-5p %c %x - %m%n"/>
-    </layout>
-  </appender>
-
-  <logger name="org.eclipse">
-    <level value="ERROR"/>
-  </logger>
-
-  <logger name="org.apache">
-    <level value="ERROR"/>
-  </logger>
-
-  <logger name = "org.apache.phoenix.mapreduce">
-    <level value="FATAL"/>
-  </logger>
-
-  <logger name="org.mortbay">
-    <level value="ERROR"/>
-  </logger>
-
-  <logger name="BlockStateChange">
-    <level value="ERROR"/>
-  </logger>
-
-  <logger name="io.netty">
-    <level value="ERROR"/>
-  </logger>
-
-  <root>
-    <priority value="INFO"/>
-    <appender-ref ref="console"/>
-  </root>
-</log4j:configuration>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f666baa2/phoenix-spark/src/test/resources/setup.sql
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/test/resources/setup.sql b/phoenix-spark/src/test/resources/setup.sql
deleted file mode 100644
index 14a7e7e..0000000
--- a/phoenix-spark/src/test/resources/setup.sql
+++ /dev/null
@@ -1,18 +0,0 @@
-CREATE TABLE table1 (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR)
-CREATE TABLE table2 (id BIGINT NOT NULL PRIMARY KEY, table1_id BIGINT, "t2col1" VARCHAR)
-UPSERT INTO table1 (id, col1) VALUES (1, 'test_row_1')
-UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (1, 1, 'test_child_1')
-UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (2, 1, 'test_child_2')
-UPSERT INTO table1 (id, col1) VALUES (2, 'test_row_2')
-UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (3, 2, 'test_child_1')
-UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (4, 2, 'test_child_2')
-UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (5, 2, 'test_child_3')
-UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (6, 2, 'test_child_4')
-CREATE TABLE "table3" ("id" BIGINT NOT NULL PRIMARY KEY, "col1" VARCHAR)
-UPSERT INTO "table3" ("id", "col1") VALUES (1, 'foo')
-UPSERT INTO "table3" ("id", "col1") VALUES (2, 'bar')
-CREATE TABLE ARRAY_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, VCARRAY VARCHAR[])
-UPSERT INTO ARRAY_TEST_TABLE (ID, VCARRAY) VALUES (1, ARRAY['String1', 'String2', 'String3'])
-CREATE TABLE DATE_PREDICATE_TEST_TABLE (ID BIGINT NOT NULL, TIMESERIES_KEY TIMESTAMP NOT NULL CONSTRAINT pk PRIMARY KEY (ID, TIMESERIES_KEY))
-UPSERT INTO DATE_PREDICATE_TEST_TABLE (ID, TIMESERIES_KEY) VALUES (1, CAST(CURRENT_TIME() AS TIMESTAMP))
-CREATE TABLE OUTPUT_TEST_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER, col3 DATE)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f666baa2/phoenix-spark/src/test/scala/org/apache/phoenix/spark/PhoenixRDDTest.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/test/scala/org/apache/phoenix/spark/PhoenixRDDTest.scala b/phoenix-spark/src/test/scala/org/apache/phoenix/spark/PhoenixRDDTest.scala
deleted file mode 100644
index 63cb6e4..0000000
--- a/phoenix-spark/src/test/scala/org/apache/phoenix/spark/PhoenixRDDTest.scala
+++ /dev/null
@@ -1,333 +0,0 @@
-/*
-   Licensed under the Apache License, Version 2.0 (the "License");
-   you may not use this file except in compliance with the License.
-   You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
- */
-package org.apache.phoenix.spark
-
-import java.sql.{Connection, DriverManager}
-import java.util.Date
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hbase.{HConstants, HBaseTestingUtility}
-import org.apache.phoenix.schema.ColumnNotFoundException
-import org.apache.phoenix.schema.types.PVarchar
-import org.apache.phoenix.util.ColumnInfo
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.types.{StringType, StructField}
-import org.apache.spark.{SparkConf, SparkContext}
-import org.joda.time.DateTime
-import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
-import org.apache.phoenix.spark._
-
-import scala.collection.mutable.ListBuffer
-
-class PhoenixRDDTest extends FunSuite with Matchers with BeforeAndAfterAll {
-  lazy val hbaseTestingUtility = {
-    new HBaseTestingUtility()
-  }
-
-  lazy val hbaseConfiguration = {
-    val conf = hbaseTestingUtility.getConfiguration
-
-    val quorum = conf.get("hbase.zookeeper.quorum")
-    val clientPort = conf.get("hbase.zookeeper.property.clientPort")
-    val znodeParent = conf.get("zookeeper.znode.parent")
-
-    // This is an odd one - the Zookeeper Quorum entry in the config is totally wrong. It's
-    // just reporting localhost.
-    conf.set(org.apache.hadoop.hbase.HConstants.ZOOKEEPER_QUORUM, s"$quorum:$clientPort:$znodeParent")
-
-    conf
-  }
-
-  lazy val quorumAddress = {
-    hbaseConfiguration.get("hbase.zookeeper.quorum")
-  }
-
-  lazy val zookeeperClientPort = {
-    hbaseConfiguration.get("hbase.zookeeper.property.clientPort")
-  }
-
-  lazy val zookeeperZnodeParent = {
-    hbaseConfiguration.get("zookeeper.znode.parent")
-  }
-
-  lazy val hbaseConnectionString = {
-    s"$quorumAddress:$zookeeperClientPort:$zookeeperZnodeParent"
-  }
-
-  var conn: Connection = _
-
-  override def beforeAll() {
-    hbaseTestingUtility.startMiniCluster()
-
-    conn = DriverManager.getConnection(s"jdbc:phoenix:$hbaseConnectionString")
-
-    conn.setAutoCommit(true)
-
-    // each SQL statement used to set up Phoenix must be on a single line. Yes, that
-    // can potentially make large lines.
-    val setupSqlSource = getClass.getClassLoader.getResourceAsStream("setup.sql")
-
-    val setupSql = scala.io.Source.fromInputStream(setupSqlSource).getLines()
-
-    for (sql <- setupSql) {
-      val stmt = conn.createStatement()
-
-      stmt.execute(sql)
-
-      stmt.close()
-    }
-
-    conn.commit()
-  }
-
-  override def afterAll() {
-    conn.close()
-    hbaseTestingUtility.shutdownMiniCluster()
-  }
-
-  val conf = new SparkConf().set("spark.ui.showConsoleProgress", "false")
-
-  val sc = new SparkContext("local[1]", "PhoenixSparkTest", conf)
-
-  def buildSql(table: String, columns: Seq[String], predicate: Option[String]): String = {
-    val query = "SELECT %s FROM \"%s\"" format(columns.map(f => "\"" + f + "\"").mkString(", "), table)
-
-    query + (predicate match {
-      case Some(p: String) => " WHERE " + p
-      case _ => ""
-    })
-  }
-
-  test("Can create valid SQL") {
-    val rdd = new PhoenixRDD(sc, "MyTable", Array("Foo", "Bar"),
-      conf = hbaseConfiguration)
-
-    rdd.buildSql("MyTable", Array("Foo", "Bar"), None) should
-      equal("SELECT \"Foo\", \"Bar\" FROM \"MyTable\"")
-  }
-
-  test("Can convert Phoenix schema") {
-    val phoenixSchema = List(
-      new ColumnInfo("varcharColumn", PVarchar.INSTANCE.getSqlType)
-    )
-
-    val rdd = new PhoenixRDD(sc, "MyTable", Array("Foo", "Bar"),
-      conf = hbaseConfiguration)
-
-    val catalystSchema = rdd.phoenixSchemaToCatalystSchema(phoenixSchema)
-
-    val expected = List(StructField("varcharColumn", StringType, nullable = true))
-
-    catalystSchema shouldEqual expected
-  }
-
-  test("Can create schema RDD and execute query") {
-    val sqlContext = new SQLContext(sc)
-
-    val df1 = sqlContext.phoenixTableAsDataFrame("TABLE1", Array("ID", "COL1"), conf = hbaseConfiguration)
-
-    df1.registerTempTable("sql_table_1")
-
-    val df2 = sqlContext.phoenixTableAsDataFrame("TABLE2", Array("ID", "TABLE1_ID"),
-      conf = hbaseConfiguration)
-
-    df2.registerTempTable("sql_table_2")
-
-    val sqlRdd = sqlContext.sql("SELECT t1.ID, t1.COL1, t2.ID, t2.TABLE1_ID FROM sql_table_1 AS t1 INNER JOIN sql_table_2 AS t2 ON (t2.TABLE1_ID = t1.ID)")
-
-    val count = sqlRdd.count()
-
-    count shouldEqual 6L
-  }
-
-  test("Can create schema RDD and execute query on case sensitive table (no config)") {
-    val sqlContext = new SQLContext(sc)
-
-    val df1 = sqlContext.phoenixTableAsDataFrame("table3", Array("id", "col1"), zkUrl = Some(hbaseConnectionString))
-
-    df1.registerTempTable("table3")
-
-    val sqlRdd = sqlContext.sql("SELECT * FROM table3")
-
-    val count = sqlRdd.count()
-
-    count shouldEqual 2L
-  }
-
-  test("Can create schema RDD and execute constrained query") {
-    val sqlContext = new SQLContext(sc)
-
-    val df1 = sqlContext.phoenixTableAsDataFrame("TABLE1", Array("ID", "COL1"), conf = hbaseConfiguration)
-
-    df1.registerTempTable("sql_table_1")
-
-    val df2 = sqlContext.phoenixTableAsDataFrame("TABLE2", Array("ID", "TABLE1_ID"),
-      predicate = Some("\"ID\" = 1"),
-      conf = hbaseConfiguration)
-
-    df2.registerTempTable("sql_table_2")
-
-    val sqlRdd = sqlContext.sql("SELECT t1.ID, t1.COL1, t2.ID, t2.TABLE1_ID FROM sql_table_1 AS t1 INNER JOIN sql_table_2 AS t2 ON (t2.TABLE1_ID = t1.ID)")
-
-    val count = sqlRdd.count()
-
-    count shouldEqual 1L
-  }
-
-  test("Using a predicate referring to a non-existent column should fail") {
-    intercept[RuntimeException] {
-      val sqlContext = new SQLContext(sc)
-
-      val df1 = sqlContext.phoenixTableAsDataFrame("table3", Array("id", "col1"),
-        predicate = Some("foo = bar"),
-        conf = hbaseConfiguration)
-
-      df1.registerTempTable("table3")
-
-      val sqlRdd = sqlContext.sql("SELECT * FROM table3")
-
-      // we have to execute an action before the predicate failure can occur
-      val count = sqlRdd.count()
-    }.getCause shouldBe a [ColumnNotFoundException]
-  }
-
-  test("Can create schema RDD with predicate that will never match") {
-    val sqlContext = new SQLContext(sc)
-
-    val df1 = sqlContext.phoenixTableAsDataFrame("table3", Array("id", "col1"),
-      predicate = Some("\"id\" = -1"),
-      conf = hbaseConfiguration)
-
-    df1.registerTempTable("table3")
-
-    val sqlRdd = sqlContext.sql("SELECT * FROM table3")
-
-    val count = sqlRdd.count()
-
-    count shouldEqual 0L
-  }
-
-  test("Can create schema RDD with complex predicate") {
-    val sqlContext = new SQLContext(sc)
-
-    val df1 = sqlContext.phoenixTableAsDataFrame("DATE_PREDICATE_TEST_TABLE", Array("ID", "TIMESERIES_KEY"),
-      predicate = Some("ID > 0 AND TIMESERIES_KEY BETWEEN CAST(TO_DATE('1990-01-01 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP) AND CAST(TO_DATE('1990-01-30 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP)"),
-      conf = hbaseConfiguration)
-    
-    df1.registerTempTable("date_predicate_test_table")
-
-    val sqlRdd = df1.sqlContext.sql("SELECT * FROM date_predicate_test_table")
-
-    val count = sqlRdd.count()
-
-    count shouldEqual 0L
-  }
-
-  test("Can query an array table") {
-    val sqlContext = new SQLContext(sc)
-
-    val df1 = sqlContext.phoenixTableAsDataFrame("ARRAY_TEST_TABLE", Array("ID", "VCARRAY"),
-      conf = hbaseConfiguration)
-
-    df1.registerTempTable("ARRAY_TEST_TABLE")
-
-    val sqlRdd = sqlContext.sql("SELECT * FROM ARRAY_TEST_TABLE")
-
-    val count = sqlRdd.count()
-
-    // get row 0, column 1, which should be "VCARRAY"
-    val arrayValues = sqlRdd.collect().apply(0).apply(1)
-
-    arrayValues should equal(Array("String1", "String2", "String3"))
-
-    count shouldEqual 1L
-  }
-  
-  test("Can read a table as an RDD") {
-    val rdd1 = sc.phoenixTableAsRDD("ARRAY_TEST_TABLE", Seq("ID", "VCARRAY"),
-      conf = hbaseConfiguration)
-
-    val count = rdd1.count()
-
-    val arrayValues = rdd1.take(1)(0)("VCARRAY")
-
-    arrayValues should equal(Array("String1", "String2", "String3"))
-
-    count shouldEqual 1L
-  }
-
-  test("Can save to phoenix table") {
-    val sqlContext = new SQLContext(sc)
-
-    val dataSet = List((1L, "1", 1), (2L, "2", 2), (3L, "3", 3))
-
-    sc
-      .parallelize(dataSet)
-      .saveToPhoenix(
-        "OUTPUT_TEST_TABLE",
-        Seq("ID","COL1","COL2"),
-        hbaseConfiguration
-      )
-
-    // Load the results back
-    val stmt = conn.createStatement()
-    val rs = stmt.executeQuery("SELECT ID, COL1, COL2 FROM OUTPUT_TEST_TABLE")
-    val results = ListBuffer[(Long, String, Int)]()
-    while(rs.next()) {
-      results.append((rs.getLong(1), rs.getString(2), rs.getInt(3)))
-    }
-    stmt.close()
-
-    // Verify they match
-    (0 to results.size - 1).foreach { i =>
-      dataSet(i) shouldEqual results(i)
-    }
-  }
-
-  test("Can save Java and Joda dates to Phoenix (no config)") {
-    val dt = new DateTime()
-    val date = new Date()
-
-    val dataSet = List((1L, "1", 1, dt), (2L, "2", 2, date))
-    sc
-      .parallelize(dataSet)
-      .saveToPhoenix(
-        "OUTPUT_TEST_TABLE",
-        Seq("ID","COL1","COL2","COL3"),
-        zkUrl = Some(hbaseConnectionString)
-      )
-
-    // Load the results back
-    val stmt = conn.createStatement()
-    val rs = stmt.executeQuery("SELECT COL3 FROM OUTPUT_TEST_TABLE WHERE ID = 1 OR ID = 2 ORDER BY ID ASC")
-    val results = ListBuffer[java.sql.Date]()
-    while(rs.next()) {
-      results.append(rs.getDate(1))
-    }
-    stmt.close()
-
-    // Verify the epochs are equal
-    results(0).getTime shouldEqual dt.getMillis
-    results(1).getTime shouldEqual date.getTime
-  }
-
-  test("Not specifying a zkUrl or a config quorum URL should fail") {
-    intercept[UnsupportedOperationException] {
-      val sqlContext = new SQLContext(sc)
-      val badConf = new Configuration(hbaseConfiguration)
-      badConf.unset(HConstants.ZOOKEEPER_QUORUM)
-      sqlContext.phoenixTableAsDataFrame("TABLE1", Array("ID", "COL1"), conf = badConf)
-    }
-  }
-}
\ No newline at end of file