You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by di...@apache.org on 2023/06/13 06:43:04 UTC

[doris-spark-connector] branch master updated: [improvement] add new param for ignored type (#113)

This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c9ef61  [improvement] add new param for ignored type (#113)
5c9ef61 is described below

commit 5c9ef61506b1224352367324053be2b689d103fd
Author: gnehil <ad...@gmail.com>
AuthorDate: Tue Jun 13 14:42:59 2023 +0800

    [improvement] add new param for ignored type (#113)
    
    * add new param doris.ignore-type to ignore columns of the specified types when creating a relation
    * add comment
---
 .../doris/spark/cfg/ConfigurationOptions.java      |  7 +++++++
 .../org/apache/doris/spark/sql/SchemaUtils.scala   | 19 ++++++++++++++-----
 .../apache/doris/spark/sql/TestSchemaUtils.scala   | 22 +++++++++++++++++++++-
 3 files changed, 42 insertions(+), 6 deletions(-)

diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index a3f4061..9783c46 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -90,4 +90,11 @@ public interface ConfigurationOptions {
 
     int DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT = 50;
 
+    /**
+     * Comma-separated list of Doris column types whose columns are skipped when building the schema.
+     * e.g.
+     * "doris.ignore-type"="bitmap,hll"
+     */
+    String DORIS_IGNORE_TYPE = "doris.ignore-type";
+
 }
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
index 0535805..c7fad41 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
@@ -18,12 +18,13 @@
 package org.apache.doris.spark.sql
 
 import org.apache.doris.sdk.thrift.TScanColumnDesc
+
 import scala.collection.JavaConversions._
 import org.apache.doris.spark.cfg.Settings
 import org.apache.doris.spark.exception.DorisException
 import org.apache.doris.spark.rest.RestService
 import org.apache.doris.spark.rest.models.{Field, Schema}
-import org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_READ_FIELD
+import org.apache.doris.spark.cfg.ConfigurationOptions.{DORIS_IGNORE_TYPE, DORIS_READ_FIELD}
 import org.apache.spark.sql.types._
 import org.slf4j.LoggerFactory
 
@@ -38,7 +39,9 @@ private[spark] object SchemaUtils {
    */
   def discoverSchema(cfg: Settings): StructType = {
     val schema = discoverSchemaFromFe(cfg)
-    convertToStruct(cfg.getProperty(DORIS_READ_FIELD), schema)
+    val dorisReadField = cfg.getProperty(DORIS_READ_FIELD)
+    val ignoreColumnType = cfg.getProperty(DORIS_IGNORE_TYPE)
+    convertToStruct(schema, dorisReadField, ignoreColumnType)
   }
 
   /**
@@ -57,14 +60,20 @@ private[spark] object SchemaUtils {
    * @param schema inner schema
    * @return Spark Catalyst StructType
    */
-  def convertToStruct(dorisReadFields: String, schema: Schema): StructType = {
-    val fieldList = if (dorisReadFields != null && dorisReadFields.length > 0) {
+  def convertToStruct(schema: Schema, dorisReadFields: String, ignoredTypes: String): StructType = {
+    val fieldList = if (dorisReadFields != null && dorisReadFields.nonEmpty) {
       dorisReadFields.split(",")
     } else {
       Array.empty[String]
     }
+    val ignoredTypeList = if (ignoredTypes != null && ignoredTypes.nonEmpty) {
+      ignoredTypes.split(",").map(t => t.trim.toUpperCase)
+    } else {
+      Array.empty[String]
+    }
     val fields = schema.getProperties
-      .filter(x => fieldList.contains(x.getName) || fieldList.isEmpty)
+      .filter(x => (fieldList.contains(x.getName) || fieldList.isEmpty)
+        && !ignoredTypeList.contains(x.getType))
       .map(f =>
         DataTypes.createStructField(
           f.getName,
diff --git a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala
index f944aaf..e11fb4f 100644
--- a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala
+++ b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala
@@ -41,7 +41,7 @@ class TestSchemaUtils extends ExpectedExceptionTest {
     fields :+= DataTypes.createStructField("k1", DataTypes.ByteType, true)
     fields :+= DataTypes.createStructField("k5", DataTypes.LongType, true)
     val expected = DataTypes.createStructType(fields.asJava)
-    Assert.assertEquals(expected, SchemaUtils.convertToStruct("k1,k5", schema))
+    Assert.assertEquals(expected, SchemaUtils.convertToStruct(schema, "k1,k5", null))
   }
 
   @Test
@@ -93,4 +93,24 @@ class TestSchemaUtils extends ExpectedExceptionTest {
 
     Assert.assertEquals(expected, SchemaUtils.convertToSchema(Seq(k1, k2)))
   }
+
+  @Test
+  def testIgnoreTypes(): Unit = {
+
+    val schema = new Schema
+    schema.setStatus(200)
+    val col1 = new Field("col1", "TINYINT", "", 0, 0, "")
+    val col2 = new Field("col2", "BITMAP", "", 0, 0, "")
+    val col3 = new Field("col3", "HLL", "", 0, 0, "")
+    schema.put(col1)
+    schema.put(col2)
+    schema.put(col3)
+
+    var fields = List[StructField]()
+    fields :+= DataTypes.createStructField("col1", DataTypes.ByteType, true)
+    val expected = DataTypes.createStructType(fields.asJava)
+    Assert.assertEquals(expected, SchemaUtils.convertToStruct(schema, null, "bitmap,hll"))
+
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org