Posted to commits@phoenix.apache.org by jm...@apache.org on 2016/11/10 19:04:29 UTC
phoenix git commit: PHOENIX-3427 phoenix-spark: Table undefined for tenant specific view (Nico Pappagianis)
Repository: phoenix
Updated Branches:
refs/heads/master 32aff3910 -> 3886638b0
PHOENIX-3427 phoenix-spark: Table undefined for tenant specific view (Nico Pappagianis)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/3886638b
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/3886638b
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/3886638b
Branch: refs/heads/master
Commit: 3886638b04a3e494be70e1e681ad56cb1c7e33c5
Parents: 32aff39
Author: Josh Mahonin <jm...@gmail.com>
Authored: Thu Nov 10 14:02:59 2016 -0500
Committer: Josh Mahonin <jm...@gmail.com>
Committed: Thu Nov 10 14:02:59 2016 -0500
----------------------------------------------------------------------
phoenix-spark/src/it/resources/globalSetup.sql | 58 ++++++++++
phoenix-spark/src/it/resources/setup.sql | 56 ----------
phoenix-spark/src/it/resources/tenantSetup.sql | 17 +++
.../phoenix/spark/AbstractPhoenixSparkIT.scala | 110 ++++++++++++++++++
.../apache/phoenix/spark/PhoenixSparkIT.scala | 105 ++++--------------
.../spark/PhoenixSparkITTenantSpecific.scala | 111 +++++++++++++++++++
.../phoenix/spark/ConfigurationUtil.scala | 14 ++-
.../phoenix/spark/DataFrameFunctions.scala | 9 +-
.../apache/phoenix/spark/DefaultSource.scala | 8 +-
.../phoenix/spark/ProductRDDFunctions.scala | 9 +-
10 files changed, 344 insertions(+), 153 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3886638b/phoenix-spark/src/it/resources/globalSetup.sql
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/resources/globalSetup.sql b/phoenix-spark/src/it/resources/globalSetup.sql
new file mode 100644
index 0000000..852687e
--- /dev/null
+++ b/phoenix-spark/src/it/resources/globalSetup.sql
@@ -0,0 +1,58 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+CREATE TABLE table1 (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR)
+CREATE TABLE table1_copy (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR)
+CREATE TABLE table2 (id BIGINT NOT NULL PRIMARY KEY, table1_id BIGINT, "t2col1" VARCHAR)
+UPSERT INTO table1 (id, col1) VALUES (1, 'test_row_1')
+UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (1, 1, 'test_child_1')
+UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (2, 1, 'test_child_2')
+UPSERT INTO table1 (id, col1) VALUES (2, 'test_row_2')
+UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (3, 2, 'test_child_1')
+UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (4, 2, 'test_child_2')
+UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (5, 2, 'test_child_3')
+UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (6, 2, 'test_child_4')
+CREATE TABLE "table3" ("id" BIGINT NOT NULL PRIMARY KEY, "col1" VARCHAR)
+UPSERT INTO "table3" ("id", "col1") VALUES (1, 'foo')
+UPSERT INTO "table3" ("id", "col1") VALUES (2, 'bar')
+CREATE TABLE ARRAY_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, VCARRAY VARCHAR[])
+UPSERT INTO ARRAY_TEST_TABLE (ID, VCARRAY) VALUES (1, ARRAY['String1', 'String2', 'String3'])
+CREATE TABLE ARRAYBUFFER_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, VCARRAY VARCHAR[], INTARRAY INTEGER[])
+UPSERT INTO ARRAYBUFFER_TEST_TABLE (ID, VCARRAY, INTARRAY) VALUES (1, ARRAY['String1', 'String2', 'String3'], ARRAY[1, 2, 3])
+CREATE TABLE ARRAY_ANYVAL_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, INTARRAY INTEGER[], BIGINTARRAY BIGINT[])
+UPSERT INTO ARRAY_ANYVAL_TEST_TABLE (ID, INTARRAY, BIGINTARRAY) VALUES (1, ARRAY[1, 2, 3], ARRAY[1, 2, 3])
+CREATE TABLE ARRAY_BYTE_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, BYTEARRAY TINYINT[])
+UPSERT INTO ARRAY_BYTE_TEST_TABLE (ID, BYTEARRAY) VALUES (1, ARRAY[1, 2, 3])
+CREATE TABLE VARBINARY_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, BIN BINARY(1), VARBIN VARBINARY, BINARRAY BINARY(1)[])
+CREATE TABLE DATE_PREDICATE_TEST_TABLE (ID BIGINT NOT NULL, TIMESERIES_KEY TIMESTAMP NOT NULL CONSTRAINT pk PRIMARY KEY (ID, TIMESERIES_KEY))
+UPSERT INTO DATE_PREDICATE_TEST_TABLE (ID, TIMESERIES_KEY) VALUES (1, CAST(CURRENT_TIME() AS TIMESTAMP))
+CREATE TABLE OUTPUT_TEST_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER, col3 DATE)
+CREATE TABLE CUSTOM_ENTITY."z02"(id BIGINT NOT NULL PRIMARY KEY)
+UPSERT INTO CUSTOM_ENTITY."z02" (id) VALUES(1)
+CREATE TABLE TEST_DECIMAL (ID BIGINT NOT NULL PRIMARY KEY, COL1 DECIMAL(9, 6))
+UPSERT INTO TEST_DECIMAL VALUES (1, 123.456789)
+CREATE TABLE TEST_SMALL_TINY (ID BIGINT NOT NULL PRIMARY KEY, COL1 SMALLINT, COL2 TINYINT)
+UPSERT INTO TEST_SMALL_TINY VALUES (1, 32767, 127)
+CREATE TABLE DATE_TEST(ID BIGINT NOT NULL PRIMARY KEY, COL1 DATE)
+UPSERT INTO DATE_TEST VALUES(1, CURRENT_DATE())
+CREATE TABLE "space" ("key" VARCHAR PRIMARY KEY, "first name" VARCHAR)
+UPSERT INTO "space" VALUES ('key1', 'xyz')
+CREATE TABLE "small" ("key" VARCHAR PRIMARY KEY, "first name" VARCHAR, "salary" INTEGER )
+UPSERT INTO "small" VALUES ('key1', 'foo', 10000)
+UPSERT INTO "small" VALUES ('key2', 'bar', 20000)
+UPSERT INTO "small" VALUES ('key3', 'xyz', 30000)
+
+CREATE TABLE MULTITENANT_TEST_TABLE (TENANT_ID VARCHAR NOT NULL, ORGANIZATION_ID VARCHAR, GLOBAL_COL1 VARCHAR CONSTRAINT pk PRIMARY KEY (TENANT_ID, ORGANIZATION_ID)) MULTI_TENANT=true
\ No newline at end of file
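
The trailing MULTITENANT_TEST_TABLE statement is declared with MULTI_TENANT=true,
which makes the leading primary key column (TENANT_ID) the implicit tenant
identifier: a connection opened with a TenantId property sees only its own rows.
A minimal sketch of how a global (non-tenant) connection addresses that column
explicitly; the URL helper and the tenant name "theTenant" come from the test
code below, the rest is illustrative:

    import java.sql.DriverManager

    val conn = DriverManager.getConnection(PhoenixSparkITHelper.getUrl)
    conn.setAutoCommit(true)
    // A global connection must supply TENANT_ID itself; tenant-specific
    // connections bind it implicitly from the TenantId connection property.
    conn.createStatement().execute(
      "UPSERT INTO MULTITENANT_TEST_TABLE (TENANT_ID, ORGANIZATION_ID) VALUES ('theTenant', 'testOrg1')")
    conn.close()
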
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3886638b/phoenix-spark/src/it/resources/setup.sql
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/resources/setup.sql b/phoenix-spark/src/it/resources/setup.sql
deleted file mode 100644
index e56924f..0000000
--- a/phoenix-spark/src/it/resources/setup.sql
+++ /dev/null
@@ -1,56 +0,0 @@
--- Licensed to the Apache Software Foundation (ASF) under one
--- or more contributor license agreements. See the NOTICE file
--- distributed with this work for additional information
--- regarding copyright ownership. The ASF licenses this file
--- to you under the Apache License, Version 2.0 (the
--- "License"); you may not use this file except in compliance
--- with the License. You may obtain a copy of the License at
---
--- http://www.apache.org/licenses/LICENSE-2.0
---
--- Unless required by applicable law or agreed to in writing, software
--- distributed under the License is distributed on an "AS IS" BASIS,
--- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--- See the License for the specific language governing permissions and
--- limitations under the License.
-
-CREATE TABLE table1 (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR)
-CREATE TABLE table1_copy (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR)
-CREATE TABLE table2 (id BIGINT NOT NULL PRIMARY KEY, table1_id BIGINT, "t2col1" VARCHAR)
-UPSERT INTO table1 (id, col1) VALUES (1, 'test_row_1')
-UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (1, 1, 'test_child_1')
-UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (2, 1, 'test_child_2')
-UPSERT INTO table1 (id, col1) VALUES (2, 'test_row_2')
-UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (3, 2, 'test_child_1')
-UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (4, 2, 'test_child_2')
-UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (5, 2, 'test_child_3')
-UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (6, 2, 'test_child_4')
-CREATE TABLE "table3" ("id" BIGINT NOT NULL PRIMARY KEY, "col1" VARCHAR)
-UPSERT INTO "table3" ("id", "col1") VALUES (1, 'foo')
-UPSERT INTO "table3" ("id", "col1") VALUES (2, 'bar')
-CREATE TABLE ARRAY_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, VCARRAY VARCHAR[])
-UPSERT INTO ARRAY_TEST_TABLE (ID, VCARRAY) VALUES (1, ARRAY['String1', 'String2', 'String3'])
-CREATE TABLE ARRAYBUFFER_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, VCARRAY VARCHAR[], INTARRAY INTEGER[])
-UPSERT INTO ARRAYBUFFER_TEST_TABLE (ID, VCARRAY, INTARRAY) VALUES (1, ARRAY['String1', 'String2', 'String3'], ARRAY[1, 2, 3])
-CREATE TABLE ARRAY_ANYVAL_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, INTARRAY INTEGER[], BIGINTARRAY BIGINT[])
-UPSERT INTO ARRAY_ANYVAL_TEST_TABLE (ID, INTARRAY, BIGINTARRAY) VALUES (1, ARRAY[1, 2, 3], ARRAY[1, 2, 3])
-CREATE TABLE ARRAY_BYTE_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, BYTEARRAY TINYINT[])
-UPSERT INTO ARRAY_BYTE_TEST_TABLE (ID, BYTEARRAY) VALUES (1, ARRAY[1, 2, 3])
-CREATE TABLE VARBINARY_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, BIN BINARY(1), VARBIN VARBINARY, BINARRAY BINARY(1)[])
-CREATE TABLE DATE_PREDICATE_TEST_TABLE (ID BIGINT NOT NULL, TIMESERIES_KEY TIMESTAMP NOT NULL CONSTRAINT pk PRIMARY KEY (ID, TIMESERIES_KEY))
-UPSERT INTO DATE_PREDICATE_TEST_TABLE (ID, TIMESERIES_KEY) VALUES (1, CAST(CURRENT_TIME() AS TIMESTAMP))
-CREATE TABLE OUTPUT_TEST_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER, col3 DATE)
-CREATE TABLE CUSTOM_ENTITY."z02"(id BIGINT NOT NULL PRIMARY KEY)
-UPSERT INTO CUSTOM_ENTITY."z02" (id) VALUES(1)
-CREATE TABLE TEST_DECIMAL (ID BIGINT NOT NULL PRIMARY KEY, COL1 DECIMAL(9, 6))
-UPSERT INTO TEST_DECIMAL VALUES (1, 123.456789)
-CREATE TABLE TEST_SMALL_TINY (ID BIGINT NOT NULL PRIMARY KEY, COL1 SMALLINT, COL2 TINYINT)
-UPSERT INTO TEST_SMALL_TINY VALUES (1, 32767, 127)
-CREATE TABLE DATE_TEST(ID BIGINT NOT NULL PRIMARY KEY, COL1 DATE)
-UPSERT INTO DATE_TEST VALUES(1, CURRENT_DATE())
-CREATE TABLE "space" ("key" VARCHAR PRIMARY KEY, "first name" VARCHAR)
-UPSERT INTO "space" VALUES ('key1', 'xyz')
-CREATE TABLE "small" ("key" VARCHAR PRIMARY KEY, "first name" VARCHAR, "salary" INTEGER )
-UPSERT INTO "small" VALUES ('key1', 'foo', 10000)
-UPSERT INTO "small" VALUES ('key2', 'bar', 20000)
-UPSERT INTO "small" VALUES ('key3', 'xyz', 30000)
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3886638b/phoenix-spark/src/it/resources/tenantSetup.sql
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/resources/tenantSetup.sql b/phoenix-spark/src/it/resources/tenantSetup.sql
new file mode 100644
index 0000000..4a866dc
--- /dev/null
+++ b/phoenix-spark/src/it/resources/tenantSetup.sql
@@ -0,0 +1,17 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+CREATE VIEW IF NOT EXISTS TENANT_VIEW(TENANT_ONLY_COL VARCHAR) AS SELECT * FROM MULTITENANT_TEST_TABLE
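
TENANT_VIEW is only tenant-specific because tenantSetup.sql is executed through
a tenant-scoped connection: setupTables in AbstractPhoenixSparkIT below appends
";TenantId=..." to the JDBC URL before running this DDL. A minimal sketch of
that mechanism, reusing the test helper and the test constant "theTenant":

    import java.sql.DriverManager

    // The TenantId URL property scopes every statement on this connection,
    // so the view is created for (and owned by) that tenant.
    val tenantUrl = PhoenixSparkITHelper.getUrl + ";TenantId=theTenant"
    val conn = DriverManager.getConnection(tenantUrl)
    conn.createStatement().execute(
      "CREATE VIEW IF NOT EXISTS TENANT_VIEW(TENANT_ONLY_COL VARCHAR) " +
        "AS SELECT * FROM MULTITENANT_TEST_TABLE")
    conn.close()
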
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3886638b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala
new file mode 100644
index 0000000..27dfe81
--- /dev/null
+++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala
@@ -0,0 +1,110 @@
+/*
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+package org.apache.phoenix.spark
+
+import java.sql.{Connection, DriverManager}
+
+import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT
+import org.apache.phoenix.query.BaseTest
+import org.apache.spark.{SparkConf, SparkContext}
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
+
+
+// Helper object to access the protected abstract static methods hidden in BaseHBaseManagedTimeIT
+object PhoenixSparkITHelper extends BaseHBaseManagedTimeIT {
+ def getTestClusterConfig = BaseHBaseManagedTimeIT.getTestClusterConfig
+
+ def doSetup = {
+ // The @ClassRule doesn't seem to be getting picked up, force creation here before setup
+ BaseTest.tmpFolder.create()
+ BaseHBaseManagedTimeIT.doSetup()
+ }
+
+ def doTeardown = BaseHBaseManagedTimeIT.doTeardown()
+
+ def getUrl = BaseTest.getUrl
+}
+
+/**
+ * Base class for PhoenixSparkIT
+ */
+class AbstractPhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
+
+ // A global tenantId we can use across tests
+ final val TenantId = "theTenant"
+
+ // TENANT_VIEW schema
+ val OrgId = "ORGANIZATION_ID"
+ val TenantCol = "TENANT_ONLY_COL"
+ val ViewName = "TENANT_VIEW"
+
+ var conn: Connection = _
+ var sc: SparkContext = _
+
+ lazy val hbaseConfiguration = {
+ val conf = PhoenixSparkITHelper.getTestClusterConfig
+ conf
+ }
+
+ lazy val quorumAddress = {
+ ConfigurationUtil.getZookeeperURL(hbaseConfiguration).get
+ }
+
+ // Runs SQL commands located in the file defined in the sqlSource argument
+ // Optional argument tenantId used for running tenant-specific SQL
+ def setupTables(sqlSource: String, tenantId: Option[String]): Unit = {
+ val url = tenantId match {
+ case Some(tenantId) => PhoenixSparkITHelper.getUrl + ";TenantId=" + tenantId
+ case _ => PhoenixSparkITHelper.getUrl
+ }
+
+ conn = DriverManager.getConnection(url)
+ conn.setAutoCommit(true)
+
+ val setupSqlSource = getClass.getClassLoader.getResourceAsStream(sqlSource)
+
+ // Each SQL statement used to set up Phoenix must be on a single line,
+ // which can make for long lines.
+ val setupSql = scala.io.Source.fromInputStream(setupSqlSource).getLines()
+ .filter(line => !line.startsWith("--") && !line.isEmpty)
+
+ for (sql <- setupSql) {
+ val stmt = conn.createStatement()
+ stmt.execute(sql)
+ }
+ conn.commit()
+ }
+
+ override def beforeAll() {
+ PhoenixSparkITHelper.doSetup
+
+ // We pass None for the tenant ID here since these tables will be globally visible
+ setupTables("globalSetup.sql", None)
+ // We pass in a TenantId to allow the DDL to create tenant-specific tables/views
+ setupTables("tenantSetup.sql", Some(TenantId))
+
+ val conf = new SparkConf()
+ .setAppName("PhoenixSparkIT")
+ .setMaster("local[2]") // 2 threads, some parallelism
+ .set("spark.ui.showConsoleProgress", "false") // Disable printing stage progress
+
+ sc = new SparkContext(conf)
+ }
+
+ override def afterAll() {
+ conn.close()
+ sc.stop()
+ PhoenixSparkITHelper.doTeardown
+ }
+}
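
Concrete suites now just extend AbstractPhoenixSparkIT and inherit the
SparkContext, the tenant-aware setup and the shared constants. A minimal sketch
of such a subclass (ExampleSparkIT is hypothetical; the table and row count come
from globalSetup.sql above, and the phoenixTableAsRDD implicit is supplied by
the org.apache.phoenix.spark package object):

    package org.apache.phoenix.spark

    class ExampleSparkIT extends AbstractPhoenixSparkIT {
      test("Can read the globally visible TABLE1") {
        // globalSetup.sql upserts two rows into TABLE1
        val rdd = sc.phoenixTableAsRDD("TABLE1", Seq("ID", "COL1"), conf = hbaseConfiguration)
        rdd.count() shouldEqual 2L
      }
    }
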
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3886638b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index 7d05f07..8aeba09 100644
--- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -13,85 +13,21 @@
*/
package org.apache.phoenix.spark
-import java.sql.{Connection, DriverManager}
import java.util.Date
-import org.apache.hadoop.hbase.{HConstants}
-import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT
-import org.apache.phoenix.query.BaseTest
-import org.apache.phoenix.schema.{ColumnNotFoundException}
import org.apache.phoenix.schema.types.PVarchar
-import org.apache.phoenix.util.{SchemaUtil, ColumnInfo}
-import org.apache.spark.sql.{Row, SaveMode, SQLContext}
+import org.apache.phoenix.util.{ColumnInfo, SchemaUtil}
import org.apache.spark.sql.types._
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.sql.{Row, SQLContext, SaveMode}
import org.joda.time.DateTime
-import org.scalatest._
import scala.collection.mutable.ListBuffer
-/*
- Note: If running directly from an IDE, these are the recommended VM parameters:
- -Xmx1536m -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
- */
-
-// Helper object to access the protected abstract static methods hidden in BaseHBaseManagedTimeIT
-object PhoenixSparkITHelper extends BaseHBaseManagedTimeIT {
- def getTestClusterConfig = BaseHBaseManagedTimeIT.getTestClusterConfig
- def doSetup = {
- // The @ClassRule doesn't seem to be getting picked up, force creation here before setup
- BaseTest.tmpFolder.create()
- BaseHBaseManagedTimeIT.doSetup()
- }
- def doTeardown = BaseHBaseManagedTimeIT.doTeardown()
- def getUrl = BaseTest.getUrl
-}
-
-class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
- var conn: Connection = _
- var sc: SparkContext = _
-
- lazy val hbaseConfiguration = {
- val conf = PhoenixSparkITHelper.getTestClusterConfig
- conf
- }
-
- lazy val quorumAddress = {
- ConfigurationUtil.getZookeeperURL(hbaseConfiguration).get
- }
-
- override def beforeAll() {
- PhoenixSparkITHelper.doSetup
-
- conn = DriverManager.getConnection(PhoenixSparkITHelper.getUrl)
- conn.setAutoCommit(true)
-
- // each SQL statement used to set up Phoenix must be on a single line. Yes, that
- // can potentially make large lines.
- val setupSqlSource = getClass.getClassLoader.getResourceAsStream("setup.sql")
-
- val setupSql = scala.io.Source.fromInputStream(setupSqlSource).getLines()
- .filter(line => ! line.startsWith("--") && ! line.isEmpty)
-
- for (sql <- setupSql) {
- val stmt = conn.createStatement()
- stmt.execute(sql)
- }
- conn.commit()
-
- val conf = new SparkConf()
- .setAppName("PhoenixSparkIT")
- .setMaster("local[2]") // 2 threads, some parallelism
- .set("spark.ui.showConsoleProgress", "false") // Disable printing stage progress
-
- sc = new SparkContext(conf)
- }
-
- override def afterAll() {
- conn.close()
- sc.stop()
- PhoenixSparkITHelper.doTeardown
- }
+/**
+ * Note: If running directly from an IDE, these are the recommended VM parameters:
+ * -Xmx1536m -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
+ */
+class PhoenixSparkIT extends AbstractPhoenixSparkIT {
test("Can convert Phoenix schema") {
val phoenixSchema = List(
@@ -120,7 +56,8 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
df2.registerTempTable("sql_table_2")
- val sqlRdd = sqlContext.sql("""
+ val sqlRdd = sqlContext.sql(
+ """
|SELECT t1.ID, t1.COL1, t2.ID, t2.TABLE1_ID FROM sql_table_1 AS t1
|INNER JOIN sql_table_2 AS t2 ON (t2.TABLE1_ID = t1.ID)""".stripMargin
)
@@ -162,9 +99,10 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
df2.registerTempTable("sql_table_2")
- val sqlRdd = sqlContext.sql("""
- |SELECT t1.ID, t1.COL1, t2.ID, t2.TABLE1_ID FROM sql_table_1 AS t1
- |INNER JOIN sql_table_2 AS t2 ON (t2.TABLE1_ID = t1.ID)""".stripMargin
+ val sqlRdd = sqlContext.sql(
+ """
+ |SELECT t1.ID, t1.COL1, t2.ID, t2.TABLE1_ID FROM sql_table_1 AS t1
+ |INNER JOIN sql_table_2 AS t2 ON (t2.TABLE1_ID = t1.ID)""".stripMargin
)
val count = sqlRdd.count()
@@ -196,10 +134,11 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
val df1 = sqlContext.phoenixTableAsDataFrame(
"DATE_PREDICATE_TEST_TABLE",
Array("ID", "TIMESERIES_KEY"),
- predicate = Some("""
- |ID > 0 AND TIMESERIES_KEY BETWEEN
- |CAST(TO_DATE('1990-01-01 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP) AND
- |CAST(TO_DATE('1990-01-30 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP)""".stripMargin),
+ predicate = Some(
+ """
+ |ID > 0 AND TIMESERIES_KEY BETWEEN
+ |CAST(TO_DATE('1990-01-01 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP) AND
+ |CAST(TO_DATE('1990-01-30 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP)""".stripMargin),
conf = hbaseConfiguration)
df1.registerTempTable("date_predicate_test_table")
@@ -280,7 +219,7 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
.parallelize(dataSet)
.saveToPhoenix(
"OUTPUT_TEST_TABLE",
- Seq("ID","COL1","COL2","COL3"),
+ Seq("ID", "COL1", "COL2", "COL3"),
zkUrl = Some(quorumAddress)
)
@@ -390,7 +329,7 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
.parallelize(dataSet)
.saveToPhoenix(
"ARRAY_TEST_TABLE",
- Seq("ID","VCARRAY"),
+ Seq("ID", "VCARRAY"),
zkUrl = Some(quorumAddress)
)
@@ -429,7 +368,7 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
test("Ensure DataFrame field normalization (PHOENIX-2196)") {
val rdd1 = sc
- .parallelize(Seq((1L,1L,"One"),(2L,2L,"Two")))
+ .parallelize(Seq((1L, 1L, "One"), (2L, 2L, "Two")))
.map(p => Row(p._1, p._2, p._3))
val sqlContext = new SQLContext(sc)
@@ -580,7 +519,7 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
}
test("Can save binary types back to phoenix") {
- val dataSet = List((2L, Array[Byte](1), Array[Byte](1,2,3), Array[Array[Byte]](Array[Byte](1), Array[Byte](2))))
+ val dataSet = List((2L, Array[Byte](1), Array[Byte](1, 2, 3), Array[Array[Byte]](Array[Byte](1), Array[Byte](2))))
sc
.parallelize(dataSet)
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3886638b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkITTenantSpecific.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkITTenantSpecific.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkITTenantSpecific.scala
new file mode 100644
index 0000000..4231849
--- /dev/null
+++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkITTenantSpecific.scala
@@ -0,0 +1,111 @@
+/*
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+package org.apache.phoenix.spark
+
+import org.apache.spark.sql.SQLContext
+
+import scala.collection.mutable.ListBuffer
+
+/**
+ * Subclass of AbstractPhoenixSparkIT used for tenant-specific tests
+ *
+ * Note: If running directly from an IDE, these are the recommended VM parameters:
+ * -Xmx1536m -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
+ *
+ */
+class PhoenixSparkITTenantSpecific extends AbstractPhoenixSparkIT {
+
+ val SelectStatement = "SELECT " + OrgId + "," + TenantCol + " FROM " + ViewName
+ val DataSet = List(("testOrg1", "data1"), ("testOrg2", "data2"), ("testOrg3", "data3"))
+
+ def verifyResults(): Unit = {
+ val stmt = conn.createStatement()
+ val rs = stmt.executeQuery(SelectStatement)
+
+ val results = ListBuffer[(String, String)]()
+ while (rs.next()) {
+ results.append((rs.getString(1), rs.getString(2)))
+ }
+ stmt.close()
+
+ results.toList shouldEqual DataSet
+ }
+
+ test("Can persist a dataframe using 'DataFrame.saveToPhoenix' on tenant-specific view") {
+ val sqlContext = new SQLContext(sc)
+ import sqlContext.implicits._
+
+ val dataSet = List(("testOrg1", "data1"), ("testOrg2", "data2"), ("testOrg3", "data3"))
+
+ val df = sc.parallelize(dataSet).toDF(OrgId, TenantCol)
+
+ // Save to tenant-specific view
+ df.saveToPhoenix("TENANT_VIEW", zkUrl = Some(quorumAddress), tenantId = Some(TenantId))
+
+ verifyResults()
+ }
+
+ test("Can persist a dataframe using 'DataFrame.write' on tenant-specific view") {
+
+ val sqlContext = new SQLContext(sc)
+ import sqlContext.implicits._
+
+ val dataSet = List(("testOrg1", "data1"), ("testOrg2", "data2"), ("testOrg3", "data3"))
+
+ val df = sc.parallelize(dataSet).toDF(OrgId, TenantCol)
+
+ df.write
+ .format("org.apache.phoenix.spark")
+ .mode("overwrite")
+ .option("table", "TENANT_VIEW")
+ .option("TenantId", "theTenant")
+ .option("zkUrl", PhoenixSparkITHelper.getUrl)
+ .save()
+
+ verifyResults()
+ }
+
+ test("Can save to Phoenix tenant-specific view") {
+ val sqlContext = new SQLContext(sc)
+
+ // This view name must match the view we create in phoenix-spark/src/it/resources/tenantSetup.sql
+ val ViewName = "TENANT_VIEW"
+
+ // Columns from the TENANT_VIEW schema
+ val OrgId = "ORGANIZATION_ID"
+ val TenantCol = "TENANT_ONLY_COL"
+
+ // Data matching the schema for TENANT_VIEW created in tenantSetup.sql
+ val dataSet = List(("testOrg1", "data1"), ("testOrg2", "data2"), ("testOrg3", "data3"))
+
+ sc
+ .parallelize(dataSet)
+ .saveToPhoenix(
+ ViewName,
+ Seq(OrgId, TenantCol),
+ hbaseConfiguration,
+ tenantId = Some(TenantId)
+ )
+
+ verifyResults()
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3886638b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala
index 2f4311f..ca476e7 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala
@@ -17,12 +17,13 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver
import org.apache.phoenix.mapreduce.util.{ColumnInfoToStringEncoderDecoder, PhoenixConfigurationUtil}
-import org.apache.phoenix.util.ColumnInfo
+import org.apache.phoenix.util.{ColumnInfo, PhoenixRuntime}
+
import scala.collection.JavaConversions._
object ConfigurationUtil extends Serializable {
- def getOutputConfiguration(tableName: String, columns: Seq[String], zkUrl: Option[String], conf: Option[Configuration] = None): Configuration = {
+ def getOutputConfiguration(tableName: String, columns: Seq[String], zkUrl: Option[String], tenantId: Option[String] = None, conf: Option[Configuration] = None): Configuration = {
// Create an HBaseConfiguration object from the passed in config, if present
val config = conf match {
@@ -30,6 +31,12 @@ object ConfigurationUtil extends Serializable {
case _ => HBaseConfiguration.create()
}
+ // Set the tenantId in the config if present
+ tenantId match {
+ case Some(id) => setTenantId(config, id)
+ case _ =>
+ }
+
// Set the table to save to
PhoenixConfigurationUtil.setOutputTableName(config, tableName)
PhoenixConfigurationUtil.setPhysicalTableName(config, tableName)
@@ -59,7 +66,10 @@ object ConfigurationUtil extends Serializable {
conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, info.getPort)
if (info.getRootNode != null)
conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, info.getRootNode)
+ }
+ def setTenantId(conf: Configuration, tenantId: String) = {
+ conf.set(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId)
}
// Return a serializable representation of the columns
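
With this change the tenant ID travels inside the Hadoop Configuration under
PhoenixRuntime.TENANT_ID_ATTRIB (the "TenantId" property), so every connection
the output format later opens on the executors is tenant-scoped. A minimal
sketch of the extended signature; the quorum address and tenant name are
placeholders:

    import org.apache.phoenix.util.PhoenixRuntime

    val outConfig = ConfigurationUtil.getOutputConfiguration(
      "TENANT_VIEW",
      Seq("ORGANIZATION_ID", "TENANT_ONLY_COL"),
      zkUrl = Some("localhost:2181"),
      tenantId = Some("theTenant"))
    // setTenantId stores the ID under the standard Phoenix property key
    assert(outConfig.get(PhoenixRuntime.TENANT_ID_ATTRIB) == "theTenant")
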
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3886638b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
index 9408210..bb2efd5 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
@@ -16,23 +16,24 @@ package org.apache.phoenix.spark
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.NullWritable
import org.apache.phoenix.mapreduce.PhoenixOutputFormat
-import org.apache.phoenix.mapreduce.util.{ColumnInfoToStringEncoderDecoder, PhoenixConfigurationUtil}
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil
import org.apache.phoenix.util.SchemaUtil
import org.apache.spark.Logging
import org.apache.spark.sql.DataFrame
+
import scala.collection.JavaConversions._
class DataFrameFunctions(data: DataFrame) extends Logging with Serializable {
def saveToPhoenix(tableName: String, conf: Configuration = new Configuration,
- zkUrl: Option[String] = None): Unit = {
+ zkUrl: Option[String] = None, tenantId: Option[String] = None): Unit = {
// Retrieve the schema field names and normalize to Phoenix, need to do this outside of mapPartitions
val fieldArray = data.schema.fieldNames.map(x => SchemaUtil.normalizeIdentifier(x))
// Create a configuration object to use for saving
- @transient val outConfig = ConfigurationUtil.getOutputConfiguration(tableName, fieldArray, zkUrl, Some(conf))
+ @transient val outConfig = ConfigurationUtil.getOutputConfiguration(tableName, fieldArray, zkUrl, tenantId, Some(conf))
// Retrieve the zookeeper URL
val zkUrlFinal = ConfigurationUtil.getZookeeperURL(outConfig)
@@ -41,7 +42,7 @@ class DataFrameFunctions(data: DataFrame) extends Logging with Serializable {
val phxRDD = data.mapPartitions{ rows =>
// Create a within-partition config to retrieve the ColumnInfo list
- @transient val partitionConfig = ConfigurationUtil.getOutputConfiguration(tableName, fieldArray, zkUrlFinal)
+ @transient val partitionConfig = ConfigurationUtil.getOutputConfiguration(tableName, fieldArray, zkUrlFinal, tenantId)
@transient val columns = PhoenixConfigurationUtil.getUpsertColumnMetadataList(partitionConfig).toList
rows.map { row =>
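
Note that the tenant ID is threaded through twice: outConfig is @transient and
never reaches the executors, so each partition rebuilds its own configuration
and must be handed the tenantId again. A minimal usage sketch of the extended
DataFrame API, with a placeholder quorum address; df is assumed to match the
(ORGANIZATION_ID, TENANT_ONLY_COL) view schema:

    import org.apache.phoenix.spark._

    df.saveToPhoenix("TENANT_VIEW",
      zkUrl = Some("localhost:2181"),
      tenantId = Some("theTenant"))
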
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3886638b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala
index 15d1944..743d196 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala
@@ -17,9 +17,8 @@
*/
package org.apache.phoenix.spark
-import org.apache.spark.sql.{SaveMode, DataFrame, SQLContext}
-import org.apache.spark.sql.sources.{CreatableRelationProvider, BaseRelation, RelationProvider}
-import org.apache.phoenix.spark._
+import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, RelationProvider}
+import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
class DefaultSource extends RelationProvider with CreatableRelationProvider {
@@ -45,7 +44,7 @@ class DefaultSource extends RelationProvider with CreatableRelationProvider {
verifyParameters(parameters)
// Save the DataFrame to Phoenix
- data.saveToPhoenix(parameters("table"), zkUrl = parameters.get("zkUrl"))
+ data.saveToPhoenix(parameters("table"), zkUrl = parameters.get("zkUrl"), tenantId = parameters.get("TenantId"))
// Return a relation of the saved data
createRelation(sqlContext, parameters)
@@ -56,4 +55,5 @@ class DefaultSource extends RelationProvider with CreatableRelationProvider {
if (parameters.get("table").isEmpty) throw new RuntimeException("No Phoenix 'table' option defined")
if (parameters.get("zkUrl").isEmpty) throw new RuntimeException("No Phoenix 'zkUrl' option defined")
}
+
}
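
Through DefaultSource the tenant ID can now also be passed over Spark's generic
DataFrame writer interface; note the option key is the case-sensitive string
"TenantId". A minimal sketch mirroring the new integration test, with a
placeholder quorum address:

    df.write
      .format("org.apache.phoenix.spark")
      .mode("overwrite")
      .option("table", "TENANT_VIEW")
      .option("TenantId", "theTenant")
      .option("zkUrl", "localhost:2181")
      .save()
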
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3886638b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala
index 2e0c58d..b59592b 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala
@@ -16,19 +16,20 @@ package org.apache.phoenix.spark
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.NullWritable
import org.apache.phoenix.mapreduce.PhoenixOutputFormat
-import org.apache.phoenix.mapreduce.util.{ColumnInfoToStringEncoderDecoder, PhoenixConfigurationUtil}
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil
import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
+
import scala.collection.JavaConversions._
class ProductRDDFunctions[A <: Product](data: RDD[A]) extends Logging with Serializable {
def saveToPhoenix(tableName: String, cols: Seq[String],
- conf: Configuration = new Configuration, zkUrl: Option[String] = None)
+ conf: Configuration = new Configuration, zkUrl: Option[String] = None, tenantId: Option[String] = None)
: Unit = {
// Create a configuration object to use for saving
- @transient val outConfig = ConfigurationUtil.getOutputConfiguration(tableName, cols, zkUrl, Some(conf))
+ @transient val outConfig = ConfigurationUtil.getOutputConfiguration(tableName, cols, zkUrl, tenantId, Some(conf))
// Retrieve the zookeeper URL
val zkUrlFinal = ConfigurationUtil.getZookeeperURL(outConfig)
@@ -37,7 +38,7 @@ class ProductRDDFunctions[A <: Product](data: RDD[A]) extends Logging with Seria
val phxRDD = data.mapPartitions{ rows =>
// Create a within-partition config to retrieve the ColumnInfo list
- @transient val partitionConfig = ConfigurationUtil.getOutputConfiguration(tableName, cols, zkUrlFinal)
+ @transient val partitionConfig = ConfigurationUtil.getOutputConfiguration(tableName, cols, zkUrlFinal, tenantId)
@transient val columns = PhoenixConfigurationUtil.getUpsertColumnMetadataList(partitionConfig).toList
rows.map { row =>
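
The RDD save path gains the same optional parameter at the end of the
saveToPhoenix signature, so existing callers are unaffected. A minimal usage
sketch mirroring the new integration test, with a placeholder quorum address:

    import org.apache.phoenix.spark._

    val dataSet = List(("testOrg1", "data1"), ("testOrg2", "data2"))
    sc.parallelize(dataSet)
      .saveToPhoenix(
        "TENANT_VIEW",
        Seq("ORGANIZATION_ID", "TENANT_ONLY_COL"),
        zkUrl = Some("localhost:2181"),
        tenantId = Some("theTenant"))
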