You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ch...@apache.org on 2019/08/03 01:40:56 UTC
[phoenix-connectors] branch master updated: PHOENIX-5410 Phoenix
spark to hbase connector takes a long time to persist data
This is an automated email from the ASF dual-hosted git repository.
chinmayskulkarni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix-connectors.git
The following commit(s) were added to refs/heads/master by this push:
new 4a4308a PHOENIX-5410 Phoenix spark to hbase connector takes a long time to persist data
4a4308a is described below
commit 4a4308a86c2224a2cf0bd9efb0f35df2680b556b
Author: Manohar Chamaraju <ma...@microfocus.com>
AuthorDate: Fri Aug 2 15:43:30 2019 +0530
PHOENIX-5410 Phoenix spark to hbase connector takes a long time to persist data
Signed-off-by: Chinmay Kulkarni <ch...@gmail.com>
---
.../spark/datasource/v2/writer/PhoenixDataWriter.java | 18 ++++++++++++++++--
.../sql/execution/datasources/jdbc/SparkJdbcUtil.scala | 4 ++--
2 files changed, 18 insertions(+), 4 deletions(-)
diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java
index 04670d5..f67695c 100644
--- a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java
@@ -22,6 +22,7 @@ import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
@@ -32,6 +33,8 @@ import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder$;
import org.apache.spark.sql.execution.datasources.SparkJdbcUtil;
import org.apache.spark.sql.execution.datasources.jdbc.PhoenixJdbcDialect$;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
@@ -39,6 +42,9 @@ import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer$;
+import org.apache.spark.sql.catalyst.expressions.AttributeReference;
+import org.apache.spark.sql.catalyst.expressions.Attribute;
import com.google.common.collect.Lists;
@@ -55,6 +61,7 @@ public class PhoenixDataWriter implements DataWriter<InternalRow> {
private final PreparedStatement statement;
private final long batchSize;
private long numRecords = 0;
+ private ExpressionEncoder<Row> encoder = null;
PhoenixDataWriter(PhoenixDataSourceWriteOptions options) {
String scn = options.getScn();
@@ -68,6 +75,13 @@ public class PhoenixDataWriter implements DataWriter<InternalRow> {
overridingProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
}
this.schema = options.getSchema();
+
+ List<Attribute> attrs = new ArrayList<>();
+
+ for (AttributeReference ref : scala.collection.JavaConverters.seqAsJavaListConverter(schema.toAttributes()).asJava()) {
+ attrs.add(ref.toAttribute());
+ }
+ encoder = RowEncoder$.MODULE$.apply(schema).resolveAndBind( scala.collection.JavaConverters.asScalaIteratorConverter(attrs.iterator()).asScala().toSeq(), SimpleAnalyzer$.MODULE$);
try {
this.conn = DriverManager.getConnection(JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl,
overridingProps);
@@ -92,14 +106,14 @@ public class PhoenixDataWriter implements DataWriter<InternalRow> {
public void write(InternalRow internalRow) throws IOException {
try {
int i=0;
+ Row row = SparkJdbcUtil.toRow(encoder, internalRow);
for (StructField field : schema.fields()) {
DataType dataType = field.dataType();
if (internalRow.isNullAt(i)) {
statement.setNull(i + 1, SparkJdbcUtil.getJdbcType(dataType,
PhoenixJdbcDialect$.MODULE$).jdbcNullType());
} else {
- Row row = SparkJdbcUtil.toRow(schema, internalRow);
- SparkJdbcUtil.makeSetter(conn, PhoenixJdbcDialect$.MODULE$, dataType).apply(statement, row, i);
+ SparkJdbcUtil.makeSetter(conn, PhoenixJdbcDialect$.MODULE$, dataType).apply(statement, row, i);
}
++i;
}
diff --git a/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/SparkJdbcUtil.scala b/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/SparkJdbcUtil.scala
index 50cdbf5..97b0525 100644
--- a/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/SparkJdbcUtil.scala
+++ b/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/SparkJdbcUtil.scala
@@ -34,11 +34,11 @@ Decimal, DecimalType, DoubleType, FloatType, IntegerType, LongType, Metadata, Sh
StructType, TimestampType}
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.NextIterator
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
object SparkJdbcUtil {
- def toRow(schema: StructType, internalRow: InternalRow) : Row = {
- val encoder = RowEncoder(schema).resolveAndBind()
+ def toRow(encoder: ExpressionEncoder[Row], internalRow: InternalRow) : Row = {
encoder.fromRow(internalRow)
}