You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ra...@apache.org on 2021/11/24 01:28:19 UTC
[phoenix-connectors] branch master updated: PHOENIX-6590 Handle rollbacks in phoenix spark connector and add way to control batch wise or task wise transactions(Rajeshbabu) (#66)
This is an automated email from the ASF dual-hosted git repository.
rajeshbabu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix-connectors.git
The following commit(s) were added to refs/heads/master by this push:
new 609a423 PHOENIX-6590 Handle rollbacks in phoenix spark connector and add way to control batch wise or task wise transactions(Rajeshbabu) (#66)
609a423 is described below
commit 609a423646fd2c97bcb2ff7d1705aacd0ffd5ee2
Author: Rajeshbabu Chintaguntla <ch...@gmail.com>
AuthorDate: Wed Nov 24 06:58:13 2021 +0530
PHOENIX-6590 Handle rollbacks in phoenix spark connector and add way to control batch wise or task wise transactions(Rajeshbabu) (#66)
PHOENIX-6590 Handle rollbacks in phoenix spark connector and add way to control batch wise or task wise transactions(Rajeshbabu) (#66)
Co-authored-by: Rajeshbabu Chintaguntla <ra...@apache.org>
---
.../src/it/resources/transactionTableSetup.sql | 17 +++++
.../phoenix/spark/AbstractPhoenixSparkIT.scala | 2 +
.../org/apache/phoenix/spark/PhoenixSparkIT.scala | 81 ++++++++++++++++++++--
.../datasource/v2/writer/PhoenixDataWriter.java | 14 +++-
4 files changed, 109 insertions(+), 5 deletions(-)
diff --git a/phoenix-spark-base/src/it/resources/transactionTableSetup.sql b/phoenix-spark-base/src/it/resources/transactionTableSetup.sql
new file mode 100644
index 0000000..ff328f8
--- /dev/null
+++ b/phoenix-spark-base/src/it/resources/transactionTableSetup.sql
@@ -0,0 +1,17 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+CREATE TABLE table5 (id BIGINT NOT NULL PRIMARY KEY, table5_id BIGINT, "t5col1" VARCHAR) TRANSACTIONAL=true
\ No newline at end of file
diff --git a/phoenix-spark-base/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala b/phoenix-spark-base/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala
index f4b21fd..a10d303 100644
--- a/phoenix-spark-base/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala
+++ b/phoenix-spark-base/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala
@@ -98,6 +98,8 @@ class AbstractPhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfter
// We pass in null for TenantId here since these tables will be globally visible
setupTables("globalSetup.sql", None)
+ // create transactional tables
+ setupTables("transactionTableSetup.sql", None)
// We pass in a TenantId to allow the DDL to create tenant-specific tables/views
setupTables("tenantSetup.sql", Some(TenantId))
diff --git a/phoenix-spark-base/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix-spark-base/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index e9a3274..ebfd41c 100644
--- a/phoenix-spark-base/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix-spark-base/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -13,17 +13,19 @@
*/
package org.apache.phoenix.spark
+import org.apache.omid.tso.client.AbortException
+
import java.sql.DriverManager
import java.util.Date
-
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil
-import org.apache.phoenix.schema.types.{PVarchar, PSmallintArray, PUnsignedSmallintArray}
+import org.apache.phoenix.query.QueryServices
+import org.apache.phoenix.schema.types.{PSmallintArray, PUnsignedSmallintArray, PVarchar}
import org.apache.phoenix.spark.datasource.v2.{PhoenixDataSource, PhoenixTestingDataSource}
import org.apache.phoenix.spark.datasource.v2.reader.PhoenixTestingInputPartitionReader
import org.apache.phoenix.spark.datasource.v2.writer.PhoenixTestingDataSourceWriter
import org.apache.phoenix.util.{ColumnInfo, SchemaUtil}
import org.apache.spark.SparkException
-import org.apache.spark.sql.types.{ArrayType, BinaryType, ByteType, DateType, IntegerType, LongType, StringType, StructField, StructType, ShortType}
+import org.apache.spark.sql.types.{ArrayType, BinaryType, ByteType, DateType, IntegerType, LongType, ShortType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode}
import scala.collection.mutable
@@ -86,7 +88,78 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
results.toList shouldEqual expectedResults
}
-
+
+ test("Can persist data into transactional tables with phoenix.transactions.enabled option") {
+ var extraOptions = QueryServices.TRANSACTIONS_ENABLED + "=true";
+ val df = spark.createDataFrame(
+ Seq(
+ (1, 1, "test_child_1"),
+ (2, 1, "test_child_2"))).
+ // column names are case sensitive
+ toDF("ID", "TABLE5_ID", "t5col1")
+ df.write
+ .format("phoenix")
+ .options(Map("table" -> "TABLE5",
+ PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress, PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER -> "true",
+ PhoenixDataSource.PHOENIX_CONFIGS -> extraOptions))
+ .mode(SaveMode.Overwrite)
+ .save()
+
+
+ // Verify results
+ val stmt = conn.createStatement()
+ val rs = stmt.executeQuery("SELECT * FROM TABLE5")
+
+ val checkResults = List((1, 1, "test_child_1"), (2, 1, "test_child_2"))
+ val results = ListBuffer[(Long, Long, String)]()
+ while (rs.next()) {
+ results.append((rs.getLong(1), rs.getLong(2), rs.getString(3)))
+ }
+ stmt.close()
+
+ results.toList shouldEqual checkResults
+ }
+
+ test("Verify transactions in streaming and spark jobs") {
+ val conn2 = DriverManager.getConnection(PhoenixSparkITHelper.getUrl)
+ conn2.setAutoCommit(false)
+ var stmt = conn.createStatement()
+ stmt.executeUpdate("UPSERT INTO TABLE5 values(1, 1, 'test_child_0')")
+
+ var extraOptions = QueryServices.TRANSACTIONS_ENABLED + "=true"
+ val df = spark.createDataFrame(
+ Seq(
+ (1, 1, "test_child_1"),
+ (2, 1, "test_child_2"))).
+ // column names are case sensitive
+ toDF("ID", "TABLE5_ID", "t5col1")
+ df.write
+ .format("phoenix")
+ .options(Map("table" -> "TABLE5",
+ PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress, PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER -> "true",
+ PhoenixDataSource.PHOENIX_CONFIGS -> extraOptions))
+ .mode(SaveMode.Overwrite)
+ .save()
+ try {
+ conn2.commit()
+ fail("Abort exception should be thrown.")
+ } catch {
+ case e: Exception => {}
+ }
+ // Verify results
+ stmt = conn.createStatement()
+ val rs = stmt.executeQuery("SELECT * FROM TABLE5")
+
+ val checkResults = List((1, 1, "test_child_1"), (2, 1, "test_child_2"))
+ val results = ListBuffer[(Long, Long, String)]()
+ while (rs.next()) {
+ results.append((rs.getLong(1), rs.getLong(2), rs.getString(3)))
+ }
+ stmt.close()
+
+ results.toList shouldEqual checkResults
+ }
+
test("Can convert Phoenix schema") {
val phoenixSchema = List(
new ColumnInfo("varcharColumn", PVarchar.INSTANCE.getSqlType)
diff --git a/phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java b/phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java
index cfb5b1f..fd85d66 100644
--- a/phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java
+++ b/phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java
@@ -92,6 +92,10 @@ public class PhoenixDataWriter implements DataWriter<InternalRow> {
}
String upsertSql = QueryUtil.constructUpsertStatement(options.getTableName(), colNames, null);
this.statement = this.conn.prepareStatement(upsertSql);
+ // Configure the batch size to 0 or a negative value to disable intermediate (batch) commits in a task,
+ // so that commit is called only once, at the end of task execution.
+ // This helps keep the database in a consistent state when failures occur and tasks are retried,
+ // mainly when transactions are enabled.
this.batchSize = Long.valueOf(overridingProps.getProperty(UPSERT_BATCH_SIZE,
String.valueOf(DEFAULT_UPSERT_BATCH_SIZE)));
} catch (SQLException e) {
@@ -120,7 +124,9 @@ public class PhoenixDataWriter implements DataWriter<InternalRow> {
}
numRecords++;
statement.execute();
- if (numRecords % batchSize == 0) {
+ // Run batch-wise commits only when the batch size is a positive value.
+ // Otherwise, commit gets called at the end of the task.
+ if (batchSize > 0 && numRecords % batchSize == 0) {
if (logger.isDebugEnabled()) {
logger.debug("commit called on a batch of size : " + batchSize);
}
@@ -151,5 +157,11 @@ public class PhoenixDataWriter implements DataWriter<InternalRow> {
@Override
public void abort() {
+ try {
+ // Roll back any ongoing transactions
+ conn.rollback();
+ } catch (SQLException ex) {
+ throw new RuntimeException(ex);
+ }
}
}