You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by jm...@apache.org on 2016/11/10 19:04:29 UTC

phoenix git commit: PHOENIX-3427 phoenix-spark: Table undefined for tenant specific view (Nico Pappagianis)

Repository: phoenix
Updated Branches:
  refs/heads/master 32aff3910 -> 3886638b0


PHOENIX-3427 phoenix-spark: Table undefined for tenant specific view (Nico Pappagianis)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/3886638b
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/3886638b
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/3886638b

Branch: refs/heads/master
Commit: 3886638b04a3e494be70e1e681ad56cb1c7e33c5
Parents: 32aff39
Author: Josh Mahonin <jm...@gmail.com>
Authored: Thu Nov 10 14:02:59 2016 -0500
Committer: Josh Mahonin <jm...@gmail.com>
Committed: Thu Nov 10 14:02:59 2016 -0500

----------------------------------------------------------------------
 phoenix-spark/src/it/resources/globalSetup.sql  |  58 ++++++++++
 phoenix-spark/src/it/resources/setup.sql        |  56 ----------
 phoenix-spark/src/it/resources/tenantSetup.sql  |  17 +++
 .../phoenix/spark/AbstractPhoenixSparkIT.scala  | 110 ++++++++++++++++++
 .../apache/phoenix/spark/PhoenixSparkIT.scala   | 105 ++++--------------
 .../spark/PhoenixSparkITTenantSpecific.scala    | 111 +++++++++++++++++++
 .../phoenix/spark/ConfigurationUtil.scala       |  14 ++-
 .../phoenix/spark/DataFrameFunctions.scala      |   9 +-
 .../apache/phoenix/spark/DefaultSource.scala    |   8 +-
 .../phoenix/spark/ProductRDDFunctions.scala     |   9 +-
 10 files changed, 344 insertions(+), 153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/3886638b/phoenix-spark/src/it/resources/globalSetup.sql
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/resources/globalSetup.sql b/phoenix-spark/src/it/resources/globalSetup.sql
new file mode 100644
index 0000000..852687e
--- /dev/null
+++ b/phoenix-spark/src/it/resources/globalSetup.sql
@@ -0,0 +1,58 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+CREATE TABLE table1 (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR)
+CREATE TABLE table1_copy (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR)
+CREATE TABLE table2 (id BIGINT NOT NULL PRIMARY KEY, table1_id BIGINT, "t2col1" VARCHAR)
+UPSERT INTO table1 (id, col1) VALUES (1, 'test_row_1')
+UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (1, 1, 'test_child_1')
+UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (2, 1, 'test_child_2')
+UPSERT INTO table1 (id, col1) VALUES (2, 'test_row_2')
+UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (3, 2, 'test_child_1')
+UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (4, 2, 'test_child_2')
+UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (5, 2, 'test_child_3')
+UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (6, 2, 'test_child_4')
+CREATE TABLE "table3" ("id" BIGINT NOT NULL PRIMARY KEY, "col1" VARCHAR)
+UPSERT INTO "table3" ("id", "col1") VALUES (1, 'foo')
+UPSERT INTO "table3" ("id", "col1") VALUES (2, 'bar')
+CREATE TABLE ARRAY_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, VCARRAY VARCHAR[])
+UPSERT INTO ARRAY_TEST_TABLE (ID, VCARRAY) VALUES (1, ARRAY['String1', 'String2', 'String3'])
+CREATE TABLE ARRAYBUFFER_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, VCARRAY VARCHAR[], INTARRAY INTEGER[])
+UPSERT INTO ARRAYBUFFER_TEST_TABLE (ID, VCARRAY, INTARRAY) VALUES (1, ARRAY['String1', 'String2', 'String3'], ARRAY[1, 2, 3])
+CREATE TABLE ARRAY_ANYVAL_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, INTARRAY INTEGER[], BIGINTARRAY BIGINT[])
+UPSERT INTO ARRAY_ANYVAL_TEST_TABLE (ID, INTARRAY, BIGINTARRAY) VALUES (1, ARRAY[1, 2, 3], ARRAY[1, 2, 3])
+CREATE TABLE ARRAY_BYTE_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, BYTEARRAY TINYINT[])
+UPSERT INTO ARRAY_BYTE_TEST_TABLE (ID, BYTEARRAY) VALUES (1, ARRAY[1, 2, 3])
+CREATE TABLE VARBINARY_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, BIN BINARY(1), VARBIN VARBINARY, BINARRAY BINARY(1)[])
+CREATE TABLE DATE_PREDICATE_TEST_TABLE (ID BIGINT NOT NULL, TIMESERIES_KEY TIMESTAMP NOT NULL CONSTRAINT pk PRIMARY KEY (ID, TIMESERIES_KEY))
+UPSERT INTO DATE_PREDICATE_TEST_TABLE (ID, TIMESERIES_KEY) VALUES (1, CAST(CURRENT_TIME() AS TIMESTAMP))
+CREATE TABLE OUTPUT_TEST_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER, col3 DATE)
+CREATE TABLE CUSTOM_ENTITY."z02"(id BIGINT NOT NULL PRIMARY KEY)
+UPSERT INTO CUSTOM_ENTITY."z02" (id) VALUES(1)
+CREATE TABLE TEST_DECIMAL (ID BIGINT NOT NULL PRIMARY KEY, COL1 DECIMAL(9, 6))
+UPSERT INTO TEST_DECIMAL VALUES (1, 123.456789)
+CREATE TABLE TEST_SMALL_TINY (ID BIGINT NOT NULL PRIMARY KEY, COL1 SMALLINT, COL2 TINYINT)
+UPSERT INTO TEST_SMALL_TINY VALUES (1, 32767, 127)
+CREATE TABLE DATE_TEST(ID BIGINT NOT NULL PRIMARY KEY, COL1 DATE)
+UPSERT INTO DATE_TEST VALUES(1, CURRENT_DATE())
+CREATE TABLE "space" ("key" VARCHAR PRIMARY KEY, "first name" VARCHAR)
+UPSERT INTO "space" VALUES ('key1', 'xyz')
+CREATE TABLE "small" ("key" VARCHAR PRIMARY KEY, "first name" VARCHAR, "salary" INTEGER )
+UPSERT INTO "small" VALUES ('key1', 'foo', 10000)
+UPSERT INTO "small" VALUES ('key2', 'bar', 20000)
+UPSERT INTO "small" VALUES ('key3', 'xyz', 30000)
+-- Multi-tenant base table (MULTI_TENANT=true); TENANT_VIEW in tenantSetup.sql is created on top of it
+CREATE TABLE MULTITENANT_TEST_TABLE (TENANT_ID VARCHAR NOT NULL, ORGANIZATION_ID VARCHAR, GLOBAL_COL1 VARCHAR  CONSTRAINT pk PRIMARY KEY (TENANT_ID, ORGANIZATION_ID)) MULTI_TENANT=true
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3886638b/phoenix-spark/src/it/resources/setup.sql
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/resources/setup.sql b/phoenix-spark/src/it/resources/setup.sql
deleted file mode 100644
index e56924f..0000000
--- a/phoenix-spark/src/it/resources/setup.sql
+++ /dev/null
@@ -1,56 +0,0 @@
--- Licensed to the Apache Software Foundation (ASF) under one
--- or more contributor license agreements.  See the NOTICE file
--- distributed with this work for additional information
--- regarding copyright ownership.  The ASF licenses this file
--- to you under the Apache License, Version 2.0 (the
--- "License"); you may not use this file except in compliance
--- with the License.  You may obtain a copy of the License at
---
--- http://www.apache.org/licenses/LICENSE-2.0
---
--- Unless required by applicable law or agreed to in writing, software
--- distributed under the License is distributed on an "AS IS" BASIS,
--- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--- See the License for the specific language governing permissions and
--- limitations under the License.
-
-CREATE TABLE table1 (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR)
-CREATE TABLE table1_copy (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR)
-CREATE TABLE table2 (id BIGINT NOT NULL PRIMARY KEY, table1_id BIGINT, "t2col1" VARCHAR)
-UPSERT INTO table1 (id, col1) VALUES (1, 'test_row_1')
-UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (1, 1, 'test_child_1')
-UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (2, 1, 'test_child_2')
-UPSERT INTO table1 (id, col1) VALUES (2, 'test_row_2')
-UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (3, 2, 'test_child_1')
-UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (4, 2, 'test_child_2')
-UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (5, 2, 'test_child_3')
-UPSERT INTO table2 (id, table1_id, "t2col1") VALUES (6, 2, 'test_child_4')
-CREATE TABLE "table3" ("id" BIGINT NOT NULL PRIMARY KEY, "col1" VARCHAR)
-UPSERT INTO "table3" ("id", "col1") VALUES (1, 'foo')
-UPSERT INTO "table3" ("id", "col1") VALUES (2, 'bar')
-CREATE TABLE ARRAY_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, VCARRAY VARCHAR[])
-UPSERT INTO ARRAY_TEST_TABLE (ID, VCARRAY) VALUES (1, ARRAY['String1', 'String2', 'String3'])
-CREATE TABLE ARRAYBUFFER_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, VCARRAY VARCHAR[], INTARRAY INTEGER[])
-UPSERT INTO ARRAYBUFFER_TEST_TABLE (ID, VCARRAY, INTARRAY) VALUES (1, ARRAY['String1', 'String2', 'String3'], ARRAY[1, 2, 3])
-CREATE TABLE ARRAY_ANYVAL_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, INTARRAY INTEGER[], BIGINTARRAY BIGINT[])
-UPSERT INTO ARRAY_ANYVAL_TEST_TABLE (ID, INTARRAY, BIGINTARRAY) VALUES (1, ARRAY[1, 2, 3], ARRAY[1, 2, 3])
-CREATE TABLE ARRAY_BYTE_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, BYTEARRAY TINYINT[])
-UPSERT INTO ARRAY_BYTE_TEST_TABLE (ID, BYTEARRAY) VALUES (1, ARRAY[1, 2, 3])
-CREATE TABLE VARBINARY_TEST_TABLE (ID BIGINT NOT NULL PRIMARY KEY, BIN BINARY(1), VARBIN VARBINARY, BINARRAY BINARY(1)[])
-CREATE TABLE DATE_PREDICATE_TEST_TABLE (ID BIGINT NOT NULL, TIMESERIES_KEY TIMESTAMP NOT NULL CONSTRAINT pk PRIMARY KEY (ID, TIMESERIES_KEY))
-UPSERT INTO DATE_PREDICATE_TEST_TABLE (ID, TIMESERIES_KEY) VALUES (1, CAST(CURRENT_TIME() AS TIMESTAMP))
-CREATE TABLE OUTPUT_TEST_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER, col3 DATE)
-CREATE TABLE CUSTOM_ENTITY."z02"(id BIGINT NOT NULL PRIMARY KEY)
-UPSERT INTO CUSTOM_ENTITY."z02" (id) VALUES(1)
-CREATE TABLE TEST_DECIMAL (ID BIGINT NOT NULL PRIMARY KEY, COL1 DECIMAL(9, 6))
-UPSERT INTO TEST_DECIMAL VALUES (1, 123.456789)
-CREATE TABLE TEST_SMALL_TINY (ID BIGINT NOT NULL PRIMARY KEY, COL1 SMALLINT, COL2 TINYINT)
-UPSERT INTO TEST_SMALL_TINY VALUES (1, 32767, 127)
-CREATE TABLE DATE_TEST(ID BIGINT NOT NULL PRIMARY KEY, COL1 DATE)
-UPSERT INTO DATE_TEST VALUES(1, CURRENT_DATE())
-CREATE TABLE "space" ("key" VARCHAR PRIMARY KEY, "first name" VARCHAR)
-UPSERT INTO "space" VALUES ('key1', 'xyz')
-CREATE TABLE "small" ("key" VARCHAR PRIMARY KEY, "first name" VARCHAR, "salary" INTEGER )
-UPSERT INTO "small" VALUES ('key1', 'foo', 10000)
-UPSERT INTO "small" VALUES ('key2', 'bar', 20000)
-UPSERT INTO "small" VALUES ('key3', 'xyz', 30000)

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3886638b/phoenix-spark/src/it/resources/tenantSetup.sql
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/resources/tenantSetup.sql b/phoenix-spark/src/it/resources/tenantSetup.sql
new file mode 100644
index 0000000..4a866dc
--- /dev/null
+++ b/phoenix-spark/src/it/resources/tenantSetup.sql
@@ -0,0 +1,17 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+-- Run by AbstractPhoenixSparkIT.beforeAll on a connection whose URL carries ";TenantId=<id>"
+CREATE VIEW IF NOT EXISTS TENANT_VIEW(TENANT_ONLY_COL VARCHAR) AS SELECT * FROM MULTITENANT_TEST_TABLE

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3886638b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala
new file mode 100644
index 0000000..27dfe81
--- /dev/null
+++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala
@@ -0,0 +1,110 @@
+/*
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+ */
+package org.apache.phoenix.spark
+
+import java.sql.{Connection, DriverManager}
+
+import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT
+import org.apache.phoenix.query.BaseTest
+import org.apache.spark.{SparkConf, SparkContext}
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
+
+
+// Helper object exposing the protected static methods hidden in BaseHBaseManagedTimeIT to the Scala suites
+object PhoenixSparkITHelper extends BaseHBaseManagedTimeIT {
+  def getTestClusterConfig = BaseHBaseManagedTimeIT.getTestClusterConfig
+  // Creates BaseTest's temp folder, then delegates to BaseHBaseManagedTimeIT.doSetup()
+  def doSetup = {
+    // The @ClassRule doesn't seem to be getting picked up, force creation here before setup
+    BaseTest.tmpFolder.create()
+    BaseHBaseManagedTimeIT.doSetup()
+  }
+  // Delegates to BaseHBaseManagedTimeIT.doTeardown()
+  def doTeardown = BaseHBaseManagedTimeIT.doTeardown()
+  // URL the suites hand to DriverManager.getConnection
+  def getUrl = BaseTest.getUrl
+}
+
+/**
+  * Base class for the phoenix-spark ITs: runs the test setup, loads the SQL fixtures, creates `sc`
+  */
+class AbstractPhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
+
+  // A global tenantId we can use across tests
+  final val TenantId = "theTenant"
+
+  // TENANT_VIEW schema
+  val OrgId = "ORGANIZATION_ID"
+  val TenantCol = "TENANT_ONLY_COL"
+  val ViewName = "TENANT_VIEW"
+
+  var conn: Connection = _ // opened by the latest setupTables call, closed in afterAll
+  var sc: SparkContext = _
+
+  lazy val hbaseConfiguration = PhoenixSparkITHelper.getTestClusterConfig
+  lazy val quorumAddress = ConfigurationUtil.getZookeeperURL(hbaseConfiguration).get
+
+  /**
+    * Runs the SQL in classpath resource `sqlSource`, one statement per line (blank lines and
+    * "--" lines are skipped). If `tenantId` is defined, ";TenantId=<id>" is added to the URL.
+    */
+  def setupTables(sqlSource: String, tenantId: Option[String] = None): Unit = {
+    val url = tenantId match {
+      case Some(id) => PhoenixSparkITHelper.getUrl + ";TenantId=" + id
+      case _ => PhoenixSparkITHelper.getUrl // also covers a null argument
+    }
+
+    // Close the connection left by a previous call instead of leaking it on reassignment
+    if (conn != null) conn.close()
+    conn = DriverManager.getConnection(url)
+    conn.setAutoCommit(true)
+
+    val setupSqlSource = getClass.getClassLoader.getResourceAsStream(sqlSource)
+    require(setupSqlSource != null, "SQL resource not found on classpath: " + sqlSource)
+    try {
+      // getLines() is lazy, so the stream has to stay open until every statement has run
+      val setupSql = scala.io.Source.fromInputStream(setupSqlSource).getLines()
+        .filter(line => !line.startsWith("--") && !line.isEmpty)
+      for (sql <- setupSql) {
+        val stmt = conn.createStatement()
+        try stmt.execute(sql) finally stmt.close()
+      }
+    } finally setupSqlSource.close()
+    conn.commit()
+  }
+
+  override def beforeAll(): Unit = {
+    PhoenixSparkITHelper.doSetup
+
+    // No TenantId here since these tables will be globally visible
+    setupTables("globalSetup.sql", None)
+    // We pass in a TenantId to allow the DDL to create tenant-specific tables/views
+    setupTables("tenantSetup.sql", Some(TenantId))
+
+    val conf = new SparkConf()
+      .setAppName("PhoenixSparkIT")
+      .setMaster("local[2]") // 2 threads, some parallelism
+      .set("spark.ui.showConsoleProgress", "false") // Disable printing stage progress
+
+    sc = new SparkContext(conf)
+  }
+
+  override def afterAll(): Unit = {
+    // Null checks keep a failure in beforeAll from being masked by an NPE during cleanup
+    try {
+      if (conn != null) conn.close()
+      if (sc != null) sc.stop()
+    } finally PhoenixSparkITHelper.doTeardown
+  }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3886638b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index 7d05f07..8aeba09 100644
--- a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -13,85 +13,21 @@
  */
 package org.apache.phoenix.spark
 
