You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@phoenix.apache.org by GitBox <gi...@apache.org> on 2022/01/28 13:05:49 UTC

[GitHub] [phoenix-connectors] stoty commented on a change in pull request #69: PHOENIX-6632 Migrate/Update connectors to spark-3

stoty commented on a change in pull request #69:
URL: https://github.com/apache/phoenix-connectors/pull/69#discussion_r794267095



##########
File path: phoenix-spark-base/src/it/java/org/apache/phoenix/spark/OrderByIT.java
##########
@@ -130,11 +130,11 @@ public void testOrderByWithJoin() throws Exception {
                 Arrays.asList("A_STRING", "CF1.A", "CF1.B", "COL1", "CF2.C", "CF2.D"));
             SQLContext sqlContext = SparkUtil.getSparkSession().sqlContext();
             Dataset phoenixDataSet = SparkUtil.getSparkSession().read().format("phoenix")
-                    .option(DataSourceOptions.TABLE_KEY, tableName1)
+                    .option("table", tableName1)

Review comment:
       Make "table" a constant in PhoenixDataSource (e.g. PhoenixDataSource.TABLE_KEY), and use that everywhere instead of repeating the string.

##########
File path: phoenix-spark-base/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
##########
@@ -48,7 +48,7 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
       .format("phoenix")
       .options(Map("table" -> "TABLE3",
         PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress, PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER -> "true"))
-      .mode(SaveMode.Overwrite)
+      .mode(SaveMode.Append)

Review comment:
       Reading https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
   it seems that SaveMode indeed applies at the table level, and not at the record level, so the previous behaviour was wrong.

