Posted to issues@phoenix.apache.org by GitBox <gi...@apache.org> on 2022/01/28 00:42:17 UTC

[GitHub] [phoenix-connectors] ashwinb1998 opened a new pull request #69: PHOENIX-6632 Migrate/Update connectors to spark-3

ashwinb1998 opened a new pull request #69:
URL: https://github.com/apache/phoenix-connectors/pull/69


   With Spark 3, the DataSourceV2 API changed substantially and introduced a new TableProvider interface. These changes give the data source developer more control and integrate better with the Spark optimizer.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@phoenix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [phoenix-connectors] stoty commented on a change in pull request #69: PHOENIX-6632 Migrate/Update connectors to spark-3

Posted by GitBox <gi...@apache.org>.
stoty commented on a change in pull request #69:
URL: https://github.com/apache/phoenix-connectors/pull/69#discussion_r797649170



##########
File path: phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java
##########
@@ -17,42 +17,76 @@
  */
 package org.apache.phoenix.spark.datasource.v2;
 
-import java.util.Optional;
-import java.util.Properties;
-
-import org.apache.phoenix.spark.datasource.v2.reader.PhoenixDataSourceReader;
-import org.apache.phoenix.spark.datasource.v2.writer.PhoenixDataSourceWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.spark.sql.SaveMode;
+import org.apache.phoenix.spark.SparkSchemaUtil;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableProvider;
+import org.apache.spark.sql.connector.expressions.Transform;
 import org.apache.spark.sql.sources.DataSourceRegister;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.DataSourceV2;
-import org.apache.spark.sql.sources.v2.ReadSupport;
-import org.apache.spark.sql.sources.v2.WriteSupport;
-import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
-import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
 
 /**
  * Implements the DataSourceV2 api to read and write from Phoenix tables
  */
-public class PhoenixDataSource  implements DataSourceV2,  ReadSupport, WriteSupport, DataSourceRegister {
+public class PhoenixDataSource implements TableProvider, DataSourceRegister {
 
     private static final Logger logger = LoggerFactory.getLogger(PhoenixDataSource.class);
+    public static final String TABLE_KEY = "table";
     public static final String SKIP_NORMALIZING_IDENTIFIER = "skipNormalizingIdentifier";
     public static final String ZOOKEEPER_URL = "zkUrl";
     public static final String PHOENIX_CONFIGS = "phoenixconfigs";
+    protected StructType schema;
+    private CaseInsensitiveStringMap options;
 
     @Override
-    public DataSourceReader createReader(DataSourceOptions options) {
-        return new PhoenixDataSourceReader(options);
+    public StructType inferSchema(CaseInsensitiveStringMap options){
+        if (options.get("table") == null) {

Review comment:
       Use the new TABLE_KEY constant everywhere.

##########
File path: phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixScan.java
##########
@@ -29,134 +29,104 @@
 import org.apache.phoenix.mapreduce.PhoenixInputSplit;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.spark.FilterExpressionCompiler;
-import org.apache.phoenix.spark.SparkSchemaUtil;
 import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource;
 import org.apache.phoenix.util.ColumnInfo;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.Filter;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
-import org.apache.spark.sql.sources.v2.reader.InputPartition;
-import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;
-import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns;
+import org.apache.spark.sql.connector.read.Batch;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
 import org.apache.spark.sql.types.StructType;
-import scala.Tuple3;
-import scala.collection.JavaConverters;
-import scala.collection.Seq;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
-import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
 import java.util.Properties;
 
 import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.extractPhoenixHBaseConfFromOptions;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
 
-public class PhoenixDataSourceReader implements DataSourceReader, SupportsPushDownFilters,
-        SupportsPushDownRequiredColumns {
+public class PhoenixScan implements Scan, Batch {
 
-    private final DataSourceOptions options;
-    private final String tableName;
+    private final StructType schema;
+    private final CaseInsensitiveStringMap options;
     private final String zkUrl;
-    private final boolean dateAsTimestamp;
     private final Properties overriddenProps;
+    private PhoenixDataSourceReadOptions phoenixDataSourceOptions;
+    private final String tableName;
+    private String currentScnValue;
+    private String tenantId;
+    private boolean splitByStats;
+    private final String whereClause;
 
-    private StructType schema;
-    private Filter[] pushedFilters = new Filter[]{};
-    // derived from pushedFilters
-    private String whereClause;
-
-    public PhoenixDataSourceReader(DataSourceOptions options) {
-        if (!options.tableName().isPresent()) {
-            throw new RuntimeException("No Phoenix option " + DataSourceOptions.TABLE_KEY + " defined");
-        }
-        if (!options.get(PhoenixDataSource.ZOOKEEPER_URL).isPresent()) {
-            throw new RuntimeException("No Phoenix option " + PhoenixDataSource.ZOOKEEPER_URL + " defined");
-        }
+    PhoenixScan(StructType schema, CaseInsensitiveStringMap options, String whereClause) {
+        this.schema = schema;
         this.options = options;
-        this.tableName = options.tableName().get();
-        this.zkUrl = options.get(PhoenixDataSource.ZOOKEEPER_URL).get();
-        this.dateAsTimestamp = options.getBoolean("dateAsTimestamp", false);
+        this.whereClause = whereClause;
         this.overriddenProps = extractPhoenixHBaseConfFromOptions(options);
-        setSchema();
+        this.zkUrl = options.get(PhoenixDataSource.ZOOKEEPER_URL);
+        tableName = options.get("table");
     }
 
-    /**
-     * Sets the schema using all the table columns before any column pruning has been done
-     */
-    private void setSchema() {
-        try (Connection conn = DriverManager.getConnection(
-                JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, overriddenProps)) {
-            List<ColumnInfo> columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, null);
-            Seq<ColumnInfo> columnInfoSeq = JavaConverters.asScalaIteratorConverter(columnInfos.iterator()).asScala().toSeq();
-            schema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoSeq, dateAsTimestamp);
+    private void populateOverriddenProperties(){
+        currentScnValue = options.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
+        tenantId = options.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID);
+        // Generate splits based off statistics, or just region splits?
+        splitByStats = options.getBoolean(
+                PhoenixConfigurationUtil.MAPREDUCE_SPLIT_BY_STATS, PhoenixConfigurationUtil.DEFAULT_SPLIT_BY_STATS);
+        if(currentScnValue != null) {
+            overriddenProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue);
         }
-        catch (SQLException e) {
-            throw new RuntimeException(e);
+        if (tenantId != null){
+            overriddenProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
         }
     }
 
-    PhoenixInputPartition getInputPartition(PhoenixDataSourceReadOptions readOptions,
-            PhoenixInputSplit inputSplit) {
-        return new PhoenixInputPartition(readOptions, schema, inputSplit);
-    }
-
     @Override
     public StructType readSchema() {
         return schema;
     }
 
     @Override
-    public Filter[] pushFilters(Filter[] filters) {
-        Tuple3<String, Filter[], Filter[]> tuple3 = new FilterExpressionCompiler().pushFilters(filters);
-        whereClause = tuple3._1();
-        pushedFilters = tuple3._3();
-        return tuple3._2();
+    public String description() {
+        return this.getClass().toString();
     }
 
     @Override
-    public List<InputPartition<InternalRow>> planInputPartitions() {
-        Optional<String> currentScnValue = options.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
-        Optional<String> tenantId = options.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID);
-        // Generate splits based off statistics, or just region splits?
-        boolean splitByStats = options.getBoolean(
-                PhoenixConfigurationUtil.MAPREDUCE_SPLIT_BY_STATS, PhoenixConfigurationUtil.DEFAULT_SPLIT_BY_STATS);
-        if(currentScnValue.isPresent()) {
-            overriddenProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue.get());
-        }
-        if (tenantId.isPresent()){
-            overriddenProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId.get());
-        }
+    public Batch toBatch() {
+        return this;
+    }
+
+    @Override
+    public InputPartition[] planInputPartitions() {
+        populateOverriddenProperties();
         try (Connection conn = DriverManager.getConnection(
                 JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, overriddenProps)) {
             List<ColumnInfo> columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, new ArrayList<>(
-                Arrays.asList(schema.names())));
+                    Arrays.asList(schema.names())));
             final Statement statement = conn.createStatement();
             final String selectStatement = QueryUtil.constructSelectStatement(tableName, columnInfos, whereClause);
             if (selectStatement == null){
                 throw new NullPointerException();
             }
-
             final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
             // Optimize the query plan so that we potentially use secondary indexes
             final QueryPlan queryPlan = pstmt.optimizeQuery(selectStatement);
-            final Scan scan = queryPlan.getContext().getScan();
+            final org.apache.hadoop.hbase.client.Scan scan = queryPlan.getContext().getScan();
 
             // setting the snapshot configuration
-            Optional<String> snapshotName = options.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY);
-            if (snapshotName.isPresent())
-                PhoenixConfigurationUtil.setSnapshotNameKey(queryPlan.getContext().getConnection().
-                        getQueryServices().getConfiguration(), snapshotName.get());
+            String snapshotName = options.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY);
+
+            if(snapshotName != null){
+                PhoenixConfigurationUtil.setSnapshotNameKey(queryPlan.getContext().getConnection().getQueryServices().getConfiguration(), snapshotName);

Review comment:
       Indentation seems to be off.

##########
File path: phoenix-spark-base/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
##########
@@ -48,7 +48,7 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
       .format("phoenix")
       .options(Map("table" -> "TABLE3",
         PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress, PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER -> "true"))
-      .mode(SaveMode.Overwrite)
+      .mode(SaveMode.Append)

Review comment:
       You mentioned that SaveMode is no longer used internally.
   Does the SaveMode we set here simply get ignored by Spark?

##########
File path: phoenix-spark-base/pom.xml
##########
@@ -121,7 +121,7 @@
     <dependency>
       <groupId>org.scalatest</groupId>
       <artifactId>scalatest_${scala.binary.version}</artifactId>
-      <version>2.2.4</version>
+      <version>3.1.0</version>

Review comment:
       I'm not sure how stable this Spark API is.
   If it is indeed stable for all 3.x, then we'd better use the latest version, hoping that it's faster and more stable for our tests.

##########
File path: phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixBatchWrite.java
##########
@@ -17,45 +17,35 @@
  */
 package org.apache.phoenix.spark.datasource.v2.writer;
 
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
-import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
-import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.spark.sql.connector.write.BatchWrite;
+import org.apache.spark.sql.connector.write.DataWriterFactory;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
+import org.apache.spark.sql.connector.write.WriterCommitMessage;
 import org.apache.spark.sql.types.StructType;
 
+import java.util.Map;
+
 import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.CURRENT_SCN_VALUE;
-import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID;
 import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER;
 import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.extractPhoenixHBaseConfFromOptions;
 
-public class PhoenixDataSourceWriter implements DataSourceWriter {
+public class PhoenixBatchWrite implements BatchWrite {
 
     private final PhoenixDataSourceWriteOptions options;
+    private final LogicalWriteInfo writeInfo;
 
-    public PhoenixDataSourceWriter(SaveMode mode, StructType schema, DataSourceOptions options) {
-        if (!mode.equals(SaveMode.Overwrite)) {

Review comment:
       No SaveMode, no cry.
   







[GitHub] [phoenix-connectors] ashwinb1998 commented on pull request #69: PHOENIX-6632 Migrate/Update connectors to spark-3

Posted by GitBox <gi...@apache.org>.
ashwinb1998 commented on pull request #69:
URL: https://github.com/apache/phoenix-connectors/pull/69#issuecomment-1031147332


   > Changes need to be pushed by a committer. I'm going to push the change when it's done. I'm going to ask a review for it from you first.
   > 
   > I'll leave you as the author of the patch, and add myself as a co-author if that's all right with you.
   
   Oh okay, sounds great. It's all good with me 🙂





[GitHub] [phoenix-connectors] stoty commented on pull request #69: PHOENIX-6632 Migrate/Update connectors to spark-3

Posted by GitBox <gi...@apache.org>.
stoty commented on pull request #69:
URL: https://github.com/apache/phoenix-connectors/pull/69#issuecomment-1070522824


   Can you close this PR, @ashwinb1998, so that we don't have two open PRs for this?





[GitHub] [phoenix-connectors] stoty commented on a change in pull request #69: PHOENIX-6632 Migrate/Update connectors to spark-3

Posted by GitBox <gi...@apache.org>.
stoty commented on a change in pull request #69:
URL: https://github.com/apache/phoenix-connectors/pull/69#discussion_r799314827



##########
File path: phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java
##########
@@ -17,42 +17,76 @@
  */
 package org.apache.phoenix.spark.datasource.v2;
 
-import java.util.Optional;
-import java.util.Properties;
-
-import org.apache.phoenix.spark.datasource.v2.reader.PhoenixDataSourceReader;
-import org.apache.phoenix.spark.datasource.v2.writer.PhoenixDataSourceWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.spark.sql.SaveMode;
+import org.apache.phoenix.spark.SparkSchemaUtil;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableProvider;
+import org.apache.spark.sql.connector.expressions.Transform;
 import org.apache.spark.sql.sources.DataSourceRegister;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.DataSourceV2;
-import org.apache.spark.sql.sources.v2.ReadSupport;
-import org.apache.spark.sql.sources.v2.WriteSupport;
-import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
-import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
 
 /**
  * Implements the DataSourceV2 api to read and write from Phoenix tables
  */
-public class PhoenixDataSource  implements DataSourceV2,  ReadSupport, WriteSupport, DataSourceRegister {
+public class PhoenixDataSource implements TableProvider, DataSourceRegister {
 
     private static final Logger logger = LoggerFactory.getLogger(PhoenixDataSource.class);
+    public static final String TABLE_KEY = "table";
     public static final String SKIP_NORMALIZING_IDENTIFIER = "skipNormalizingIdentifier";
     public static final String ZOOKEEPER_URL = "zkUrl";
     public static final String PHOENIX_CONFIGS = "phoenixconfigs";
+    protected StructType schema;
+    private CaseInsensitiveStringMap options;
 
     @Override
-    public DataSourceReader createReader(DataSourceOptions options) {
-        return new PhoenixDataSourceReader(options);
+    public StructType inferSchema(CaseInsensitiveStringMap options){
+        if (options.get("table") == null) {

Review comment:
       I can still find a lot of "table" strings in the code.
   There should be only one instance, where the TABLE_KEY constant is defined.
   
   Please do a text search, and replace it for every option name reference.
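
   As a minimal sketch of the pattern being asked for (not the connector's actual code): every option lookup goes through a named constant, so the literal "table" appears exactly once. `OptionLookupSketch` and `requireOption` are hypothetical names, a plain `java.util.Map` stands in for Spark's `CaseInsensitiveStringMap`, and the ZooKeeper address is a placeholder.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch for illustration; not part of the PR.
class OptionLookupSketch {
    // The only places where the option names appear as string literals.
    static final String TABLE_KEY = "table";
    static final String ZOOKEEPER_URL = "zkUrl";

    // Returns the option value, or fails with a message built from the constant.
    static String requireOption(Map<String, String> options, String key) {
        String value = options.get(key);
        if (value == null) {
            throw new IllegalArgumentException("No Phoenix option " + key + " defined");
        }
        return value;
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put(TABLE_KEY, "TABLE3");
        options.put(ZOOKEEPER_URL, "localhost:2181"); // placeholder address
        System.out.println(requireOption(options, TABLE_KEY));
    }
}
```

   With lookups funneled through the constants, a later rename of an option only touches the constant definition.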







[GitHub] [phoenix-connectors] ashwinb1998 commented on a change in pull request #69: PHOENIX-6632 Migrate/Update connectors to spark-3

Posted by GitBox <gi...@apache.org>.
ashwinb1998 commented on a change in pull request #69:
URL: https://github.com/apache/phoenix-connectors/pull/69#discussion_r799089630



##########
File path: phoenix-spark-base/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
##########
@@ -48,7 +48,7 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
       .format("phoenix")
       .options(Map("table" -> "TABLE3",
         PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress, PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER -> "true"))
-      .mode(SaveMode.Overwrite)
+      .mode(SaveMode.Append)

Review comment:
       I believe there is an internal file source v2 API that uses SaveMode for writes, which is why we need to specify `mode(SaveMode.Append)`. SaveMode is removed only from the DataSourceV2 API, which means we can no longer check from a parameter whether the requested mode is Append or Overwrite.
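
       To illustrate, here is a rough sketch of how a connector can learn about an overwrite without ever receiving a SaveMode. It assumes Spark 3 treats a plain TableProvider roughly like this: Append becomes a plain append, Overwrite becomes a truncate-then-write request, and the remaining modes are rejected. The types below (`Mode`, `WriteBuilderSketch`, `plan`) are simplified, hypothetical stand-ins, not Spark classes.

```java
// Hypothetical stand-ins for illustration; none of these are Spark classes.
class SaveModeSketch {
    enum Mode { APPEND, OVERWRITE, ERROR_IF_EXISTS, IGNORE }

    // Stand-in for a write builder: it never sees the Mode,
    // only an optional truncate() call made by the engine.
    static class WriteBuilderSketch {
        private boolean truncate = false;

        WriteBuilderSketch truncate() {
            this.truncate = true;
            return this;
        }

        String build() {
            return truncate ? "truncate+write" : "append";
        }
    }

    // Stand-in for the engine side: translates the user's mode into builder calls.
    static String plan(Mode mode, boolean tableSupportsTruncate) {
        WriteBuilderSketch builder = new WriteBuilderSketch();
        switch (mode) {
            case APPEND:
                return builder.build();
            case OVERWRITE:
                if (!tableSupportsTruncate) {
                    throw new UnsupportedOperationException("table does not support truncate");
                }
                return builder.truncate().build();
            default:
                throw new UnsupportedOperationException(
                        mode + " is not supported; use APPEND or OVERWRITE");
        }
    }

    public static void main(String[] args) {
        System.out.println(plan(Mode.APPEND, false));
    }
}
```

       Under this reading, a write with Overwrite against a table that does not declare truncate support would fail, which would be consistent with the test switching to `SaveMode.Append`.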







[GitHub] [phoenix-connectors] ashwinb1998 commented on pull request #69: PHOENIX-6632 Migrate/Update connectors to spark-3

Posted by GitBox <gi...@apache.org>.
ashwinb1998 commented on pull request #69:
URL: https://github.com/apache/phoenix-connectors/pull/69#issuecomment-1030318579


   Please let me know if there are more changes; I will try to finish as quickly as I can.
   
   Regarding the packaging, I'm not very experienced with Maven, and this change would require a lot of modifications, so it would be best if you could take over the Maven part. But definitely let me know if I can help in any way.
   
   Thanks





[GitHub] [phoenix-connectors] stoty commented on pull request #69: PHOENIX-6632 Migrate/Update connectors to spark-3

Posted by GitBox <gi...@apache.org>.
stoty commented on pull request #69:
URL: https://github.com/apache/phoenix-connectors/pull/69#issuecomment-1030381676


   :broken_heart: **-1 overall**
   
   
   
   
   
   
   | Vote | Subsystem | Runtime | Comment |
   |:----:|----------:|--------:|:--------|
   | +0 :ok: |  reexec  |   0m 57s |  Docker mode activated.  |
   ||| _ Prechecks _ |
   | +1 :green_heart: |  dupname  |   0m  1s |  No case conflicting files found.  |
   | +1 :green_heart: |  hbaseanti  |   0m  0s |  Patch does not have any anti-patterns.  |
   | +1 :green_heart: |  @author  |   0m  0s |  The patch does not contain any @author tags.  |
   | +1 :green_heart: |  test4tests  |   0m  0s |  The patch appears to include 18 new or modified test files.  |
   ||| _ master Compile Tests _ |
   | +0 :ok: |  mvndep  |  11m  8s |  Maven dependency ordering for branch  |
   | +1 :green_heart: |  mvninstall  |  16m 52s |  master passed  |
   | +1 :green_heart: |  compile  |   2m 34s |  master passed  |
   | +1 :green_heart: |  checkstyle  |   0m 44s |  master passed  |
   | +1 :green_heart: |  javadoc  |   1m 15s |  master passed  |
   | -1 :x: |  scaladoc  |   0m 10s |  root in master failed.  |
   | +0 :ok: |  spotbugs  |   2m 10s |  phoenix-spark-base in master has 40 extant spotbugs warnings.  |
   | +0 :ok: |  spotbugs  |   3m 43s |  root in master has 116 extant spotbugs warnings.  |
   ||| _ Patch Compile Tests _ |
   | +0 :ok: |  mvndep  |   0m 37s |  Maven dependency ordering for patch  |
   | -1 :x: |  mvninstall  |  10m  5s |  root in the patch failed.  |
   | -1 :x: |  compile  |   1m 33s |  root in the patch failed.  |
   | -1 :x: |  javac  |   1m 33s |  root in the patch failed.  |
   | -1 :x: |  scalac  |   1m 33s |  root in the patch failed.  |
   | -1 :x: |  checkstyle  |   0m 49s |  root: The patch generated 263 new + 743 unchanged - 169 fixed = 1006 total (was 912)  |
   | +1 :green_heart: |  whitespace  |   0m  0s |  The patch has no whitespace issues.  |
   | +1 :green_heart: |  xml  |   0m  2s |  The patch has no ill-formed XML file.  |
   | -1 :x: |  javadoc  |   0m 21s |  phoenix-spark-base generated 180 new + 4 unchanged - 6 fixed = 184 total (was 10)  |
   | -1 :x: |  javadoc  |   1m  1s |  root generated 180 new + 14 unchanged - 6 fixed = 194 total (was 20)  |
   | -1 :x: |  scaladoc  |   0m 39s |  phoenix-spark-base in the patch failed.  |
   | -1 :x: |  scaladoc  |   0m  9s |  root in the patch failed.  |
   | -1 :x: |  spotbugs  |   0m 40s |  phoenix-spark-base in the patch failed.  |
   | -1 :x: |  spotbugs  |   2m 47s |  root in the patch failed.  |
   ||| _ Other Tests _ |
   | -1 :x: |  unit  |  45m 27s |  root in the patch failed.  |
   | -1 :x: |  asflicense  |   0m 18s |  The patch generated 5 ASF License warnings.  |
   |  |   | 106m  1s |   |
   
   
   | Subsystem | Report/Notes |
   |----------:|:-------------|
   | Docker | ClientAPI=1.41 ServerAPI=1.41 base: https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/4/artifact/yetus-general-check/output/Dockerfile |
   | GITHUB PR | https://github.com/apache/phoenix-connectors/pull/69 |
   | Optional Tests | dupname asflicense javac javadoc unit xml compile spotbugs hbaseanti checkstyle scalac scaladoc |
   | uname | Linux ac994ead4a13 4.15.0-112-generic #113-Ubuntu SMP Thu Jul 9 23:41:39 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux |
   | Build tool | maven |
   | Personality | dev/phoenix-connectors-personality.sh |
   | git revision | master / 72496e5 |
   | Default Java | Private Build-1.8.0_242-8u242-b08-0ubuntu3~16.04-b08 |
   | scaladoc | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/4/artifact/yetus-general-check/output/branch-scaladoc-root.txt |
   | mvninstall | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/4/artifact/yetus-general-check/output/patch-mvninstall-root.txt |
   | compile | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/4/artifact/yetus-general-check/output/patch-compile-root.txt |
   | javac | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/4/artifact/yetus-general-check/output/patch-compile-root.txt |
   | scalac | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/4/artifact/yetus-general-check/output/patch-compile-root.txt |
   | checkstyle | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/4/artifact/yetus-general-check/output/diff-checkstyle-root.txt |
   | javadoc | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/4/artifact/yetus-general-check/output/diff-javadoc-javadoc-phoenix-spark-base.txt |
   | javadoc | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/4/artifact/yetus-general-check/output/diff-javadoc-javadoc-root.txt |
   | scaladoc | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/4/artifact/yetus-general-check/output/patch-scaladoc-phoenix-spark-base.txt |
   | scaladoc | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/4/artifact/yetus-general-check/output/patch-scaladoc-root.txt |
   | spotbugs | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/4/artifact/yetus-general-check/output/patch-spotbugs-phoenix-spark-base.txt |
   | spotbugs | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/4/artifact/yetus-general-check/output/patch-spotbugs-root.txt |
   | unit | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/4/artifact/yetus-general-check/output/patch-unit-root.txt |
   |  Test Results | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/4/testReport/ |
   | asflicense | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/4/artifact/yetus-general-check/output/patch-asflicense-problems.txt |
   | Max. process+thread count | 1904 (vs. ulimit of 30000) |
   | modules | C: phoenix-spark-base . U: . |
   | Console output | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/4/console |
   | versions | git=2.7.4 maven=3.3.9 spotbugs=4.1.3 |
   | Powered by | Apache Yetus 0.12.0 https://yetus.apache.org |
   
   
   This message was automatically generated.
   
   





[GitHub] [phoenix-connectors] ashwinb1998 commented on pull request #69: PHOENIX-6632 Migrate/Update connectors to spark-3

Posted by GitBox <gi...@apache.org>.
ashwinb1998 commented on pull request #69:
URL: https://github.com/apache/phoenix-connectors/pull/69#issuecomment-1031136422


   Awesome, sounds good.
   Just a small question: how would I push my changes once you finish the packaging part,
   since you were going to add it to a different package like org.apache.phoenix.spark.sql.connector?





[GitHub] [phoenix-connectors] ashwinb1998 commented on a change in pull request #69: PHOENIX-6632 Migrate/Update connectors to spark-3

Posted by GitBox <gi...@apache.org>.
ashwinb1998 commented on a change in pull request #69:
URL: https://github.com/apache/phoenix-connectors/pull/69#discussion_r797456009



##########
File path: phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java
##########
@@ -57,6 +59,14 @@ String getTenantId() {
     }
 
     Properties getOverriddenProps() {

Review comment:
       I have modified the way this was implemented; I went back to how it is currently.







[GitHub] [phoenix-connectors] ashwinb1998 commented on a change in pull request #69: PHOENIX-6632 Migrate/Update connectors to spark-3

Posted by GitBox <gi...@apache.org>.
ashwinb1998 commented on a change in pull request #69:
URL: https://github.com/apache/phoenix-connectors/pull/69#discussion_r797456215



##########
File path: phoenix-spark-base/pom.xml
##########
@@ -121,7 +121,7 @@
     <dependency>
       <groupId>org.scalatest</groupId>
       <artifactId>scalatest_${scala.binary.version}</artifactId>
-      <version>2.2.4</version>
+      <version>3.1.0</version>

Review comment:
       No specific reason. I can change this to the latest version if you think that would be better.







[GitHub] [phoenix-connectors] stoty commented on pull request #69: PHOENIX-6632 Migrate/Update connectors to spark-3

Posted by GitBox <gi...@apache.org>.
stoty commented on pull request #69:
URL: https://github.com/apache/phoenix-connectors/pull/69#issuecomment-1027933565


   :broken_heart: **-1 overall**
   
   
   
   
   
   
   | Vote | Subsystem | Runtime | Comment |
   |:----:|----------:|--------:|:--------|
   | +0 :ok: |  reexec  |   6m 25s |  Docker mode activated.  |
   ||| _ Prechecks _ |
   | +1 :green_heart: |  dupname  |   0m  1s |  No case conflicting files found.  |
   | +1 :green_heart: |  hbaseanti  |   0m  0s |  Patch does not have any anti-patterns.  |
   | +1 :green_heart: |  @author  |   0m  0s |  The patch does not contain any @author tags.  |
   | +1 :green_heart: |  test4tests  |   0m  0s |  The patch appears to include 18 new or modified test files.  |
   ||| _ master Compile Tests _ |
   | +0 :ok: |  mvndep  |  11m 28s |  Maven dependency ordering for branch  |
   | +1 :green_heart: |  mvninstall  |  16m 55s |  master passed  |
   | +1 :green_heart: |  compile  |   2m 38s |  master passed  |
   | +1 :green_heart: |  checkstyle  |   0m 44s |  master passed  |
   | +1 :green_heart: |  javadoc  |   1m 13s |  master passed  |
   | -1 :x: |  scaladoc  |   0m  9s |  root in master failed.  |
   | +0 :ok: |  spotbugs  |   2m 12s |  phoenix-spark-base in master has 40 extant spotbugs warnings.  |
   | +0 :ok: |  spotbugs  |   3m 43s |  root in master has 116 extant spotbugs warnings.  |
   ||| _ Patch Compile Tests _ |
   | +0 :ok: |  mvndep  |   0m 28s |  Maven dependency ordering for patch  |
   | -1 :x: |  mvninstall  |   9m 51s |  root in the patch failed.  |
   | -1 :x: |  compile  |   1m 37s |  root in the patch failed.  |
   | -1 :x: |  javac  |   1m 37s |  root in the patch failed.  |
   | -1 :x: |  scalac  |   1m 37s |  root in the patch failed.  |
   | -1 :x: |  checkstyle  |   0m 50s |  root: The patch generated 263 new + 743 unchanged - 169 fixed = 1006 total (was 912)  |
   | +1 :green_heart: |  whitespace  |   0m  1s |  The patch has no whitespace issues.  |
   | +1 :green_heart: |  xml  |   0m  2s |  The patch has no ill-formed XML file.  |
   | -1 :x: |  javadoc  |   0m 21s |  phoenix-spark-base generated 180 new + 4 unchanged - 6 fixed = 184 total (was 10)  |
   | -1 :x: |  javadoc  |   1m  1s |  root generated 180 new + 14 unchanged - 6 fixed = 194 total (was 20)  |
   | -1 :x: |  scaladoc  |   0m 38s |  phoenix-spark-base in the patch failed.  |
   | -1 :x: |  scaladoc  |   0m  8s |  root in the patch failed.  |
   | -1 :x: |  spotbugs  |   0m 40s |  phoenix-spark-base in the patch failed.  |
   | -1 :x: |  spotbugs  |   2m 48s |  root in the patch failed.  |
   ||| _ Other Tests _ |
   | -1 :x: |  unit  |  69m 55s |  root in the patch failed.  |
   | -1 :x: |  asflicense  |   0m 19s |  The patch generated 5 ASF License warnings.  |
   |  |   | 136m  6s |   |
   
   
   | Subsystem | Report/Notes |
   |----------:|:-------------|
   | Docker | ClientAPI=1.41 ServerAPI=1.41 base: https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/2/artifact/yetus-general-check/output/Dockerfile |
   | GITHUB PR | https://github.com/apache/phoenix-connectors/pull/69 |
   | Optional Tests | dupname asflicense javac javadoc unit xml compile spotbugs hbaseanti checkstyle scalac scaladoc |
   | uname | Linux 339f7c164444 4.15.0-112-generic #113-Ubuntu SMP Thu Jul 9 23:41:39 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux |
   | Build tool | maven |
   | Personality | dev/phoenix-connectors-personality.sh |
   | git revision | master / 72496e5 |
   | Default Java | Private Build-1.8.0_242-8u242-b08-0ubuntu3~16.04-b08 |
   | scaladoc | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/2/artifact/yetus-general-check/output/branch-scaladoc-root.txt |
   | mvninstall | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/2/artifact/yetus-general-check/output/patch-mvninstall-root.txt |
   | compile | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/2/artifact/yetus-general-check/output/patch-compile-root.txt |
   | javac | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/2/artifact/yetus-general-check/output/patch-compile-root.txt |
   | scalac | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/2/artifact/yetus-general-check/output/patch-compile-root.txt |
   | checkstyle | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/2/artifact/yetus-general-check/output/diff-checkstyle-root.txt |
   | javadoc | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/2/artifact/yetus-general-check/output/diff-javadoc-javadoc-phoenix-spark-base.txt |
   | javadoc | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/2/artifact/yetus-general-check/output/diff-javadoc-javadoc-root.txt |
   | scaladoc | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/2/artifact/yetus-general-check/output/patch-scaladoc-phoenix-spark-base.txt |
   | scaladoc | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/2/artifact/yetus-general-check/output/patch-scaladoc-root.txt |
   | spotbugs | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/2/artifact/yetus-general-check/output/patch-spotbugs-phoenix-spark-base.txt |
   | spotbugs | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/2/artifact/yetus-general-check/output/patch-spotbugs-root.txt |
   | unit | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/2/artifact/yetus-general-check/output/patch-unit-root.txt |
   |  Test Results | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/2/testReport/ |
   | asflicense | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/2/artifact/yetus-general-check/output/patch-asflicense-problems.txt |
   | Max. process+thread count | 1916 (vs. ulimit of 30000) |
   | modules | C: phoenix-spark-base . U: . |
   | Console output | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/2/console |
   | versions | git=2.7.4 maven=3.3.9 spotbugs=4.1.3 |
   | Powered by | Apache Yetus 0.12.0 https://yetus.apache.org |
   
   
   This message was automatically generated.
   
   





[GitHub] [phoenix-connectors] ashwinb1998 commented on pull request #69: PHOENIX-6632 Migrate/Update connectors to spark-3

Posted by GitBox <gi...@apache.org>.
ashwinb1998 commented on pull request #69:
URL: https://github.com/apache/phoenix-connectors/pull/69#issuecomment-1072085140


   Sure @stoty, will do





[GitHub] [phoenix-connectors] stoty commented on pull request #69: PHOENIX-6632 Migrate/Update connectors to spark-3

Posted by GitBox <gi...@apache.org>.
stoty commented on pull request #69:
URL: https://github.com/apache/phoenix-connectors/pull/69#issuecomment-1031131426


   Thank you @ashwinb1998, I'll finish the build system integration (and I'm probably gonna change the package as well).
   





[GitHub] [phoenix-connectors] ashwinb1998 commented on a change in pull request #69: PHOENIX-6632 Migrate/Update connectors to spark-3

Posted by GitBox <gi...@apache.org>.
ashwinb1998 commented on a change in pull request #69:
URL: https://github.com/apache/phoenix-connectors/pull/69#discussion_r797455273



##########
File path: phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixBatchWrite.java
##########
@@ -17,45 +17,35 @@
  */
 package org.apache.phoenix.spark.datasource.v2.writer;
 
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
-import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
-import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.spark.sql.connector.write.BatchWrite;
+import org.apache.spark.sql.connector.write.DataWriterFactory;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
+import org.apache.spark.sql.connector.write.WriterCommitMessage;
 import org.apache.spark.sql.types.StructType;
 
+import java.util.Map;
+
 import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.CURRENT_SCN_VALUE;
-import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID;
 import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER;
 import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.extractPhoenixHBaseConfFromOptions;
 
-public class PhoenixDataSourceWriter implements DataSourceWriter {
+public class PhoenixBatchWrite implements BatchWrite {
 
     private final PhoenixDataSourceWriteOptions options;
+    private final LogicalWriteInfo writeInfo;
 
-    public PhoenixDataSourceWriter(SaveMode mode, StructType schema, DataSourceOptions options) {
-        if (!mode.equals(SaveMode.Overwrite)) {
-            throw new RuntimeException("SaveMode other than SaveMode.OverWrite is not supported");
-        }
-        if (!options.tableName().isPresent()) {
-            throw new RuntimeException("No Phoenix option " + DataSourceOptions.TABLE_KEY + " defined");
-        }
-        if (!options.get(ZOOKEEPER_URL).isPresent()) {
-            throw new RuntimeException("No Phoenix option " + ZOOKEEPER_URL + " defined");
-        }
-        this.options = createPhoenixDataSourceWriteOptions(options, schema);
-    }
-
-    @Override
-    public DataWriterFactory<InternalRow> createWriterFactory() {
-        return new PhoenixDataWriterFactory(options);
+    PhoenixBatchWrite(LogicalWriteInfo writeInfo, Map<String,String> options) {
+        this.writeInfo = writeInfo;
+        this.options = createPhoenixDataSourceWriteOptions(options, writeInfo.schema());
     }
 
     @Override
-    public boolean useCommitCoordinator() {

Review comment:
       It returns whether Spark should use the commit coordinator to ensure that at most one task for each partition commits. It has now been removed with DSV2 in Spark 3.
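
As a rough illustration of the "at most one task per partition commits" guarantee mentioned above, here is a minimal sketch. It is a simplified, hypothetical model (the class `SketchCommitCoordinator` and its method are invented for this example) and not Spark's actual coordinator.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical, simplified model of a commit coordinator; not Spark's implementation.
final class SketchCommitCoordinator {
    // Maps a partition id to the attempt id that was granted the right to commit.
    private final ConcurrentMap<Integer, Integer> winners = new ConcurrentHashMap<>();

    /**
     * Returns true for the first attempt that asks to commit a partition,
     * and for repeated requests from that same attempt; false for any other attempt.
     */
    boolean canCommit(int partitionId, int attemptId) {
        Integer previous = winners.putIfAbsent(partitionId, attemptId);
        return previous == null || previous == attemptId;
    }
}
```

With this model, a speculative or retried attempt for a partition that already has a winner is refused, so its output is never committed twice.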







[GitHub] [phoenix-connectors] ashwinb1998 closed pull request #69: PHOENIX-6632 Migrate/Update connectors to spark-3

Posted by GitBox <gi...@apache.org>.
ashwinb1998 closed pull request #69:
URL: https://github.com/apache/phoenix-connectors/pull/69


   





[GitHub] [phoenix-connectors] stoty commented on pull request #69: PHOENIX-6632 Migrate/Update connectors to spark-3

Posted by GitBox <gi...@apache.org>.
stoty commented on pull request #69:
URL: https://github.com/apache/phoenix-connectors/pull/69#issuecomment-1031145879


   Changes need to be pushed by a committer.
   I'm going to push the change when it's done.
   I'm going to ask you to review it first.
   
   I'll leave you as the author of the patch, and add myself as a co-author if that's all right with you.





[GitHub] [phoenix-connectors] ashwinb1998 edited a comment on pull request #69: PHOENIX-6632 Migrate/Update connectors to spark-3

Posted by GitBox <gi...@apache.org>.
ashwinb1998 edited a comment on pull request #69:
URL: https://github.com/apache/phoenix-connectors/pull/69#issuecomment-1031136422


   Awesome, sounds good.
   Just a small question: how would I push my changes once you finish the packaging/build system integration part,
   since you were going to add it to a different package like org.apache.phoenix.spark.sql.connector?





[GitHub] [phoenix-connectors] ashwinb1998 commented on pull request #69: PHOENIX-6632 Migrate/Update connectors to spark-3

Posted by GitBox <gi...@apache.org>.
ashwinb1998 commented on pull request #69:
URL: https://github.com/apache/phoenix-connectors/pull/69#issuecomment-1024761724


   Thank you so much for your comments. I will work on the changes that you have mentioned.
   
   Regarding the packaging, I was taking a look into the Apache Hudi package and they have separate [modules](https://github.com/apache/hudi/tree/master/hudi-spark-datasource/) for spark-2 and spark-3. The default spark-version is set to spark-2, but if someone wants to build with spark-3 they can do that as well. 
   
   Is that something which can be done here as well? 





[GitHub] [phoenix-connectors] stoty commented on pull request #69: PHOENIX-6632 Migrate/Update connectors to spark-3

Posted by GitBox <gi...@apache.org>.
stoty commented on pull request #69:
URL: https://github.com/apache/phoenix-connectors/pull/69#issuecomment-1023831332


   :broken_heart: **-1 overall**
   
   
   
   
   
   
   | Vote | Subsystem | Runtime | Comment |
   |:----:|----------:|--------:|:--------|
   | +0 :ok: |  reexec  |   5m 30s |  Docker mode activated.  |
   ||| _ Prechecks _ |
   | +1 :green_heart: |  dupname  |   0m  0s |  No case conflicting files found.  |
   | +1 :green_heart: |  hbaseanti  |   0m  0s |  Patch does not have any anti-patterns.  |
   | +1 :green_heart: |  @author  |   0m  0s |  The patch does not contain any @author tags.  |
   | +1 :green_heart: |  test4tests  |   0m  0s |  The patch appears to include 18 new or modified test files.  |
   ||| _ master Compile Tests _ |
   | +0 :ok: |  mvndep  |  11m 50s |  Maven dependency ordering for branch  |
   | +1 :green_heart: |  mvninstall  |  16m 37s |  master passed  |
   | +1 :green_heart: |  compile  |   2m 35s |  master passed  |
   | +1 :green_heart: |  checkstyle  |   0m 43s |  master passed  |
   | +1 :green_heart: |  javadoc  |   1m 13s |  master passed  |
   | -1 :x: |  scaladoc  |   0m 10s |  root in master failed.  |
   | +0 :ok: |  spotbugs  |   2m 12s |  phoenix-spark-base in master has 40 extant spotbugs warnings.  |
   | +0 :ok: |  spotbugs  |   3m 44s |  root in master has 116 extant spotbugs warnings.  |
   ||| _ Patch Compile Tests _ |
   | +0 :ok: |  mvndep  |   0m 28s |  Maven dependency ordering for patch  |
   | -1 :x: |  mvninstall  |   9m 55s |  root in the patch failed.  |
   | -1 :x: |  compile  |   1m 37s |  root in the patch failed.  |
   | -1 :x: |  javac  |   1m 37s |  root in the patch failed.  |
   | -1 :x: |  scalac  |   1m 37s |  root in the patch failed.  |
   | -1 :x: |  checkstyle  |   0m 50s |  root: The patch generated 262 new + 743 unchanged - 169 fixed = 1005 total (was 912)  |
   | +1 :green_heart: |  whitespace  |   0m  0s |  The patch has no whitespace issues.  |
   | +1 :green_heart: |  xml  |   0m  3s |  The patch has no ill-formed XML file.  |
   | -1 :x: |  javadoc  |   0m 20s |  phoenix-spark-base generated 180 new + 4 unchanged - 6 fixed = 184 total (was 10)  |
   | -1 :x: |  javadoc  |   1m  1s |  root generated 180 new + 14 unchanged - 6 fixed = 194 total (was 20)  |
   | -1 :x: |  scaladoc  |   0m 39s |  phoenix-spark-base in the patch failed.  |
   | -1 :x: |  scaladoc  |   0m  9s |  root in the patch failed.  |
   | -1 :x: |  spotbugs  |   0m 40s |  phoenix-spark-base in the patch failed.  |
   | -1 :x: |  spotbugs  |   2m 49s |  root in the patch failed.  |
   ||| _ Other Tests _ |
   | -1 :x: |  unit  |  50m 44s |  root in the patch failed.  |
   | -1 :x: |  asflicense  |   0m 18s |  The patch generated 5 ASF License warnings.  |
   |  |   | 116m  7s |   |
   
   
   | Subsystem | Report/Notes |
   |----------:|:-------------|
   | Docker | ClientAPI=1.41 ServerAPI=1.41 base: https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/1/artifact/yetus-general-check/output/Dockerfile |
   | GITHUB PR | https://github.com/apache/phoenix-connectors/pull/69 |
   | Optional Tests | dupname asflicense javac javadoc unit xml compile spotbugs hbaseanti checkstyle scalac scaladoc |
   | uname | Linux b0afb2124731 4.15.0-156-generic #163-Ubuntu SMP Thu Aug 19 23:31:58 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux |
   | Build tool | maven |
   | Personality | dev/phoenix-connectors-personality.sh |
   | git revision | master / 3bf822f |
   | Default Java | Private Build-1.8.0_242-8u242-b08-0ubuntu3~16.04-b08 |
   | scaladoc | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/1/artifact/yetus-general-check/output/branch-scaladoc-root.txt |
   | mvninstall | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/1/artifact/yetus-general-check/output/patch-mvninstall-root.txt |
   | compile | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/1/artifact/yetus-general-check/output/patch-compile-root.txt |
   | javac | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/1/artifact/yetus-general-check/output/patch-compile-root.txt |
   | scalac | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/1/artifact/yetus-general-check/output/patch-compile-root.txt |
   | checkstyle | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/1/artifact/yetus-general-check/output/diff-checkstyle-root.txt |
   | javadoc | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/1/artifact/yetus-general-check/output/diff-javadoc-javadoc-phoenix-spark-base.txt |
   | javadoc | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/1/artifact/yetus-general-check/output/diff-javadoc-javadoc-root.txt |
   | scaladoc | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/1/artifact/yetus-general-check/output/patch-scaladoc-phoenix-spark-base.txt |
   | scaladoc | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/1/artifact/yetus-general-check/output/patch-scaladoc-root.txt |
   | spotbugs | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/1/artifact/yetus-general-check/output/patch-spotbugs-phoenix-spark-base.txt |
   | spotbugs | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/1/artifact/yetus-general-check/output/patch-spotbugs-root.txt |
   | unit | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/1/artifact/yetus-general-check/output/patch-unit-root.txt |
   |  Test Results | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/1/testReport/ |
   | asflicense | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/1/artifact/yetus-general-check/output/patch-asflicense-problems.txt |
   | Max. process+thread count | 1909 (vs. ulimit of 30000) |
   | modules | C: phoenix-spark-base . U: . |
   | Console output | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/1/console |
   | versions | git=2.7.4 maven=3.3.9 spotbugs=4.1.3 |
   | Powered by | Apache Yetus 0.12.0 https://yetus.apache.org |
   
   
   This message was automatically generated.
   
   





[GitHub] [phoenix-connectors] stoty commented on a change in pull request #69: PHOENIX-6632 Migrate/Update connectors to spark-3

Posted by GitBox <gi...@apache.org>.
stoty commented on a change in pull request #69:
URL: https://github.com/apache/phoenix-connectors/pull/69#discussion_r794267095



##########
File path: phoenix-spark-base/src/it/java/org/apache/phoenix/spark/OrderByIT.java
##########
@@ -130,11 +130,11 @@ public void testOrderByWithJoin() throws Exception {
                 Arrays.asList("A_STRING", "CF1.A", "CF1.B", "COL1", "CF2.C", "CF2.D"));
             SQLContext sqlContext = SparkUtil.getSparkSession().sqlContext();
             Dataset phoenixDataSet = SparkUtil.getSparkSession().read().format("phoenix")
-                    .option(DataSourceOptions.TABLE_KEY, tableName1)
+                    .option("table", tableName1)

Review comment:
       Make "table" a constant in PhoenixDataSource (e.g. PhoenixDataSource.TABLE_KEY), and use that everywhere instead of repeating the string.
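
A minimal sketch of that suggestion, assuming a hypothetical holder class named `PhoenixOptionKeysSketch` (in the patch the constant would live in `PhoenixDataSource` itself). The `ZOOKEEPER_URL` value shown is a placeholder, since its real value does not appear in this thread.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the constants that would be defined in PhoenixDataSource.
final class PhoenixOptionKeysSketch {
    // The option key that the tests currently repeat as the literal "table".
    static final String TABLE_KEY = "table";
    // Placeholder value for illustration only.
    static final String ZOOKEEPER_URL = "zkUrl";

    private PhoenixOptionKeysSketch() { }

    // Builds the option map from the constants instead of repeating string literals.
    static Map<String, String> readOptions(String tableName, String zkUrl) {
        Map<String, String> options = new HashMap<>();
        options.put(TABLE_KEY, tableName);
        options.put(ZOOKEEPER_URL, zkUrl);
        return options;
    }
}
```

The test line in the diff above would then read `.option(PhoenixDataSource.TABLE_KEY, tableName1)`.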

##########
File path: phoenix-spark-base/src/it/scala/org/apache/phoenix/spark/PhoenixSparkIT.scala
##########
@@ -48,7 +48,7 @@ class PhoenixSparkIT extends AbstractPhoenixSparkIT {
       .format("phoenix")
       .options(Map("table" -> "TABLE3",
         PhoenixDataSource.ZOOKEEPER_URL -> quorumAddress, PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER -> "true"))
-      .mode(SaveMode.Overwrite)
+      .mode(SaveMode.Append)

Review comment:
       Reading https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
   it seems that SaveMode indeed applies at the table level, and not at the record level, so the previous behaviour was wrong.
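
To make the table-level reading concrete, here is a small in-memory sketch. Everything in it is hypothetical: the `Mode` enum only mirrors the names of Spark's `SaveMode` values, and a "table" is modelled as a list of rows (`null` meaning the table does not exist yet).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of table-level save modes; not Spark code.
final class SaveModeSketch {
    enum Mode { APPEND, OVERWRITE, ERROR_IF_EXISTS, IGNORE }

    // Applies the mode to the table as a whole and returns the resulting contents.
    static List<String> save(List<String> existing, List<String> incoming, Mode mode) {
        if (existing == null) {
            // No table yet: every mode simply writes the incoming rows.
            return new ArrayList<>(incoming);
        }
        switch (mode) {
            case APPEND: {
                List<String> merged = new ArrayList<>(existing);
                merged.addAll(incoming);
                return merged;
            }
            case OVERWRITE:
                // Existing contents are replaced, not merged row by row.
                return new ArrayList<>(incoming);
            case IGNORE:
                return existing;
            default:
                throw new IllegalStateException("table already exists");
        }
    }
}
```

Under this reading, a write that leaves the existing rows in place corresponds to `APPEND`, while `OVERWRITE` would imply replacing the table's contents.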

##########
File path: phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java
##########
@@ -57,6 +59,14 @@ String getTenantId() {
     }
 
     Properties getOverriddenProps() {

Review comment:
       This is confusing. We set tenant, zkUrl, and scn explicitly, and set the misc overridden props in overriddenProps.
   However, this method merges all of the above, but we still call it **getOverriddenProps**.
   
   Calling this **getEffectiveProps** or similar would be better.
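
One way the naming split could look, as a sketch only. The class `ReadOptionsSketch` and the `sketch.*` property keys are invented placeholders, not the keys the connector actually uses.

```java
import java.util.Properties;

// Hypothetical sketch of the options holder; property keys are placeholders.
final class ReadOptionsSketch {
    private final String zkUrl;
    private final String tenantId;
    private final String scn;
    private final Properties overriddenProps;

    ReadOptionsSketch(String zkUrl, String tenantId, String scn, Properties overriddenProps) {
        this.zkUrl = zkUrl;
        this.tenantId = tenantId;
        this.scn = scn;
        this.overriddenProps = overriddenProps;
    }

    // Returns only the miscellaneous overrides supplied by the caller.
    Properties getOverriddenProps() {
        Properties copy = new Properties();
        copy.putAll(overriddenProps);
        return copy;
    }

    // Returns the overrides merged with the explicitly set values.
    Properties getEffectiveProps() {
        Properties effective = getOverriddenProps();
        if (zkUrl != null) {
            effective.setProperty("sketch.zkUrl", zkUrl);
        }
        if (tenantId != null) {
            effective.setProperty("sketch.tenantId", tenantId);
        }
        if (scn != null) {
            effective.setProperty("sketch.scn", scn);
        }
        return effective;
    }
}
```

Keeping the two accessors separate means callers can tell user-supplied overrides apart from the merged set that is actually passed to the connection.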

##########
File path: phoenix-spark-base/pom.xml
##########
@@ -121,7 +121,7 @@
     <dependency>
       <groupId>org.scalatest</groupId>
       <artifactId>scalatest_${scala.binary.version}</artifactId>
-      <version>2.2.4</version>
+      <version>3.1.0</version>

Review comment:
       Any particular reason for building with 3.1.0 as opposed to 3.0 or 3.2?
   

##########
File path: phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixScan.java
##########
@@ -29,134 +29,97 @@
 import org.apache.phoenix.mapreduce.PhoenixInputSplit;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.spark.FilterExpressionCompiler;
-import org.apache.phoenix.spark.SparkSchemaUtil;
 import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource;
 import org.apache.phoenix.util.ColumnInfo;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.Filter;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
-import org.apache.spark.sql.sources.v2.reader.InputPartition;
-import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;
-import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns;
+import org.apache.spark.sql.connector.read.Batch;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
 import org.apache.spark.sql.types.StructType;
-import scala.Tuple3;
-import scala.collection.JavaConverters;
-import scala.collection.Seq;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
-import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
 import java.util.Properties;
 
 import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.extractPhoenixHBaseConfFromOptions;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
 
-public class PhoenixDataSourceReader implements DataSourceReader, SupportsPushDownFilters,
-        SupportsPushDownRequiredColumns {
+public class PhoenixScan implements Scan, Batch {
 
-    private final DataSourceOptions options;
-    private final String tableName;
+    private final StructType schema;
+    private final CaseInsensitiveStringMap options;
     private final String zkUrl;
-    private final boolean dateAsTimestamp;
     private final Properties overriddenProps;
+    private PhoenixDataSourceReadOptions phoenixDataSourceOptions;
+    private final String tableName;
+    private String currentScnValue;
+    private String tenantId;
+    private boolean splitByStats;
+    private final String whereClause;
 
-    private StructType schema;
-    private Filter[] pushedFilters = new Filter[]{};
-    // derived from pushedFilters
-    private String whereClause;
-
-    public PhoenixDataSourceReader(DataSourceOptions options) {
-        if (!options.tableName().isPresent()) {
-            throw new RuntimeException("No Phoenix option " + DataSourceOptions.TABLE_KEY + " defined");
-        }
-        if (!options.get(PhoenixDataSource.ZOOKEEPER_URL).isPresent()) {
-            throw new RuntimeException("No Phoenix option " + PhoenixDataSource.ZOOKEEPER_URL + " defined");
-        }
+    PhoenixScan(StructType schema, CaseInsensitiveStringMap options, String whereClause) {
+        this.schema = schema;
         this.options = options;
-        this.tableName = options.tableName().get();
-        this.zkUrl = options.get(PhoenixDataSource.ZOOKEEPER_URL).get();
-        this.dateAsTimestamp = options.getBoolean("dateAsTimestamp", false);
+        this.whereClause = whereClause;
         this.overriddenProps = extractPhoenixHBaseConfFromOptions(options);
-        setSchema();
+        this.zkUrl = options.get(PhoenixDataSource.ZOOKEEPER_URL);
+        tableName = options.get("table");
     }
 
-    /**
-     * Sets the schema using all the table columns before any column pruning has been done
-     */
-    private void setSchema() {
-        try (Connection conn = DriverManager.getConnection(
-                JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, overriddenProps)) {
-            List<ColumnInfo> columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, null);
-            Seq<ColumnInfo> columnInfoSeq = JavaConverters.asScalaIteratorConverter(columnInfos.iterator()).asScala().toSeq();
-            schema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoSeq, dateAsTimestamp);
+    private void populateOverriddenProperties(){

Review comment:
       This again conflates the overridden properties with the rest.
   
   Please try to clean up the option handling/merging a bit.

##########
File path: phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixBatchWrite.java
##########
@@ -17,45 +17,35 @@
  */
 package org.apache.phoenix.spark.datasource.v2.writer;
 
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
-import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
-import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.spark.sql.connector.write.BatchWrite;
+import org.apache.spark.sql.connector.write.DataWriterFactory;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
+import org.apache.spark.sql.connector.write.WriterCommitMessage;
 import org.apache.spark.sql.types.StructType;
 
+import java.util.Map;
+
 import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.CURRENT_SCN_VALUE;
-import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID;
 import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER;
 import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.extractPhoenixHBaseConfFromOptions;
 
-public class PhoenixDataSourceWriter implements DataSourceWriter {
+public class PhoenixBatchWrite implements BatchWrite {
 
     private final PhoenixDataSourceWriteOptions options;
+    private final LogicalWriteInfo writeInfo;
 
-    public PhoenixDataSourceWriter(SaveMode mode, StructType schema, DataSourceOptions options) {
-        if (!mode.equals(SaveMode.Overwrite)) {

Review comment:
       Given the previous buggy operation where we demanded SaveMode.Overwrite, I think that we should
   check the SaveMode here, and log a warning if it is not **Append**.
   We should probably throw an exception for any mode other than Append and Overwrite.
   
   For the warning, something like:
   "Only SaveMode.Append is supported. SaveMode.Overwrite is accepted for backwards compatibility, but
   it will behave as if SaveMode.Append was specified."
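
The check described above could look roughly like the following sketch. To stay self-contained it uses a local `Mode` enum as a stand-in for Spark's `SaveMode`, and the helper name `SaveModeCheck` is hypothetical. Where the mode would actually be obtained in the Spark 3 write path is not shown here and is left as an open question for the patch.

```java
/** Hypothetical helper; Mode is a local stand-in for org.apache.spark.sql.SaveMode. */
final class SaveModeCheck {

    enum Mode { APPEND, OVERWRITE, ERROR_IF_EXISTS, IGNORE }

    static final String OVERWRITE_WARNING =
            "Only SaveMode.Append is supported. SaveMode.Overwrite is accepted for backwards "
            + "compatibility, but it will behave as if SaveMode.Append was specified.";

    /**
     * Returns null for Append, the warning text for Overwrite,
     * and throws for every other mode.
     */
    static String validate(Mode mode) {
        switch (mode) {
            case APPEND:
                return null;
            case OVERWRITE:
                // The caller would pass this message to logger.warn(...)
                return OVERWRITE_WARNING;
            default:
                throw new IllegalArgumentException("Unsupported SaveMode: " + mode);
        }
    }
}
```

Returning the message instead of logging it keeps the sketch free of a logging dependency; in the connector the warning would go through the existing slf4j logger.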

##########
File path: phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataSourceWriteOptions.java
##########
@@ -81,6 +82,14 @@ boolean skipNormalizingIdentifier() {
     }
 
     Properties getOverriddenProps() {
+        String scn = getScn();

Review comment:
       Same problem with the method name as in PhoenixDataSourceReaderOptions.

##########
File path: phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixBatchWrite.java
##########
@@ -17,45 +17,35 @@
  */
 package org.apache.phoenix.spark.datasource.v2.writer;
 
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
-import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
-import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.spark.sql.connector.write.BatchWrite;
+import org.apache.spark.sql.connector.write.DataWriterFactory;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
+import org.apache.spark.sql.connector.write.WriterCommitMessage;
 import org.apache.spark.sql.types.StructType;
 
+import java.util.Map;
+
 import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.CURRENT_SCN_VALUE;
-import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID;
 import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER;
 import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.extractPhoenixHBaseConfFromOptions;
 
-public class PhoenixDataSourceWriter implements DataSourceWriter {
+public class PhoenixBatchWrite implements BatchWrite {
 
     private final PhoenixDataSourceWriteOptions options;
+    private final LogicalWriteInfo writeInfo;
 
-    public PhoenixDataSourceWriter(SaveMode mode, StructType schema, DataSourceOptions options) {
-        if (!mode.equals(SaveMode.Overwrite)) {
-            throw new RuntimeException("SaveMode other than SaveMode.OverWrite is not supported");
-        }
-        if (!options.tableName().isPresent()) {
-            throw new RuntimeException("No Phoenix option " + DataSourceOptions.TABLE_KEY + " defined");
-        }
-        if (!options.get(ZOOKEEPER_URL).isPresent()) {
-            throw new RuntimeException("No Phoenix option " + ZOOKEEPER_URL + " defined");
-        }
-        this.options = createPhoenixDataSourceWriteOptions(options, schema);
-    }
-
-    @Override
-    public DataWriterFactory<InternalRow> createWriterFactory() {
-        return new PhoenixDataWriterFactory(options);
+    PhoenixBatchWrite(LogicalWriteInfo writeInfo, Map<String,String> options) {
+        this.writeInfo = writeInfo;
+        this.options = createPhoenixDataSourceWriteOptions(options, writeInfo.schema());
     }
 
     @Override
-    public boolean useCommitCoordinator() {

Review comment:
       I'm not sure what this does.

##########
File path: phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixScan.java
##########
@@ -29,134 +29,97 @@
 import org.apache.phoenix.mapreduce.PhoenixInputSplit;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.spark.FilterExpressionCompiler;
-import org.apache.phoenix.spark.SparkSchemaUtil;
 import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource;
 import org.apache.phoenix.util.ColumnInfo;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.Filter;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
-import org.apache.spark.sql.sources.v2.reader.InputPartition;
-import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;
-import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns;
+import org.apache.spark.sql.connector.read.Batch;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
 import org.apache.spark.sql.types.StructType;
-import scala.Tuple3;
-import scala.collection.JavaConverters;
-import scala.collection.Seq;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
-import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
 import java.util.Properties;
 
 import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.extractPhoenixHBaseConfFromOptions;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
 
-public class PhoenixDataSourceReader implements DataSourceReader, SupportsPushDownFilters,
-        SupportsPushDownRequiredColumns {
+public class PhoenixScan implements Scan, Batch {
 
-    private final DataSourceOptions options;
-    private final String tableName;
+    private final StructType schema;
+    private final CaseInsensitiveStringMap options;
     private final String zkUrl;
-    private final boolean dateAsTimestamp;
     private final Properties overriddenProps;
+    private PhoenixDataSourceReadOptions phoenixDataSourceOptions;
+    private final String tableName;
+    private String currentScnValue;
+    private String tenantId;
+    private boolean splitByStats;
+    private final String whereClause;
 
-    private StructType schema;
-    private Filter[] pushedFilters = new Filter[]{};
-    // derived from pushedFilters
-    private String whereClause;
-
-    public PhoenixDataSourceReader(DataSourceOptions options) {
-        if (!options.tableName().isPresent()) {
-            throw new RuntimeException("No Phoenix option " + DataSourceOptions.TABLE_KEY + " defined");
-        }
-        if (!options.get(PhoenixDataSource.ZOOKEEPER_URL).isPresent()) {
-            throw new RuntimeException("No Phoenix option " + PhoenixDataSource.ZOOKEEPER_URL + " defined");
-        }
+    PhoenixScan(StructType schema, CaseInsensitiveStringMap options, String whereClause) {
+        this.schema = schema;
         this.options = options;
-        this.tableName = options.tableName().get();
-        this.zkUrl = options.get(PhoenixDataSource.ZOOKEEPER_URL).get();
-        this.dateAsTimestamp = options.getBoolean("dateAsTimestamp", false);
+        this.whereClause = whereClause;
         this.overriddenProps = extractPhoenixHBaseConfFromOptions(options);
-        setSchema();
+        this.zkUrl = options.get(PhoenixDataSource.ZOOKEEPER_URL);
+        tableName = options.get("table");
     }
 
-    /**
-     * Sets the schema using all the table columns before any column pruning has been done
-     */
-    private void setSchema() {
-        try (Connection conn = DriverManager.getConnection(
-                JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, overriddenProps)) {
-            List<ColumnInfo> columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, null);
-            Seq<ColumnInfo> columnInfoSeq = JavaConverters.asScalaIteratorConverter(columnInfos.iterator()).asScala().toSeq();
-            schema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoSeq, dateAsTimestamp);
+    private void populateOverriddenProperties(){
+        currentScnValue = options.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
+        tenantId = options.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID);
+        // Generate splits based off statistics, or just region splits?
+        splitByStats = options.getBoolean(
+                PhoenixConfigurationUtil.MAPREDUCE_SPLIT_BY_STATS, PhoenixConfigurationUtil.DEFAULT_SPLIT_BY_STATS);
+        if(currentScnValue != null) {
+            overriddenProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue);
         }
-        catch (SQLException e) {
-            throw new RuntimeException(e);
+        if (tenantId != null){
+            overriddenProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
         }
     }
 
-    PhoenixInputPartition getInputPartition(PhoenixDataSourceReadOptions readOptions,
-            PhoenixInputSplit inputSplit) {
-        return new PhoenixInputPartition(readOptions, schema, inputSplit);
-    }
-
     @Override
     public StructType readSchema() {
         return schema;
     }
 
     @Override
-    public Filter[] pushFilters(Filter[] filters) {
-        Tuple3<String, Filter[], Filter[]> tuple3 = new FilterExpressionCompiler().pushFilters(filters);
-        whereClause = tuple3._1();
-        pushedFilters = tuple3._3();
-        return tuple3._2();
+    public String description() {
+        return this.getClass().toString();
     }
 
     @Override
-    public List<InputPartition<InternalRow>> planInputPartitions() {
-        Optional<String> currentScnValue = options.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
-        Optional<String> tenantId = options.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID);
-        // Generate splits based off statistics, or just region splits?
-        boolean splitByStats = options.getBoolean(
-                PhoenixConfigurationUtil.MAPREDUCE_SPLIT_BY_STATS, PhoenixConfigurationUtil.DEFAULT_SPLIT_BY_STATS);
-        if(currentScnValue.isPresent()) {
-            overriddenProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue.get());
-        }
-        if (tenantId.isPresent()){
-            overriddenProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId.get());
-        }
+    public Batch toBatch() {
+        return this;
+    }
+
+    @Override
+    public InputPartition[] planInputPartitions() {
+        populateOverriddenProperties();
         try (Connection conn = DriverManager.getConnection(
                 JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, overriddenProps)) {
             List<ColumnInfo> columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, new ArrayList<>(
-                Arrays.asList(schema.names())));
+                    Arrays.asList(schema.names())));
             final Statement statement = conn.createStatement();
             final String selectStatement = QueryUtil.constructSelectStatement(tableName, columnInfos, whereClause);
             if (selectStatement == null){
                 throw new NullPointerException();
             }
-
             final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
             // Optimize the query plan so that we potentially use secondary indexes
             final QueryPlan queryPlan = pstmt.optimizeQuery(selectStatement);
-            final Scan scan = queryPlan.getContext().getScan();
-
-            // setting the snapshot configuration
-            Optional<String> snapshotName = options.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY);

Review comment:
       We are losing functionality here.
   Is there any reason you haven't added code for this option in PhoenixDataSourceReadOptions?







[GitHub] [phoenix-connectors] stoty commented on a change in pull request #69: PHOENIX-6632 Migrate/Update connectors to spark-3

Posted by GitBox <gi...@apache.org>.
stoty commented on a change in pull request #69:
URL: https://github.com/apache/phoenix-connectors/pull/69#discussion_r797705703



##########
File path: phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixScan.java
##########
@@ -29,134 +29,104 @@
 import org.apache.phoenix.mapreduce.PhoenixInputSplit;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.spark.FilterExpressionCompiler;
-import org.apache.phoenix.spark.SparkSchemaUtil;
 import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource;
 import org.apache.phoenix.util.ColumnInfo;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.Filter;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
-import org.apache.spark.sql.sources.v2.reader.InputPartition;
-import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;
-import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns;
+import org.apache.spark.sql.connector.read.Batch;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
 import org.apache.spark.sql.types.StructType;
-import scala.Tuple3;
-import scala.collection.JavaConverters;
-import scala.collection.Seq;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
-import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
 import java.util.Properties;
 
 import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.extractPhoenixHBaseConfFromOptions;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
 
-public class PhoenixDataSourceReader implements DataSourceReader, SupportsPushDownFilters,
-        SupportsPushDownRequiredColumns {
+public class PhoenixScan implements Scan, Batch {
 
-    private final DataSourceOptions options;
-    private final String tableName;
+    private final StructType schema;
+    private final CaseInsensitiveStringMap options;
     private final String zkUrl;
-    private final boolean dateAsTimestamp;
     private final Properties overriddenProps;
+    private PhoenixDataSourceReadOptions phoenixDataSourceOptions;
+    private final String tableName;
+    private String currentScnValue;
+    private String tenantId;
+    private boolean splitByStats;
+    private final String whereClause;
 
-    private StructType schema;
-    private Filter[] pushedFilters = new Filter[]{};
-    // derived from pushedFilters
-    private String whereClause;
-
-    public PhoenixDataSourceReader(DataSourceOptions options) {
-        if (!options.tableName().isPresent()) {
-            throw new RuntimeException("No Phoenix option " + DataSourceOptions.TABLE_KEY + " defined");
-        }
-        if (!options.get(PhoenixDataSource.ZOOKEEPER_URL).isPresent()) {
-            throw new RuntimeException("No Phoenix option " + PhoenixDataSource.ZOOKEEPER_URL + " defined");
-        }
+    PhoenixScan(StructType schema, CaseInsensitiveStringMap options, String whereClause) {
+        this.schema = schema;
         this.options = options;
-        this.tableName = options.tableName().get();
-        this.zkUrl = options.get(PhoenixDataSource.ZOOKEEPER_URL).get();
-        this.dateAsTimestamp = options.getBoolean("dateAsTimestamp", false);
+        this.whereClause = whereClause;
         this.overriddenProps = extractPhoenixHBaseConfFromOptions(options);
-        setSchema();
+        this.zkUrl = options.get(PhoenixDataSource.ZOOKEEPER_URL);
+        tableName = options.get("table");
     }
 
-    /**
-     * Sets the schema using all the table columns before any column pruning has been done
-     */
-    private void setSchema() {
-        try (Connection conn = DriverManager.getConnection(
-                JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, overriddenProps)) {
-            List<ColumnInfo> columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, null);
-            Seq<ColumnInfo> columnInfoSeq = JavaConverters.asScalaIteratorConverter(columnInfos.iterator()).asScala().toSeq();
-            schema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoSeq, dateAsTimestamp);
+    private void populateOverriddenProperties(){
+        currentScnValue = options.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
+        tenantId = options.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID);
+        // Generate splits based off statistics, or just region splits?
+        splitByStats = options.getBoolean(
+                PhoenixConfigurationUtil.MAPREDUCE_SPLIT_BY_STATS, PhoenixConfigurationUtil.DEFAULT_SPLIT_BY_STATS);
+        if(currentScnValue != null) {
+            overriddenProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue);
         }
-        catch (SQLException e) {
-            throw new RuntimeException(e);
+        if (tenantId != null){
+            overriddenProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
         }
     }
 
-    PhoenixInputPartition getInputPartition(PhoenixDataSourceReadOptions readOptions,
-            PhoenixInputSplit inputSplit) {
-        return new PhoenixInputPartition(readOptions, schema, inputSplit);
-    }
-
     @Override
     public StructType readSchema() {
         return schema;
     }
 
     @Override
-    public Filter[] pushFilters(Filter[] filters) {
-        Tuple3<String, Filter[], Filter[]> tuple3 = new FilterExpressionCompiler().pushFilters(filters);
-        whereClause = tuple3._1();
-        pushedFilters = tuple3._3();
-        return tuple3._2();
+    public String description() {
+        return this.getClass().toString();
     }
 
     @Override
-    public List<InputPartition<InternalRow>> planInputPartitions() {
-        Optional<String> currentScnValue = options.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
-        Optional<String> tenantId = options.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID);
-        // Generate splits based off statistics, or just region splits?
-        boolean splitByStats = options.getBoolean(
-                PhoenixConfigurationUtil.MAPREDUCE_SPLIT_BY_STATS, PhoenixConfigurationUtil.DEFAULT_SPLIT_BY_STATS);
-        if(currentScnValue.isPresent()) {
-            overriddenProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue.get());
-        }
-        if (tenantId.isPresent()){
-            overriddenProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId.get());
-        }
+    public Batch toBatch() {
+        return this;
+    }
+
+    @Override
+    public InputPartition[] planInputPartitions() {
+        populateOverriddenProperties();
         try (Connection conn = DriverManager.getConnection(
                 JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, overriddenProps)) {
             List<ColumnInfo> columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, new ArrayList<>(
-                Arrays.asList(schema.names())));
+                    Arrays.asList(schema.names())));
             final Statement statement = conn.createStatement();
             final String selectStatement = QueryUtil.constructSelectStatement(tableName, columnInfos, whereClause);
             if (selectStatement == null){
                 throw new NullPointerException();
             }
-
             final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
             // Optimize the query plan so that we potentially use secondary indexes
             final QueryPlan queryPlan = pstmt.optimizeQuery(selectStatement);
-            final Scan scan = queryPlan.getContext().getScan();
+            final org.apache.hadoop.hbase.client.Scan scan = queryPlan.getContext().getScan();
 
             // setting the snapshot configuration
-            Optional<String> snapshotName = options.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY);
-            if (snapshotName.isPresent())
-                PhoenixConfigurationUtil.setSnapshotNameKey(queryPlan.getContext().getConnection().
-                        getQueryServices().getConfiguration(), snapshotName.get());
+            String snapshotName = options.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY);
+
+            if(snapshotName != null){
+                PhoenixConfigurationUtil.setSnapshotNameKey(queryPlan.getContext().getConnection().getQueryServices().getConfiguration(), snapshotName);

Review comment:
       It's not off, it's just the GitHub formatting.







[GitHub] [phoenix-connectors] ashwinb1998 closed pull request #69: PHOENIX-6632 Migrate/Update connectors to spark-3

Posted by GitBox <gi...@apache.org>.
ashwinb1998 closed pull request #69:
URL: https://github.com/apache/phoenix-connectors/pull/69


   





[GitHub] [phoenix-connectors] stoty commented on a change in pull request #69: PHOENIX-6632 Migrate/Update connectors to spark-3

Posted by GitBox <gi...@apache.org>.
stoty commented on a change in pull request #69:
URL: https://github.com/apache/phoenix-connectors/pull/69#discussion_r799314827



##########
File path: phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/PhoenixDataSource.java
##########
@@ -17,42 +17,76 @@
  */
 package org.apache.phoenix.spark.datasource.v2;
 
-import java.util.Optional;
-import java.util.Properties;
-
-import org.apache.phoenix.spark.datasource.v2.reader.PhoenixDataSourceReader;
-import org.apache.phoenix.spark.datasource.v2.writer.PhoenixDataSourceWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.spark.sql.SaveMode;
+import org.apache.phoenix.spark.SparkSchemaUtil;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableProvider;
+import org.apache.spark.sql.connector.expressions.Transform;
 import org.apache.spark.sql.sources.DataSourceRegister;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.DataSourceV2;
-import org.apache.spark.sql.sources.v2.ReadSupport;
-import org.apache.spark.sql.sources.v2.WriteSupport;
-import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
-import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
 
 /**
  * Implements the DataSourceV2 api to read and write from Phoenix tables
  */
-public class PhoenixDataSource  implements DataSourceV2,  ReadSupport, WriteSupport, DataSourceRegister {
+public class PhoenixDataSource implements TableProvider, DataSourceRegister {
 
     private static final Logger logger = LoggerFactory.getLogger(PhoenixDataSource.class);
+    public static final String TABLE_KEY = "table";
     public static final String SKIP_NORMALIZING_IDENTIFIER = "skipNormalizingIdentifier";
     public static final String ZOOKEEPER_URL = "zkUrl";
     public static final String PHOENIX_CONFIGS = "phoenixconfigs";
+    protected StructType schema;
+    private CaseInsensitiveStringMap options;
 
     @Override
-    public DataSourceReader createReader(DataSourceOptions options) {
-        return new PhoenixDataSourceReader(options);
+    public StructType inferSchema(CaseInsensitiveStringMap options){
+        if (options.get("table") == null) {

Review comment:
       I can still find a lot of "table" strings in the code.
   There should be only one instance, where the TABLE_KEY constant is defined.
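
As a minimal illustration of the point, the literal can live in one place and every lookup can go through the constant. The helper class `TableOption` and the method `requireTable` are hypothetical names for this sketch; the error message follows the wording of the checks removed from the old reader.

```java
import java.util.Map;

/** Hypothetical helper showing a single definition of the "table" option key. */
final class TableOption {
    // The only place where the literal appears.
    static final String TABLE_KEY = "table";

    /** Returns the table name, or fails with the same style of message as the old reader. */
    static String requireTable(Map<String, String> options) {
        String table = options.get(TABLE_KEY);
        if (table == null) {
            throw new IllegalArgumentException("No Phoenix option " + TABLE_KEY + " defined");
        }
        return table;
    }
}
```

Call sites such as `inferSchema` and the scan constructor would then use `options.get(TABLE_KEY)` or a helper like this one rather than repeating the string.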







[GitHub] [phoenix-connectors] stoty commented on pull request #69: PHOENIX-6632 Migrate/Update connectors to spark-3

Posted by GitBox <gi...@apache.org>.
stoty commented on pull request #69:
URL: https://github.com/apache/phoenix-connectors/pull/69#issuecomment-1029893176


   :broken_heart: **-1 overall**
   
   
   
   
   
   
   | Vote | Subsystem | Runtime | Comment |
   |:----:|----------:|--------:|:--------|
   | +0 :ok: |  reexec  |   5m 33s |  Docker mode activated.  |
   ||| _ Prechecks _ |
   | +1 :green_heart: |  dupname  |   0m  0s |  No case conflicting files found.  |
   | +1 :green_heart: |  hbaseanti  |   0m  0s |  Patch does not have any anti-patterns.  |
   | +1 :green_heart: |  @author  |   0m  1s |  The patch does not contain any @author tags.  |
   | +1 :green_heart: |  test4tests  |   0m  0s |  The patch appears to include 18 new or modified test files.  |
   ||| _ master Compile Tests _ |
   | +0 :ok: |  mvndep  |  12m 31s |  Maven dependency ordering for branch  |
   | +1 :green_heart: |  mvninstall  |  16m 42s |  master passed  |
   | +1 :green_heart: |  compile  |   2m 38s |  master passed  |
   | +1 :green_heart: |  checkstyle  |   0m 44s |  master passed  |
   | +1 :green_heart: |  javadoc  |   1m 13s |  master passed  |
   | -1 :x: |  scaladoc  |   0m 10s |  root in master failed.  |
   | +0 :ok: |  spotbugs  |   2m 14s |  phoenix-spark-base in master has 40 extant spotbugs warnings.  |
   | +0 :ok: |  spotbugs  |   3m 41s |  root in master has 116 extant spotbugs warnings.  |
   ||| _ Patch Compile Tests _ |
   | +0 :ok: |  mvndep  |   0m 38s |  Maven dependency ordering for patch  |
   | -1 :x: |  mvninstall  |   9m 48s |  root in the patch failed.  |
   | -1 :x: |  compile  |   1m 36s |  root in the patch failed.  |
   | -1 :x: |  javac  |   1m 36s |  root in the patch failed.  |
   | -1 :x: |  scalac  |   1m 36s |  root in the patch failed.  |
   | -1 :x: |  checkstyle  |   0m 51s |  root: The patch generated 260 new + 746 unchanged - 166 fixed = 1006 total (was 912)  |
   | +1 :green_heart: |  whitespace  |   0m  0s |  The patch has no whitespace issues.  |
   | +1 :green_heart: |  xml  |   0m  2s |  The patch has no ill-formed XML file.  |
   | -1 :x: |  javadoc  |   0m 22s |  phoenix-spark-base generated 180 new + 4 unchanged - 6 fixed = 184 total (was 10)  |
   | -1 :x: |  javadoc  |   1m  0s |  root generated 180 new + 14 unchanged - 6 fixed = 194 total (was 20)  |
   | -1 :x: |  scaladoc  |   0m 38s |  phoenix-spark-base in the patch failed.  |
   | -1 :x: |  scaladoc  |   0m  8s |  root in the patch failed.  |
   | -1 :x: |  spotbugs  |   0m 41s |  phoenix-spark-base in the patch failed.  |
   | -1 :x: |  spotbugs  |   2m 45s |  root in the patch failed.  |
   ||| _ Other Tests _ |
   | -1 :x: |  unit  |  65m 33s |  root in the patch failed.  |
   | -1 :x: |  asflicense  |   0m 19s |  The patch generated 5 ASF License warnings.  |
   |  |   | 131m 47s |   |
   
   
   | Subsystem | Report/Notes |
   |----------:|:-------------|
   | Docker | ClientAPI=1.41 ServerAPI=1.41 base: https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/3/artifact/yetus-general-check/output/Dockerfile |
   | GITHUB PR | https://github.com/apache/phoenix-connectors/pull/69 |
   | Optional Tests | dupname asflicense javac javadoc unit xml compile spotbugs hbaseanti checkstyle scalac scaladoc |
   | uname | Linux cb92bcfd7755 4.15.0-112-generic #113-Ubuntu SMP Thu Jul 9 23:41:39 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux |
   | Build tool | maven |
   | Personality | dev/phoenix-connectors-personality.sh |
   | git revision | master / 72496e5 |
   | Default Java | Private Build-1.8.0_242-8u242-b08-0ubuntu3~16.04-b08 |
   | scaladoc | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/3/artifact/yetus-general-check/output/branch-scaladoc-root.txt |
   | mvninstall | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/3/artifact/yetus-general-check/output/patch-mvninstall-root.txt |
   | compile | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/3/artifact/yetus-general-check/output/patch-compile-root.txt |
   | javac | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/3/artifact/yetus-general-check/output/patch-compile-root.txt |
   | scalac | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/3/artifact/yetus-general-check/output/patch-compile-root.txt |
   | checkstyle | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/3/artifact/yetus-general-check/output/diff-checkstyle-root.txt |
   | javadoc | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/3/artifact/yetus-general-check/output/diff-javadoc-javadoc-phoenix-spark-base.txt |
   | javadoc | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/3/artifact/yetus-general-check/output/diff-javadoc-javadoc-root.txt |
   | scaladoc | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/3/artifact/yetus-general-check/output/patch-scaladoc-phoenix-spark-base.txt |
   | scaladoc | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/3/artifact/yetus-general-check/output/patch-scaladoc-root.txt |
   | spotbugs | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/3/artifact/yetus-general-check/output/patch-spotbugs-phoenix-spark-base.txt |
   | spotbugs | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/3/artifact/yetus-general-check/output/patch-spotbugs-root.txt |
   | unit | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/3/artifact/yetus-general-check/output/patch-unit-root.txt |
   |  Test Results | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/3/testReport/ |
   | asflicense | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/3/artifact/yetus-general-check/output/patch-asflicense-problems.txt |
   | Max. process+thread count | 1920 (vs. ulimit of 30000) |
   | modules | C: phoenix-spark-base . U: . |
   | Console output | https://ci-hadoop.apache.org/job/Phoenix/job/Phoenix-Connectors-PreCommit-GitHub-PR/job/PR-69/3/console |
   | versions | git=2.7.4 maven=3.3.9 spotbugs=4.1.3 |
   | Powered by | Apache Yetus 0.12.0 https://yetus.apache.org |
   
   
   This message was automatically generated.
   
   





[GitHub] [phoenix-connectors] ashwinb1998 commented on a change in pull request #69: PHOENIX-6632 Migrate/Update connectors to spark-3

Posted by GitBox <gi...@apache.org>.
ashwinb1998 commented on a change in pull request #69:
URL: https://github.com/apache/phoenix-connectors/pull/69#discussion_r797455447



##########
File path: phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixBatchWrite.java
##########
@@ -17,45 +17,35 @@
  */
 package org.apache.phoenix.spark.datasource.v2.writer;
 
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
-import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
-import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.spark.sql.connector.write.BatchWrite;
+import org.apache.spark.sql.connector.write.DataWriterFactory;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
+import org.apache.spark.sql.connector.write.WriterCommitMessage;
 import org.apache.spark.sql.types.StructType;
 
+import java.util.Map;
+
 import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.CURRENT_SCN_VALUE;
-import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID;
 import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;
+import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.SKIP_NORMALIZING_IDENTIFIER;
 import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.extractPhoenixHBaseConfFromOptions;
 
-public class PhoenixDataSourceWriter implements DataSourceWriter {
+public class PhoenixBatchWrite implements BatchWrite {
 
     private final PhoenixDataSourceWriteOptions options;
+    private final LogicalWriteInfo writeInfo;
 
-    public PhoenixDataSourceWriter(SaveMode mode, StructType schema, DataSourceOptions options) {
-        if (!mode.equals(SaveMode.Overwrite)) {

Review comment:
       SaveMode has been removed from the DataSourceV2 API, so there is no SaveMode parameter that can be used here. Reference: https://issues.apache.org/jira/browse/SPARK-25531
   It is still a requirement, though, that writes use SaveMode "Append".
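
Since the writer no longer receives a SaveMode, one place the "Append only" requirement could still be checked is on the caller's side, before the write is issued. The following is only a minimal sketch under that assumption: `AppendOnlyGuard`, `isAppend` and `requireAppend` are hypothetical names used for illustration and are not part of Spark or of phoenix-connectors.

```java
import java.util.Locale;

/** Hypothetical caller-side guard; not part of Spark or phoenix-connectors. */
public final class AppendOnlyGuard {

    private AppendOnlyGuard() { }

    /** Returns true when the mode string names Append, ignoring case and surrounding spaces. */
    public static boolean isAppend(String mode) {
        return mode != null && "append".equals(mode.trim().toLowerCase(Locale.ROOT));
    }

    /** Returns the mode unchanged if it names Append; otherwise throws. */
    public static String requireAppend(String mode) {
        if (!isAppend(mode)) {
            throw new IllegalArgumentException(
                    "Phoenix writes expect SaveMode Append, got: " + mode);
        }
        return mode;
    }

    public static void main(String[] args) {
        // Accepted: the mode names Append.
        System.out.println(requireAppend("Append"));
        // Rejected: any other mode fails before a write would be attempted.
        try {
            requireAppend("Overwrite");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

A caller could pass the checked string on to `DataFrameWriter.mode(String)`, so that a mistaken mode is caught with a clear message before the write reaches the connector.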

##########
File path: phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixScan.java
##########
@@ -29,134 +29,97 @@
 import org.apache.phoenix.mapreduce.PhoenixInputSplit;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.spark.FilterExpressionCompiler;
-import org.apache.phoenix.spark.SparkSchemaUtil;
 import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource;
 import org.apache.phoenix.util.ColumnInfo;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.Filter;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
-import org.apache.spark.sql.sources.v2.reader.InputPartition;
-import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;
-import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns;
+import org.apache.spark.sql.connector.read.Batch;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
 import org.apache.spark.sql.types.StructType;
-import scala.Tuple3;
-import scala.collection.JavaConverters;
-import scala.collection.Seq;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
-import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
 import java.util.Properties;
 
 import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.extractPhoenixHBaseConfFromOptions;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
 
-public class PhoenixDataSourceReader implements DataSourceReader, SupportsPushDownFilters,
-        SupportsPushDownRequiredColumns {
+public class PhoenixScan implements Scan, Batch {
 
-    private final DataSourceOptions options;
-    private final String tableName;
+    private final StructType schema;
+    private final CaseInsensitiveStringMap options;
     private final String zkUrl;
-    private final boolean dateAsTimestamp;
     private final Properties overriddenProps;
+    private PhoenixDataSourceReadOptions phoenixDataSourceOptions;
+    private final String tableName;
+    private String currentScnValue;
+    private String tenantId;
+    private boolean splitByStats;
+    private final String whereClause;
 
-    private StructType schema;
-    private Filter[] pushedFilters = new Filter[]{};
-    // derived from pushedFilters
-    private String whereClause;
-
-    public PhoenixDataSourceReader(DataSourceOptions options) {
-        if (!options.tableName().isPresent()) {
-            throw new RuntimeException("No Phoenix option " + DataSourceOptions.TABLE_KEY + " defined");
-        }
-        if (!options.get(PhoenixDataSource.ZOOKEEPER_URL).isPresent()) {
-            throw new RuntimeException("No Phoenix option " + PhoenixDataSource.ZOOKEEPER_URL + " defined");
-        }
+    PhoenixScan(StructType schema, CaseInsensitiveStringMap options, String whereClause) {
+        this.schema = schema;
         this.options = options;
-        this.tableName = options.tableName().get();
-        this.zkUrl = options.get(PhoenixDataSource.ZOOKEEPER_URL).get();
-        this.dateAsTimestamp = options.getBoolean("dateAsTimestamp", false);
+        this.whereClause = whereClause;
         this.overriddenProps = extractPhoenixHBaseConfFromOptions(options);
-        setSchema();
+        this.zkUrl = options.get(PhoenixDataSource.ZOOKEEPER_URL);
+        tableName = options.get("table");
     }
 
-    /**
-     * Sets the schema using all the table columns before any column pruning has been done
-     */
-    private void setSchema() {
-        try (Connection conn = DriverManager.getConnection(
-                JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, overriddenProps)) {
-            List<ColumnInfo> columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, null);
-            Seq<ColumnInfo> columnInfoSeq = JavaConverters.asScalaIteratorConverter(columnInfos.iterator()).asScala().toSeq();
-            schema = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoSeq, dateAsTimestamp);
+    private void populateOverriddenProperties(){
+        currentScnValue = options.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
+        tenantId = options.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID);
+        // Generate splits based off statistics, or just region splits?
+        splitByStats = options.getBoolean(
+                PhoenixConfigurationUtil.MAPREDUCE_SPLIT_BY_STATS, PhoenixConfigurationUtil.DEFAULT_SPLIT_BY_STATS);
+        if(currentScnValue != null) {
+            overriddenProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue);
         }
-        catch (SQLException e) {
-            throw new RuntimeException(e);
+        if (tenantId != null){
+            overriddenProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
         }
     }
 
-    PhoenixInputPartition getInputPartition(PhoenixDataSourceReadOptions readOptions,
-            PhoenixInputSplit inputSplit) {
-        return new PhoenixInputPartition(readOptions, schema, inputSplit);
-    }
-
     @Override
     public StructType readSchema() {
         return schema;
     }
 
     @Override
-    public Filter[] pushFilters(Filter[] filters) {
-        Tuple3<String, Filter[], Filter[]> tuple3 = new FilterExpressionCompiler().pushFilters(filters);
-        whereClause = tuple3._1();
-        pushedFilters = tuple3._3();
-        return tuple3._2();
+    public String description() {
+        return this.getClass().toString();
     }
 
     @Override
-    public List<InputPartition<InternalRow>> planInputPartitions() {
-        Optional<String> currentScnValue = options.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
-        Optional<String> tenantId = options.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID);
-        // Generate splits based off statistics, or just region splits?
-        boolean splitByStats = options.getBoolean(
-                PhoenixConfigurationUtil.MAPREDUCE_SPLIT_BY_STATS, PhoenixConfigurationUtil.DEFAULT_SPLIT_BY_STATS);
-        if(currentScnValue.isPresent()) {
-            overriddenProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue.get());
-        }
-        if (tenantId.isPresent()){
-            overriddenProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId.get());
-        }
+    public Batch toBatch() {
+        return this;
+    }
+
+    @Override
+    public InputPartition[] planInputPartitions() {
+        populateOverriddenProperties();
         try (Connection conn = DriverManager.getConnection(
                 JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, overriddenProps)) {
             List<ColumnInfo> columnInfos = PhoenixRuntime.generateColumnInfo(conn, tableName, new ArrayList<>(
-                Arrays.asList(schema.names())));
+                    Arrays.asList(schema.names())));
             final Statement statement = conn.createStatement();
             final String selectStatement = QueryUtil.constructSelectStatement(tableName, columnInfos, whereClause);
             if (selectStatement == null){
                 throw new NullPointerException();
             }
-
             final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
             // Optimize the query plan so that we potentially use secondary indexes
             final QueryPlan queryPlan = pstmt.optimizeQuery(selectStatement);
-            final Scan scan = queryPlan.getContext().getScan();
-
-            // setting the snapshot configuration
-            Optional<String> snapshotName = options.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY);

Review comment:
       Whoops, I somehow missed it; will add it now.







[GitHub] [phoenix-connectors] ashwinb1998 edited a comment on pull request #69: PHOENIX-6632 Migrate/Update connectors to spark-3

Posted by GitBox <gi...@apache.org>.
ashwinb1998 edited a comment on pull request #69:
URL: https://github.com/apache/phoenix-connectors/pull/69#issuecomment-1031147332


   > Changes need to be pushed by a committer. I'm going to push the change when it's done. I'm going to ask a review for it from you first.
   > 
   > I'll leave you as the author of the patch, and add myself as a co-author if that's all right with you.
   
   Oh okay, sounds great. It's all good with me, thank you 🙂





[GitHub] [phoenix-connectors] ashwinb1998 commented on pull request #69: PHOENIX-6632 Migrate/Update connectors to spark-3

Posted by GitBox <gi...@apache.org>.
ashwinb1998 commented on pull request #69:
URL: https://github.com/apache/phoenix-connectors/pull/69#issuecomment-1072085140


   Sure @stoty, will do