-import java.sql.{Connection, DriverManager}
 import java.util.Date
 
-import org.apache.hadoop.hbase.{HConstants}
-import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT
-import org.apache.phoenix.query.BaseTest
-import org.apache.phoenix.schema.{ColumnNotFoundException}
 import org.apache.phoenix.schema.types.PVarchar
-import org.apache.phoenix.util.{SchemaUtil, ColumnInfo}
-import org.apache.spark.sql.{Row, SaveMode, SQLContext}
+import org.apache.phoenix.util.{ColumnInfo, SchemaUtil}
 import org.apache.spark.sql.types._
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.sql.{Row, SQLContext, SaveMode}
 import org.joda.time.DateTime
-import org.scalatest._
 
 import scala.collection.mutable.ListBuffer
 
-/*
-  Note: If running directly from an IDE, these are the recommended VM parameters:
-  -Xmx1536m -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
- */
-
-// Helper object to access the protected abstract static methods hidden in BaseHBaseManagedTimeIT
-object PhoenixSparkITHelper extends BaseHBaseManagedTimeIT {
-  def getTestClusterConfig = BaseHBaseManagedTimeIT.getTestClusterConfig
-  def doSetup = {
-    // The @ClassRule doesn't seem to be getting picked up, force creation here before setup
-    BaseTest.tmpFolder.create()
-    BaseHBaseManagedTimeIT.doSetup()
-  }
-  def doTeardown = BaseHBaseManagedTimeIT.doTeardown()
-  def getUrl = BaseTest.getUrl
-}
-
-class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
-  var conn: Connection = _
-  var sc: SparkContext = _
-
-  lazy val hbaseConfiguration = {
-    val conf = PhoenixSparkITHelper.getTestClusterConfig
-    conf
-  }
-
-  lazy val quorumAddress = {
-    ConfigurationUtil.getZookeeperURL(hbaseConfiguration).get
-  }
-
-  override def beforeAll() {
-    PhoenixSparkITHelper.doSetup
-
-    conn = DriverManager.getConnection(PhoenixSparkITHelper.getUrl)
-    conn.setAutoCommit(true)
-
-    // each SQL statement used to set up Phoenix must be on a single line. Yes, that
-    // can potentially make large lines.
-    val setupSqlSource = getClass.getClassLoader.getResourceAsStream("setup.sql")
-
-    val setupSql = scala.io.Source.fromInputStream(setupSqlSource).getLines()
-      .filter(line => ! line.startsWith("--") && ! line.isEmpty)
-
-    for (sql <- setupSql) {
-      val stmt = conn.createStatement()
-      stmt.execute(sql)
-    }
-    conn.commit()
-
-    val conf = new SparkConf()
-      .setAppName("PhoenixSparkIT")
-      .setMaster("local[2]") // 2 threads, some parallelism
-      .set("spark.ui.showConsoleProgress", "false") // Disable printing stage progress
-
-    sc = new SparkContext(conf)
-  }
-
-  override def afterAll() {
-    conn.close()
-    sc.stop()
-    PhoenixSparkITHelper.doTeardown
-  }
+/**
+  * Note: If running directly from an IDE, these are the recommended VM parameters:
+  * -Xmx1536m -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
+  */
+class PhoenixSparkIT extends AbstractPhoenixSparkIT {
 
   test("Can convert Phoenix schema") {
     val phoenixSchema = List(
@@ -120,7 +56,8 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
 
     df2.registerTempTable("sql_table_2")
 
-    val sqlRdd = sqlContext.sql("""
+    val sqlRdd = sqlContext.sql(
+      """
         |SELECT t1.ID, t1.COL1, t2.ID, t2.TABLE1_ID FROM sql_table_1 AS t1
         |INNER JOIN sql_table_2 AS t2 ON (t2.TABLE1_ID = t1.ID)""".stripMargin
     )
@@ -162,9 +99,10 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
 
     df2.registerTempTable("sql_table_2")
 
-    val sqlRdd = sqlContext.sql("""
-      |SELECT t1.ID, t1.COL1, t2.ID, t2.TABLE1_ID FROM sql_table_1 AS t1
-      |INNER JOIN sql_table_2 AS t2 ON (t2.TABLE1_ID = t1.ID)""".stripMargin
+    val sqlRdd = sqlContext.sql(
+      """
+        |SELECT t1.ID, t1.COL1, t2.ID, t2.TABLE1_ID FROM sql_table_1 AS t1
+        |INNER JOIN sql_table_2 AS t2 ON (t2.TABLE1_ID = t1.ID)""".stripMargin
     )
 
     val count = sqlRdd.count()
@@ -196,10 +134,11 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
     val df1 = sqlContext.phoenixTableAsDataFrame(
       "DATE_PREDICATE_TEST_TABLE",
       Array("ID", "TIMESERIES_KEY"),
-      predicate = Some("""
-        |ID > 0 AND TIMESERIES_KEY BETWEEN
-        |CAST(TO_DATE('1990-01-01 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP) AND
-        |CAST(TO_DATE('1990-01-30 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP)""".stripMargin),
+      predicate = Some(
+        """
+          |ID > 0 AND TIMESERIES_KEY BETWEEN
+          |CAST(TO_DATE('1990-01-01 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP) AND
+          |CAST(TO_DATE('1990-01-30 00:00:01', 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP)""".stripMargin),
       conf = hbaseConfiguration)
 
     df1.registerTempTable("date_predicate_test_table")
@@ -280,7 +219,7 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
       .parallelize(dataSet)
       .saveToPhoenix(
         "OUTPUT_TEST_TABLE",
-        Seq("ID","COL1","COL2","COL3"),
+        Seq("ID", "COL1", "COL2", "COL3"),
         zkUrl = Some(quorumAddress)
       )
 
@@ -390,7 +329,7 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
       .parallelize(dataSet)
       .saveToPhoenix(
         "ARRAY_TEST_TABLE",
-        Seq("ID","VCARRAY"),
+        Seq("ID", "VCARRAY"),
         zkUrl = Some(quorumAddress)
       )
 
@@ -429,7 +368,7 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
 
   test("Ensure DataFrame field normalization (PHOENIX-2196)") {
     val rdd1 = sc
-      .parallelize(Seq((1L,1L,"One"),(2L,2L,"Two")))
+      .parallelize(Seq((1L, 1L, "One"), (2L, 2L, "Two")))
       .map(p => Row(p._1, p._2, p._3))
 
     val sqlContext = new SQLContext(sc)
@@ -580,7 +519,7 @@ class PhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfterAll {
   }
 
   test("Can save binary types back to phoenix") {
-    val dataSet = List((2L, Array[Byte](1), Array[Byte](1,2,3), Array[Array[Byte]](Array[Byte](1), Array[Byte](2))))
+    val dataSet = List((2L, Array[Byte](1), Array[Byte](1, 2, 3), Array[Array[Byte]](Array[Byte](1), Array[Byte](2))))
 
     sc
       .parallelize(dataSet)

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3886638b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkITTenantSpecific.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkITTenantSpecific.scala b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkITTenantSpecific.scala
new file mode 100644
index 0000000..4231849
--- /dev/null
+++ b/phoenix-spark/src/it/scala/org/apache/phoenix/spark/PhoenixSparkITTenantSpecific.scala
@@ -0,0 +1,100 @@
+/*
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+ */
+package org.apache.phoenix.spark
+
+import org.apache.spark.sql.SQLContext
+
+import scala.collection.mutable.ListBuffer
+
+/**
+  * Tenant-specific tests: each one writes the same rows to TENANT_VIEW (see tenantSetup.sql)
+  * through a different phoenix-spark write path and reads them back over the inherited `conn`.
+  *
+  * Note: If running directly from an IDE, these are the recommended VM parameters:
+  * -Xmx1536m -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
+  *
+  */
+class PhoenixSparkITTenantSpecific extends AbstractPhoenixSparkIT {
+
+  val SelectStatement = "SELECT " + OrgId + "," + TenantCol + " FROM " + ViewName
+
+  // Rows for the (ORGANIZATION_ID, TENANT_ONLY_COL) columns of TENANT_VIEW
+  val DataSet = List(("testOrg1", "data1"), ("testOrg2", "data2"), ("testOrg3", "data3"))
+
+  /**
+    * Runs SelectStatement on `conn` and asserts that the rows returned equal DataSet.
+    *
+    * NOTE(review): nothing clears the view between tests, so rows written by an earlier test
+    * can satisfy this check for a later one - consider deleting them before each write.
+    */
+  def verifyResults(): Unit = {
+    val results = ListBuffer[(String, String)]()
+    val stmt = conn.createStatement()
+    try {
+      val rs = stmt.executeQuery(SelectStatement)
+      while (rs.next()) {
+        results.append((rs.getString(1), rs.getString(2)))
+      }
+    } finally {
+      // Runs even when the query throws; closing the Statement also closes its ResultSet
+      stmt.close()
+    }
+
+    results.toList shouldEqual DataSet
+  }
+
+  test("Can persist a dataframe using 'DataFrame.saveToPhoenix' on tenant-specific view") {
+    val sqlContext = new SQLContext(sc)
+    import sqlContext.implicits._
+
+    val df = sc.parallelize(DataSet).toDF(OrgId, TenantCol)
+
+    // Only saveToPhoenix runs here: also writing through DataFrame.write (covered by the
+    // next test) would let this test pass even if saveToPhoenix wrote nothing.
+    df.saveToPhoenix(ViewName, zkUrl = Some(quorumAddress), tenantId = Some(TenantId))
+
+    verifyResults()
+  }
+
+  test("Can persist a dataframe using 'DataFrame.write' on tenant-specific view") {
+    val sqlContext = new SQLContext(sc)
+    import sqlContext.implicits._
+
+    val df = sc.parallelize(DataSet).toDF(OrgId, TenantCol)
+
+    df.write
+      .format("org.apache.phoenix.spark")
+      .mode("overwrite")
+      .option("table", ViewName)
+      .option("TenantId", TenantId)
+      .option("zkUrl", PhoenixSparkITHelper.getUrl)
+      .save()
+
+    verifyResults()
+  }
+
+  test("Can save to Phoenix tenant-specific view") {
+    // RDD write path; view and column names are the TENANT_VIEW constants of the base class
+    sc
+      .parallelize(DataSet)
+      .saveToPhoenix(
+        ViewName,
+        Seq(OrgId, TenantCol),
+        hbaseConfiguration,
+        tenantId = Some(TenantId)
+      )
+
+    verifyResults()
+  }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3886638b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala
index 2f4311f..ca476e7 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala
@@ -17,12 +17,13 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
 import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver
 import org.apache.phoenix.mapreduce.util.{ColumnInfoToStringEncoderDecoder, PhoenixConfigurationUtil}
-import org.apache.phoenix.util.ColumnInfo
+import org.apache.phoenix.util.{ColumnInfo, PhoenixRuntime}
+
 import scala.collection.JavaConversions._
 
 object ConfigurationUtil extends Serializable {
 
-  def getOutputConfiguration(tableName: String, columns: Seq[String], zkUrl: Option[String], conf: Option[Configuration] = None): Configuration = {
+  def getOutputConfiguration(tableName: String, columns: Seq[String], zkUrl: Option[String], tenantId: Option[String] = None, conf: Option[Configuration] = None): Configuration = {
 
     // Create an HBaseConfiguration object from the passed in config, if present
     val config = conf match {
@@ -30,6 +31,12 @@ object ConfigurationUtil extends Serializable {
       case _ => HBaseConfiguration.create()
     }
 
+    // Set the tenantId in the config if present
+    tenantId match {
+      case Some(id) => setTenantId(config, id)
+      case _ =>
+    }
+
     // Set the table to save to
     PhoenixConfigurationUtil.setOutputTableName(config, tableName)
     PhoenixConfigurationUtil.setPhysicalTableName(config, tableName)
@@ -59,7 +66,10 @@ object ConfigurationUtil extends Serializable {
       conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, info.getPort)
     if (info.getRootNode != null)
       conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, info.getRootNode)
+  }
 
+  def setTenantId(conf: Configuration, tenantId: String) = {
+    conf.set(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId)
   }
 
   // Return a serializable representation of the columns

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3886638b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
index 9408210..bb2efd5 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala
@@ -16,23 +16,24 @@ package org.apache.phoenix.spark
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.NullWritable
 import org.apache.phoenix.mapreduce.PhoenixOutputFormat
-import org.apache.phoenix.mapreduce.util.{ColumnInfoToStringEncoderDecoder, PhoenixConfigurationUtil}
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil
 import org.apache.phoenix.util.SchemaUtil
 import org.apache.spark.Logging
 import org.apache.spark.sql.DataFrame
+
 import scala.collection.JavaConversions._
 
 class DataFrameFunctions(data: DataFrame) extends Logging with Serializable {
 
   def saveToPhoenix(tableName: String, conf: Configuration = new Configuration,
-                    zkUrl: Option[String] = None): Unit = {
+                    zkUrl: Option[String] = None, tenantId: Option[String] = None): Unit = {
 
 
     // Retrieve the schema field names and normalize to Phoenix, need to do this outside of mapPartitions
     val fieldArray = data.schema.fieldNames.map(x => SchemaUtil.normalizeIdentifier(x))
 
     // Create a configuration object to use for saving
-    @transient val outConfig = ConfigurationUtil.getOutputConfiguration(tableName, fieldArray, zkUrl, Some(conf))
+    @transient val outConfig = ConfigurationUtil.getOutputConfiguration(tableName, fieldArray, zkUrl, tenantId, Some(conf))
 
     // Retrieve the zookeeper URL
     val zkUrlFinal = ConfigurationUtil.getZookeeperURL(outConfig)
@@ -41,7 +42,7 @@ class DataFrameFunctions(data: DataFrame) extends Logging with Serializable {
     val phxRDD = data.mapPartitions{ rows =>
  
        // Create a within-partition config to retrieve the ColumnInfo list
-       @transient val partitionConfig = ConfigurationUtil.getOutputConfiguration(tableName, fieldArray, zkUrlFinal)
+       @transient val partitionConfig = ConfigurationUtil.getOutputConfiguration(tableName, fieldArray, zkUrlFinal, tenantId)
        @transient val columns = PhoenixConfigurationUtil.getUpsertColumnMetadataList(partitionConfig).toList
  
        rows.map { row =>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3886638b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala
index 15d1944..743d196 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DefaultSource.scala
@@ -17,9 +17,8 @@
  */
 package org.apache.phoenix.spark
 
-import org.apache.spark.sql.{SaveMode, DataFrame, SQLContext}
-import org.apache.spark.sql.sources.{CreatableRelationProvider, BaseRelation, RelationProvider}
-import org.apache.phoenix.spark._
+import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, RelationProvider}
+import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
 
 class DefaultSource extends RelationProvider with CreatableRelationProvider {
 
@@ -45,7 +44,7 @@ class DefaultSource extends RelationProvider with CreatableRelationProvider {
     verifyParameters(parameters)
 
     // Save the DataFrame to Phoenix
-    data.saveToPhoenix(parameters("table"), zkUrl = parameters.get("zkUrl"))
+    data.saveToPhoenix(parameters("table"), zkUrl = parameters.get("zkUrl"), tenantId = parameters.get("TenantId"))
 
     // Return a relation of the saved data
     createRelation(sqlContext, parameters)
@@ -56,4 +55,5 @@ class DefaultSource extends RelationProvider with CreatableRelationProvider {
     if (parameters.get("table").isEmpty) throw new RuntimeException("No Phoenix 'table' option defined")
     if (parameters.get("zkUrl").isEmpty) throw new RuntimeException("No Phoenix 'zkUrl' option defined")
   }
+
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3886638b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala
index 2e0c58d..b59592b 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ProductRDDFunctions.scala
@@ -16,19 +16,20 @@ package org.apache.phoenix.spark
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.NullWritable
 import org.apache.phoenix.mapreduce.PhoenixOutputFormat
-import org.apache.phoenix.mapreduce.util.{ColumnInfoToStringEncoderDecoder, PhoenixConfigurationUtil}
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil
 import org.apache.spark.Logging
 import org.apache.spark.rdd.RDD
+
 import scala.collection.JavaConversions._
 
 class ProductRDDFunctions[A <: Product](data: RDD[A]) extends Logging with Serializable {
 
   def saveToPhoenix(tableName: String, cols: Seq[String],
-                    conf: Configuration = new Configuration, zkUrl: Option[String] = None)
+                    conf: Configuration = new Configuration, zkUrl: Option[String] = None, tenantId: Option[String] = None)
                     : Unit = {
 
     // Create a configuration object to use for saving
-    @transient val outConfig = ConfigurationUtil.getOutputConfiguration(tableName, cols, zkUrl, Some(conf))
+    @transient val outConfig = ConfigurationUtil.getOutputConfiguration(tableName, cols, zkUrl, tenantId, Some(conf))
 
     // Retrieve the zookeeper URL
     val zkUrlFinal = ConfigurationUtil.getZookeeperURL(outConfig)
@@ -37,7 +38,7 @@ class ProductRDDFunctions[A <: Product](data: RDD[A]) extends Logging with Seria
     val phxRDD = data.mapPartitions{ rows =>
 
       // Create a within-partition config to retrieve the ColumnInfo list
-      @transient val partitionConfig = ConfigurationUtil.getOutputConfiguration(tableName, cols, zkUrlFinal)
+      @transient val partitionConfig = ConfigurationUtil.getOutputConfiguration(tableName, cols, zkUrlFinal, tenantId)
       @transient val columns = PhoenixConfigurationUtil.getUpsertColumnMetadataList(partitionConfig).toList
 
       rows.map { row =>