##########
File path: phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java
##########
@@ -57,6 +59,14 @@ String getTenantId() {
     }
 
     Properties getOverriddenProps() {

Review comment:
       This is confusing. We set tenant, zkUrl, and scn explicitly, and set the misc overridden props in overriddenProps.
   However, this method merges all of the above, but we still call it **getOverriddenProps**.
   
   Calling this **getEffectiveProps** or similar would be better.

##########
File path: phoenix-spark-base/pom.xml
##########
@@ -121,7 +121,7 @@
     <dependency>
       <groupId>org.scalatest</groupId>
       <artifactId>scalatest_${scala.binary.version}</artifactId>
-      <version>2.2.4</version>
+      <version>3.1.0</version>

Review comment:
       Any particular reason for building with 3.1.0 as opposed to 3.0 or 3.2 ?
   

##########
File path: phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixScan.java
##########
@@ -29,134 +29,97 @@
 import org.apache.phoenix.mapreduce.PhoenixInputSplit;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.spark.FilterExpressionCompiler;
-import org.apache.phoenix.spark.SparkSchemaUtil;
 import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource;
 import org.apache.phoenix.util.ColumnInfo;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.Filter;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
-import org.apache.spark.sql.sources.v2.reader.InputPartition;
-import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;
-import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns;
+import org.apache.spark.sql.connector.read.Batch;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
 import org.apache.spark.sql.types.StructType;
-import scala.Tuple3;
-import scala.collection.JavaConverters;
-import scala.collection.Seq;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
-import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
 import java.util.Properties;
 
 import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.extractPhoenixHBaseConfFromOptions;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
 
-public class PhoenixDataSourceReader implements DataSourceReader, SupportsPushDownFilters,
-        SupportsPushDownRequiredColumns {
+public class PhoenixScan implements Scan, Batch {
 
-    private final DataSourceOptions options;
-    private final String tableName;
+    private final StructType schema;
+    private final CaseInsensitiveStringMap options;
     private final String zkUrl;
-    private final boolean dateAsTimestamp;
     private final Properties overriddenProps;
+    private PhoenixDataSourceReadOptions phoenixDataSourceOptions;
+    private final String tableName;
+    private String currentScnValue;
+    private String tenantId;
+    private boolean splitByStats;
+    private final String whereClause;
 
-    private StructType schema;
-    private Filter[] pushedFilters = new Filter[]{};
-    // derived from pushedFilters
-    private String whereClause;
-
-    public PhoenixDataSourceReader(DataSourceOptions options) {
-        if (!options.tableName().isPresent()) {
-            throw new RuntimeException("No Phoenix option " + DataSourceOptions.TABLE_KEY + " defined");
-        }
-        if (!options.get(PhoenixDataSource.ZOOKEEPER_URL).isPresent()) {
-            throw new RuntimeException("No Phoenix option " + PhoenixDataSource.ZOOKEEPER_URL + " defined");
-        }
+    PhoenixScan(StructType schema, CaseInsensitiveStringMap options, String whereClause) {
+        this.schema = schema;
         this.options = options;
-        this.tableName = options.tableName().get();
-        this.zkUrl = options.get(PhoenixDataSource.ZOOKEEPER_URL).get();
-        this.dateAsTimestamp = options.getBoolean("dateAsTimestamp", false);
+        this.whereClause = whereClause;
         this.overriddenProps = extractPhoenixHBaseConfFromOptions(options);
-        setSchema();
+        this.zkUrl = options.get(PhoenixDataSource.ZOOKEEPER_URL);
+        tableName = options.get("table");
     }
 
-    /**
-     * Sets the schema using all the table columns before any column pruning has been done
-     */
-    private void setSchema() {
-        try (Connection conn = DriverManager.getConnection(
-                JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, overriddenProps)) {
-            List<ColumnInfo> columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, null);
-            Seq<ColumnInfo> columnInfoSeq = JavaConverters.asScalaIteratorConverter(columnInfos.iterator()).asScala().toSeq();
-            schema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoSeq, dateAsTimestamp);
+    private void populateOverriddenProperties(){

Review comment:
       This again conflates the overridden properties with the rest.
   
   Please try to clean up the option handling/merging a bit.

##########
File path: phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixBatchWrite.java
##########
@@ -17,45 +17,35 @@
  */
 package org.apache.phoenix.spark.datasource.v2.writer;
 
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
-import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
-import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.spark.sql.connector.write.BatchWrite;
+import org.apache.spark.sql.connector.write.DataWriterFactory;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
+import org.apache.spark.sql.connector.write.WriterCommitMessage;
 import org.apache.spark.sql.types.StructType;
 
+import java.util.Map;
+
 import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.CURRENT_SCN_VALUE;
-import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID;
 import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER;
 import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.extractPhoenixHBaseConfFromOptions;
 
-public class PhoenixDataSourceWriter implements DataSourceWriter {
+public class PhoenixBatchWrite implements BatchWrite {
 
     private final PhoenixDataSourceWriteOptions options;
+    private final LogicalWriteInfo writeInfo;
 
-    public PhoenixDataSourceWriter(SaveMode mode, StructType schema, DataSourceOptions options) {
-        if (!mode.equals(SaveMode.Overwrite)) {

Review comment:
       Given the previous buggy operation where we demanded SaveMode.Overwrite, I think that we should
   check the SaveMode here, and log a warning if it is not **Append**.
   We should probably throw an exception for any mode other than Append and Overwrite.
   
   For the warning, something like:
   "Only SaveMode.Append is supported. SaveMode.Overwrite is accepted for backwards compatibility, but
   it will behave as if SaveMode.Append was specified."

##########
File path: phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriteOptions.java
##########
@@ -81,6 +82,14 @@ boolean skipNormalizingIdentifier() {
     }
 
     Properties getOverriddenProps() {
+        String scn = getScn();

Review comment:
       Same problem with the method name as in PhoenixDataSourceReadOptions.

##########
File path: phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixBatchWrite.java
##########
@@ -17,45 +17,35 @@
  */
 package org.apache.phoenix.spark.datasource.v2.writer;
 
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
-import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
-import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.spark.sql.connector.write.BatchWrite;
+import org.apache.spark.sql.connector.write.DataWriterFactory;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
+import org.apache.spark.sql.connector.write.WriterCommitMessage;
 import org.apache.spark.sql.types.StructType;
 
+import java.util.Map;
+
 import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.CURRENT_SCN_VALUE;
-import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID;
 import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER;
 import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.extractPhoenixHBaseConfFromOptions;
 
-public class PhoenixDataSourceWriter implements DataSourceWriter {
+public class PhoenixBatchWrite implements BatchWrite {
 
     private final PhoenixDataSourceWriteOptions options;
+    private final LogicalWriteInfo writeInfo;
 
-    public PhoenixDataSourceWriter(SaveMode mode, StructType schema, DataSourceOptions options) {
-        if (!mode.equals(SaveMode.Overwrite)) {
-            throw new RuntimeException("SaveMode other than SaveMode.OverWrite is not supported");
-        }
-        if (!options.tableName().isPresent()) {
-            throw new RuntimeException("No Phoenix option " + DataSourceOptions.TABLE_KEY + " defined");
-        }
-        if (!options.get(ZOOKEEPER_URL).isPresent()) {
-            throw new RuntimeException("No Phoenix option " + ZOOKEEPER_URL + " defined");
-        }
-        this.options = createPhoenixDataSourceWriteOptions(options, schema);
-    }
-
-    @Override
-    public DataWriterFactory<InternalRow> createWriterFactory() {
-        return new PhoenixDataWriterFactory(options);
+    PhoenixBatchWrite(LogicalWriteInfo writeInfo, Map<String,String> options) {
+        this.writeInfo = writeInfo;
+        this.options = createPhoenixDataSourceWriteOptions(options, writeInfo.schema());
     }
 
     @Override
-    public boolean useCommitCoordinator() {

Review comment:
       I'm not sure what this does.

##########
File path: phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixScan.java
##########
@@ -29,134 +29,97 @@
 import org.apache.phoenix.mapreduce.PhoenixInputSplit;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.spark.FilterExpressionCompiler;
-import org.apache.phoenix.spark.SparkSchemaUtil;
 import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource;
 import org.apache.phoenix.util.ColumnInfo;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.Filter;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
-import org.apache.spark.sql.sources.v2.reader.InputPartition;
-import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;
-import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns;
+import org.apache.spark.sql.connector.read.Batch;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
 import org.apache.spark.sql.types.StructType;
-import scala.Tuple3;
-import scala.collection.JavaConverters;
-import scala.collection.Seq;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
-import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
 import java.util.Properties;
 
 import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.extractPhoenixHBaseConfFromOptions;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
 
-public class PhoenixDataSourceReader implements DataSourceReader, SupportsPushDownFilters,
-        SupportsPushDownRequiredColumns {
+public class PhoenixScan implements Scan, Batch {
 
-    private final DataSourceOptions options;
-    private final String tableName;
+    private final StructType schema;
+    private final CaseInsensitiveStringMap options;
     private final String zkUrl;
-    private final boolean dateAsTimestamp;
     private final Properties overriddenProps;
+    private PhoenixDataSourceReadOptions phoenixDataSourceOptions;
+    private final String tableName;
+    private String currentScnValue;
+    private String tenantId;
+    private boolean splitByStats;
+    private final String whereClause;
 
-    private StructType schema;
-    private Filter[] pushedFilters = new Filter[]{};
-    // derived from pushedFilters
-    private String whereClause;
-
-    public PhoenixDataSourceReader(DataSourceOptions options) {
-        if (!options.tableName().isPresent()) {
-            throw new RuntimeException("No Phoenix option " + DataSourceOptions.TABLE_KEY + " defined");
-        }
-        if (!options.get(PhoenixDataSource.ZOOKEEPER_URL).isPresent()) {
-            throw new RuntimeException("No Phoenix option " + PhoenixDataSource.ZOOKEEPER_URL + " defined");
-        }
+    PhoenixScan(StructType schema, CaseInsensitiveStringMap options, String whereClause) {
+        this.schema = schema;
         this.options = options;
-        this.tableName = options.tableName().get();
-        this.zkUrl = options.get(PhoenixDataSource.ZOOKEEPER_URL).get();
-        this.dateAsTimestamp = options.getBoolean("dateAsTimestamp", false);
+        this.whereClause = whereClause;
         this.overriddenProps = extractPhoenixHBaseConfFromOptions(options);
-        setSchema();
+        this.zkUrl = options.get(PhoenixDataSource.ZOOKEEPER_URL);
+        tableName = options.get("table");
     }
 
-    /**
-     * Sets the schema using all the table columns before any column pruning has been done
-     */
-    private void setSchema() {
-        try (Connection conn = DriverManager.getConnection(
-                JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, overriddenProps)) {
-            List<ColumnInfo> columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, null);
-            Seq<ColumnInfo> columnInfoSeq = JavaConverters.asScalaIteratorConverter(columnInfos.iterator()).asScala().toSeq();
-            schema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoSeq, dateAsTimestamp);
+    private void populateOverriddenProperties(){
+        currentScnValue = options.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
+        tenantId = options.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID);
+        // Generate splits based off statistics, or just region splits?
+        splitByStats = options.getBoolean(
+                PhoenixConfigurationUtil.MAPREDUCE_SPLIT_BY_STATS, PhoenixConfigurationUtil.DEFAULT_SPLIT_BY_STATS);
+        if(currentScnValue != null) {
+            overriddenProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue);
         }
-        catch (SQLException e) {
-            throw new RuntimeException(e);
+        if (tenantId != null){
+            overriddenProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
         }
     }
 
-    PhoenixInputPartition getInputPartition(PhoenixDataSourceReadOptions readOptions,
-            PhoenixInputSplit inputSplit) {
-        return new PhoenixInputPartition(readOptions, schema, inputSplit);
-    }
-
     @Override
     public StructType readSchema() {
         return schema;
     }
 
     @Override
-    public Filter[] pushFilters(Filter[] filters) {
-        Tuple3<String, Filter[], Filter[]> tuple3 = new FilterExpressionCompiler().pushFilters(filters);
-        whereClause = tuple3._1();
-        pushedFilters = tuple3._3();
-        return tuple3._2();
+    public String description() {
+        return this.getClass().toString();
     }
 
     @Override
-    public List<InputPartition<InternalRow>> planInputPartitions() {
-        Optional<String> currentScnValue = options.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
-        Optional<String> tenantId = options.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID);
-        // Generate splits based off statistics, or just region splits?
-        boolean splitByStats = options.getBoolean(
-                PhoenixConfigurationUtil.MAPREDUCE_SPLIT_BY_STATS, PhoenixConfigurationUtil.DEFAULT_SPLIT_BY_STATS);
-        if(currentScnValue.isPresent()) {
-            overriddenProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue.get());
-        }
-        if (tenantId.isPresent()){
-            overriddenProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId.get());
-        }
+    public Batch toBatch() {
+        return this;
+    }
+
+    @Override
+    public InputPartition[] planInputPartitions() {
+        populateOverriddenProperties();
         try (Connection conn = DriverManager.getConnection(
                 JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, overriddenProps)) {
             List<ColumnInfo> columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, new ArrayList<>(
-                Arrays.asList(schema.names())));
+                    Arrays.asList(schema.names())));
             final Statement statement = conn.createStatement();
             final String selectStatement = QueryUtil.constructSelectStatement(tableName, columnInfos, whereClause);
             if (selectStatement == null){
                 throw new NullPointerException();
             }
-
             final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
             // Optimize the query plan so that we potentially use secondary indexes
             final QueryPlan queryPlan = pstmt.optimizeQuery(selectStatement);
-            final Scan scan = queryPlan.getContext().getScan();
-
-            // setting the snapshot configuration
-            Optional<String> snapshotName = options.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY);

Review comment:
       We are losing functionality here.
   Is there any reason you haven't added code for this option in PhoenixDataSourceReadOptions ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@phoenix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org