Posted to commits@phoenix.apache.org by ra...@apache.org on 2021/11/24 01:28:19 UTC

[phoenix-connectors] branch master updated: PHOENIX-6590 Handle rollbacks in phoenix spark connector and add way to control batch wise or task wise transactions(Rajeshbabu) (#66)

This is an automated email from the ASF dual-hosted git repository.

rajeshbabu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix-connectors.git


The following commit(s) were added to refs/heads/master by this push:
     new 609a423  PHOENIX-6590 Handle rollbacks in phoenix spark connector and add way to control batch wise or task wise transactions(Rajeshbabu) (#66)
609a423 is described below

commit 609a423646fd2c97bcb2ff7d1705aacd0ffd5ee2
Author: Rajeshbabu Chintaguntla <ch...@gmail.com>
AuthorDate: Wed Nov 24 06:58:13 2021 +0530

    PHOENIX-6590 Handle rollbacks in phoenix spark connector and add way to control batch wise or task wise transactions(Rajeshbabu) (#66)
    
    PHOENIX-6590 Handle rollbacks in phoenix spark connector and add way to control batch wise or task wise transactions(Rajeshbabu) (#66)
    
    Co-authored-by: Rajeshbabu Chintaguntla <ra...@apache.org>
---
 .../src/it/resources/transactionTableSetup.sql     | 17 +++++
 .../phoenix/spark/AbstractPhoenixSparkIT.scala     |  2 +
 .../org/apache/phoenix/spark/PhoenixSparkIT.scala  | 81 ++++++++++++++++++++--
 .../datasource/v2/writer/PhoenixDataWriter.java    | 14 +++-
 4 files changed, 109 insertions(+), 5 deletions(-)
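
With this change a write job can enable Phoenix transactions through the
connector's PHOENIX_CONFIGS option. A minimal usage sketch, assuming an active
SparkSession named spark, that PHOENIX_CONFIGS takes a comma-separated list of
name=value pairs (as in the tests added below), and a placeholder ZooKeeper
quorum:

    import org.apache.phoenix.query.QueryServices
    import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource
    import org.apache.spark.sql.SaveMode

    // Rows to write to the transactional table created by transactionTableSetup.sql
    val df = spark.createDataFrame(Seq(
        (1, 1, "test_child_1"),
        (2, 1, "test_child_2")))
      .toDF("ID", "TABLE5_ID", "t5col1")

    df.write
      .format("phoenix")
      .options(Map(
        "table" -> "TABLE5",
        PhoenixDataSource.ZOOKEEPER_URL -> "zk-host:2181",        // placeholder quorum
        PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER -> "true",  // keep "t5col1" lower case
        PhoenixDataSource.PHOENIX_CONFIGS ->
          (QueryServices.TRANSACTIONS_ENABLED + "=true")))        // phoenix.transactions.enabled
      .mode(SaveMode.Overwrite)
      .save()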

diff --git a/phoenix-spark-base/src/it/resources/transactionTableSetup.sql b/phoenix-spark-base/src/it/resources/transactionTableSetup.sql
new file mode 100644
index 0000000..ff328f8
--- /dev/null
+++ b/phoenix-spark-base/src/it/resources/transactionTableSetup.sql
@@ -0,0 +1,17 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+CREATE TABLE table5 (id BIGINT NOT NULL PRIMARY KEY, table5_id BIGINT, "t5col1" VARCHAR) TRANSACTIONAL=true
\ No newline at end of file
diff --git a/phoenix-spark-base/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala b/phoenix-spark-base/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala
index f4b21fd..a10d303 100644
--- a/phoenix-spark-base/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala
+++ b/phoenix-spark-base/src/it/scala/org/apache/phoenix/spark/AbstractPhoenixSparkIT.scala
@@ -98,6 +98,8 @@ class AbstractPhoenixSparkIT extends FunSuite with Matchers with BeforeAndAfter
 
     // We pass in null for TenantId here since these tables will be globally visible
     setupTables("globalSetup.sql", None)
+    // create transactional tables
+    setupTables("transactionTableSetup.sql", None)
     // We pass in a TenantId to allow the DDL to create tenant-specific tables/views
     setupTables("tenantSetup.sql", Some(TenantId))
 
diff --git a/phoenix-spark-base/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala b/phoenix-spark-base/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
index e9a3274..ebfd41c 100644
--- a/phoenix-spark-base/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
+++ b/phoenix-spark-base/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
@@ -13,17 +13,19 @@
  */
 package org.apache.phoenix.spark
 
+import org.apache.omid.tso.client.AbortException
+
 import java.sql.DriverManager
 import java.util.Date
-
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil
-import org.apache.phoenix.schema.types.{PVarchar, PSmallintArray, PUnsignedSmallintArray}
+import org.apache.phoenix.query.QueryServices
+import org.apache.phoenix.schema.types.{PSmallintArray, PUnsignedSmallintArray, PVarchar}
 import org.apache.phoenix.spark.datasource.v2.{PhoenixDataSource, PhoenixTestingDataSource}
 import org.apache.phoenix.spark.datasource.v2.reader.PhoenixTestingInputPartitionReader
 import org.apache.phoenix.spark.datasource.v2.writer.PhoenixTestingDataSourceWriter
 import org.apache.phoenix.util.{ColumnInfo, SchemaUtil}
 import org.apache.spark.SparkException
-import org.apache.spark.sql.types.{ArrayType, BinaryType, ByteType, DateType, IntegerType, LongType, StringType, StructField, StructType, ShortType}
+import org.apache.spark.sql.types.{ArrayType, BinaryType, ByteType, DateType, IntegerType, LongType, ShortType, StringType, StructField, StructType}
 import org.apache.spark.sql.{Row, SaveMode}
 
 import scala.collection.mutable
@@ -86,7 +88,78 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
 
     results.toList shouldEqual expectedResults
   }
-  
+
+  test("Can persist data into transactional tables with phoenix.transactions.enabled option") {
+    var extraOptions = QueryServices.TRANSACTIONS_ENABLED + "=true";
+    val df = spark.createDataFrame(
+      Seq(
+        (1, 1, "test_child_1"),
+        (2, 1, "test_child_2"))).
+      // column names are case sensitive
+      toDF("ID", "TABLE5_ID", "t5col1")
+    df.write
+      .format("phoenix")
+      .options(Map("table" -> "TABLE5",
+        PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress, PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER -> "true",
+        PhoenixDataSource.PHOENIX_CONFIGS -> extraOptions))
+      .mode(SaveMode.Overwrite)
+      .save()
+
+
+    // Verify results
+    val stmt = conn.createStatement()
+    val rs = stmt.executeQuery("SELECT * FROM TABLE5")
+
+    val checkResults = List((1, 1, "test_child_1"), (2, 1, "test_child_2"))
+    val results = ListBuffer[(Long, Long, String)]()
+    while (rs.next()) {
+      results.append((rs.getLong(1), rs.getLong(2), rs.getString(3)))
+    }
+    stmt.close()
+
+    results.toList shouldEqual checkResults
+  }
+
+  test("Verify transactions in streaming and spark jobs") {
+    val conn2 = DriverManager.getConnection(PhoenixSparkITHelper.getUrl)
+    conn2.setAutoCommit(false)
+    // Stage a conflicting row on conn2 without committing it yet
+    var stmt = conn2.createStatement()
+    stmt.executeUpdate("UPSERT INTO TABLE5 values(1, 1, 'test_child_0')")
+
+    var extraOptions = QueryServices.TRANSACTIONS_ENABLED + "=true"
+    val df = spark.createDataFrame(
+      Seq(
+        (1, 1, "test_child_1"),
+        (2, 1, "test_child_2"))).
+      // column names are case sensitive
+      toDF("ID", "TABLE5_ID", "t5col1")
+    df.write
+      .format("phoenix")
+      .options(Map("table" -> "TABLE5",
+        PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress, PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER -> "true",
+        PhoenixDataSource.PHOENIX_CONFIGS -> extraOptions))
+      .mode(SaveMode.Overwrite)
+      .save()
+    // The Spark job above rewrote row 1 under a newer transaction, so committing
+    // conn2's pending upsert is expected to abort with an exception.
+    intercept[Exception] {
+      conn2.commit()
+    }
+    // Verify results
+    stmt = conn.createStatement()
+    val rs = stmt.executeQuery("SELECT * FROM TABLE5")
+
+    val checkResults = List((1, 1, "test_child_1"), (2, 1, "test_child_2"))
+    val results = ListBuffer[(Long, Long, String)]()
+    while (rs.next()) {
+      results.append((rs.getLong(1), rs.getLong(2), rs.getString(3)))
+    }
+    stmt.close()
+
+    results.toList shouldEqual checkResults
+  }
+
   test("Can convert Phoenix schema") {
     val phoenixSchema = List(
       new ColumnInfo("varcharColumn", PVarchar.INSTANCE.getSqlType)
diff --git a/phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java b/phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java
index cfb5b1f..fd85d66 100644
--- a/phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java
+++ b/phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java
@@ -92,6 +92,10 @@ public class PhoenixDataWriter implements DataWriter<InternalRow> {
             }
             String upsertSql = QueryUtil.constructUpsertStatement(options.getTableName(), colNames, null);
             this.statement = this.conn.prepareStatement(upsertSql);
+            // Setting the batch size to 0 or a negative value disables intermediate
+            // (batch-wise) commits within a task, so commit is called only once at
+            // the end of task execution. This keeps the database in a consistent state
+            // when a task fails and is retried, mainly when transactions are enabled.
             this.batchSize = Long.valueOf(overridingProps.getProperty(UPSERT_BATCH_SIZE,
                     String.valueOf(DEFAULT_UPSERT_BATCH_SIZE)));
         } catch (SQLException e) {
@@ -120,7 +124,9 @@ public class PhoenixDataWriter implements DataWriter<InternalRow> {
             }
             numRecords++;
             statement.execute();
-            if (numRecords % batchSize == 0) {
+            // Run batch-wise commits only when the batch size is a positive value;
+            // otherwise the commit is deferred to the end of the task.
+            if (batchSize > 0 && numRecords % batchSize == 0) {
                 if (logger.isDebugEnabled()) {
                     logger.debug("commit called on a batch of size : " + batchSize);
                 }
@@ -151,5 +157,11 @@ public class PhoenixDataWriter implements DataWriter<InternalRow> {
 
     @Override
     public void abort() {
+        try {
+            // Roll back any in-flight transaction so a failed task leaves no partial writes
+            conn.rollback();
+        } catch (SQLException ex) {
+            throw new RuntimeException(ex);
+        }
     }
 }
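
The writer changes above mean that with a non-positive upsert batch size the
commit happens once per task, and abort() rolls back whatever the failed task
had written. A sketch of wiring both knobs together from a job, assuming an
active SparkSession named spark, that PHOENIX_CONFIGS takes a comma-separated
list of name=value pairs, and that the batch size property read by
PhoenixDataWriter is PhoenixConfigurationUtil.UPSERT_BATCH_SIZE; the ZooKeeper
quorum is a placeholder:

    import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil
    import org.apache.phoenix.query.QueryServices
    import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource

    // Transactions on, and batch size <= 0 so PhoenixDataWriter skips
    // intermediate commits and commits exactly once when the task finishes.
    val extraOptions =
      QueryServices.TRANSACTIONS_ENABLED + "=true," +
        PhoenixConfigurationUtil.UPSERT_BATCH_SIZE + "=0"

    // Pass extraOptions as PhoenixDataSource.PHOENIX_CONFIGS in
    // df.write.options(...), as in the write sketch near the top of this message.

    // If the job fails, abort() rolls back each task's open transaction, so a
    // read-back sees either all of a task's rows or none of them.
    val readBack = spark.read
      .format("phoenix")
      .options(Map(
        "table" -> "TABLE5",
        PhoenixDataSource.ZOOKEEPER_URL -> "zk-host:2181"))   // placeholder quorum
      .load()
    val rowCount = readBack.count()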