Posted to commits@phoenix.apache.org by ch...@apache.org on 2019/08/03 01:40:56 UTC

[phoenix-connectors] branch master updated: PHOENIX-5410 Phoenix Spark to HBase connector takes a long time to persist data

This is an automated email from the ASF dual-hosted git repository.

chinmayskulkarni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix-connectors.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a4308a  PHOENIX-5410 Phoenix Spark to HBase connector takes a long time to persist data
4a4308a is described below

commit 4a4308a86c2224a2cf0bd9efb0f35df2680b556b
Author: Manohar Chamaraju <ma...@microfocus.com>
AuthorDate: Fri Aug 2 15:43:30 2019 +0530

    PHOENIX-5410 Phoenix Spark to HBase connector takes a long time to persist data
    
    Signed-off-by: Chinmay Kulkarni <ch...@gmail.com>
---
 .../spark/datasource/v2/writer/PhoenixDataWriter.java  | 18 ++++++++++++++++--
 .../sql/execution/datasources/jdbc/SparkJdbcUtil.scala |  4 ++--
 2 files changed, 18 insertions(+), 4 deletions(-)
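
Editor's note: the slowdown came from rebuilding a Spark RowEncoder for every
row written — SparkJdbcUtil.toRow previously called
RowEncoder(schema).resolveAndBind() on each invocation, and resolving and
binding an encoder is expensive. The commit hoists that work into the
PhoenixDataWriter constructor so it happens once per writer. A minimal,
self-contained restatement of the pattern against Spark 2.x catalyst APIs
(the class name EncoderPerWriter is hypothetical; the calls mirror the diff
below):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.catalyst.InternalRow;
    import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer$;
    import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
    import org.apache.spark.sql.catalyst.encoders.RowEncoder$;
    import org.apache.spark.sql.catalyst.expressions.Attribute;
    import org.apache.spark.sql.catalyst.expressions.AttributeReference;
    import org.apache.spark.sql.types.StructType;

    // Hypothetical class; mirrors what PhoenixDataWriter now does.
    class EncoderPerWriter {
        private final ExpressionEncoder<Row> encoder;

        EncoderPerWriter(StructType schema) {
            List<Attribute> attrs = new ArrayList<>();
            for (AttributeReference ref : scala.collection.JavaConverters
                    .seqAsJavaListConverter(schema.toAttributes()).asJava()) {
                attrs.add(ref.toAttribute());
            }
            // Resolving and binding the encoder is the expensive step:
            // do it once per writer, not once per row.
            this.encoder = RowEncoder$.MODULE$.apply(schema).resolveAndBind(
                    scala.collection.JavaConverters
                            .asScalaIteratorConverter(attrs.iterator())
                            .asScala().toSeq(),
                    SimpleAnalyzer$.MODULE$);
        }

        Row toRow(InternalRow internalRow) {
            // Cheap per-row deserialization with the prebuilt encoder.
            return encoder.fromRow(internalRow);
        }
    }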

diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java
index 04670d5..f67695c 100644
--- a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java
@@ -22,6 +22,7 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 import java.util.stream.Collectors;
@@ -32,6 +33,8 @@ import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder$;
 import org.apache.spark.sql.execution.datasources.SparkJdbcUtil;
 import org.apache.spark.sql.execution.datasources.jdbc.PhoenixJdbcDialect$;
 import org.apache.spark.sql.sources.v2.writer.DataWriter;
@@ -39,6 +42,9 @@ import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer$;
+import org.apache.spark.sql.catalyst.expressions.AttributeReference;
+import org.apache.spark.sql.catalyst.expressions.Attribute;
 
 import com.google.common.collect.Lists;
 
@@ -55,6 +61,7 @@ public class PhoenixDataWriter implements DataWriter<InternalRow> {
     private final PreparedStatement statement;
     private final long batchSize;
     private long numRecords = 0;
+    private ExpressionEncoder<Row> encoder = null;
 
     PhoenixDataWriter(PhoenixDataSourceWriteOptions options) {
         String scn = options.getScn();
@@ -68,6 +75,13 @@ public class PhoenixDataWriter implements DataWriter<InternalRow> {
             overridingProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
         }
         this.schema = options.getSchema();
+
+        List<Attribute> attrs = new ArrayList<>();
+
+        for (AttributeReference ref : scala.collection.JavaConverters.seqAsJavaListConverter(schema.toAttributes()).asJava()) {
+            attrs.add(ref.toAttribute());
+        }
+        encoder = RowEncoder$.MODULE$.apply(schema).resolveAndBind(scala.collection.JavaConverters.asScalaIteratorConverter(attrs.iterator()).asScala().toSeq(), SimpleAnalyzer$.MODULE$);
         try {
             this.conn = DriverManager.getConnection(JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl,
                     overridingProps);
@@ -92,14 +106,14 @@ public class PhoenixDataWriter implements DataWriter<InternalRow> {
     public void write(InternalRow internalRow) throws IOException {
         try {
             int i=0;
+            Row row = SparkJdbcUtil.toRow(encoder, internalRow);
             for (StructField field : schema.fields()) {
                 DataType dataType = field.dataType();
                 if (internalRow.isNullAt(i)) {
                     statement.setNull(i + 1, SparkJdbcUtil.getJdbcType(dataType,
                             PhoenixJdbcDialect$.MODULE$).jdbcNullType());
                 } else {
-                    Row row = SparkJdbcUtil.toRow(schema, internalRow);
-                    SparkJdbcUtil.makeSetter(conn, PhoenixJdbcDialect$.MODULE$, dataType).apply(statement, row, i);
+                    SparkJdbcUtil.makeSetter(conn, PhoenixJdbcDialect$.MODULE$, dataType).apply(statement, row, i);
                 }
                 ++i;
             }
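
Editor's note: for context, this writer sits behind the DataSource V2
"phoenix" format, so write() above runs once per row of the saved DataFrame.
A usage sketch (table name and ZooKeeper quorum are placeholders):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;

    // Each partition's rows are funneled through PhoenixDataWriter.write().
    void saveToPhoenix(Dataset<Row> df) {
        df.write()
          .format("phoenix")
          .mode(SaveMode.Overwrite)
          .option("table", "OUTPUT_TABLE")   // placeholder table name
          .option("zkUrl", "zkhost:2181")    // placeholder ZK quorum
          .save();
    }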
diff --git a/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/SparkJdbcUtil.scala b/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/SparkJdbcUtil.scala
index 50cdbf5..97b0525 100644
--- a/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/SparkJdbcUtil.scala
+++ b/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/SparkJdbcUtil.scala
@@ -34,11 +34,11 @@ Decimal, DecimalType, DoubleType, FloatType, IntegerType, LongType, Metadata, Sh
 StructType, TimestampType}
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.NextIterator
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 
 object SparkJdbcUtil {
 
-  def toRow(schema: StructType, internalRow: InternalRow) : Row = {
-    val encoder = RowEncoder(schema).resolveAndBind()
+  def toRow(encoder: ExpressionEncoder[Row], internalRow: InternalRow): Row = {
     encoder.fromRow(internalRow)
   }
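
Editor's note: the signature change alters the contract of
SparkJdbcUtil.toRow — it no longer builds an encoder internally, so callers
must pass one that is already resolved and bound, as PhoenixDataWriter now
does in its write() hot path:

    // Before this commit (a fresh encoder was built on every call):
    //   Row row = SparkJdbcUtil.toRow(schema, internalRow);
    // After (the caller supplies an encoder built once per writer):
    Row row = SparkJdbcUtil.toRow(encoder, internalRow);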