You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by di...@apache.org on 2023/06/13 06:43:04 UTC
[doris-spark-connector] branch master updated: [improvement] add new param for ignored type (#113)
This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 5c9ef61 [improvement] add new param for ignored type (#113)
5c9ef61 is described below
commit 5c9ef61506b1224352367324053be2b689d103fd
Author: gnehil <ad...@gmail.com>
AuthorDate: Tue Jun 13 14:42:59 2023 +0800
[improvement] add new param for ignored type (#113)
* add new param doris.ignore-type to ignore columns of the specified types when creating a relation
* add comment
---
.../doris/spark/cfg/ConfigurationOptions.java | 7 +++++++
.../org/apache/doris/spark/sql/SchemaUtils.scala | 19 ++++++++++++++-----
.../apache/doris/spark/sql/TestSchemaUtils.scala | 22 +++++++++++++++++++++-
3 files changed, 42 insertions(+), 6 deletions(-)
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index a3f4061..9783c46 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -90,4 +90,11 @@ public interface ConfigurationOptions {
int DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT = 50;
+ /**
+ * set types to ignore, split by comma
+ * e.g.
+ * "doris.ignore-type"="bitmap,hll"
+ */
+ String DORIS_IGNORE_TYPE = "doris.ignore-type";
+
}
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
index 0535805..c7fad41 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
@@ -18,12 +18,13 @@
package org.apache.doris.spark.sql
import org.apache.doris.sdk.thrift.TScanColumnDesc
+
import scala.collection.JavaConversions._
import org.apache.doris.spark.cfg.Settings
import org.apache.doris.spark.exception.DorisException
import org.apache.doris.spark.rest.RestService
import org.apache.doris.spark.rest.models.{Field, Schema}
-import org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_READ_FIELD
+import org.apache.doris.spark.cfg.ConfigurationOptions.{DORIS_IGNORE_TYPE, DORIS_READ_FIELD}
import org.apache.spark.sql.types._
import org.slf4j.LoggerFactory
@@ -38,7 +39,9 @@ private[spark] object SchemaUtils {
*/
def discoverSchema(cfg: Settings): StructType = {
val schema = discoverSchemaFromFe(cfg)
- convertToStruct(cfg.getProperty(DORIS_READ_FIELD), schema)
+ val dorisReadField = cfg.getProperty(DORIS_READ_FIELD)
+ val ignoreColumnType = cfg.getProperty(DORIS_IGNORE_TYPE)
+ convertToStruct(schema, dorisReadField, ignoreColumnType)
}
/**
@@ -57,14 +60,20 @@ private[spark] object SchemaUtils {
* @param schema inner schema
* @return Spark Catalyst StructType
*/
- def convertToStruct(dorisReadFields: String, schema: Schema): StructType = {
- val fieldList = if (dorisReadFields != null && dorisReadFields.length > 0) {
+ def convertToStruct(schema: Schema, dorisReadFields: String, ignoredTypes: String): StructType = {
+ val fieldList = if (dorisReadFields != null && dorisReadFields.nonEmpty) {
dorisReadFields.split(",")
} else {
Array.empty[String]
}
+ val ignoredTypeList = if (ignoredTypes != null && ignoredTypes.nonEmpty) {
+ ignoredTypes.split(",").map(t => t.trim.toUpperCase)
+ } else {
+ Array.empty[String]
+ }
val fields = schema.getProperties
- .filter(x => fieldList.contains(x.getName) || fieldList.isEmpty)
+ .filter(x => (fieldList.contains(x.getName) || fieldList.isEmpty)
+ && !ignoredTypeList.contains(x.getType))
.map(f =>
DataTypes.createStructField(
f.getName,
diff --git a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala
index f944aaf..e11fb4f 100644
--- a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala
+++ b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala
@@ -41,7 +41,7 @@ class TestSchemaUtils extends ExpectedExceptionTest {
fields :+= DataTypes.createStructField("k1", DataTypes.ByteType, true)
fields :+= DataTypes.createStructField("k5", DataTypes.LongType, true)
val expected = DataTypes.createStructType(fields.asJava)
- Assert.assertEquals(expected, SchemaUtils.convertToStruct("k1,k5", schema))
+ Assert.assertEquals(expected, SchemaUtils.convertToStruct(schema, "k1,k5", null))
}
@Test
@@ -93,4 +93,24 @@ class TestSchemaUtils extends ExpectedExceptionTest {
Assert.assertEquals(expected, SchemaUtils.convertToSchema(Seq(k1, k2)))
}
+
+ @Test
+ def testIgnoreTypes(): Unit = {
+
+ val schema = new Schema
+ schema.setStatus(200)
+ val col1 = new Field("col1", "TINYINT", "", 0, 0, "")
+ val col2 = new Field("col2", "BITMAP", "", 0, 0, "")
+ val col3 = new Field("col3", "HLL", "", 0, 0, "")
+ schema.put(col1)
+ schema.put(col2)
+ schema.put(col3)
+
+ var fields = List[StructField]()
+ fields :+= DataTypes.createStructField("col1", DataTypes.ByteType, true)
+ val expected = DataTypes.createStructType(fields.asJava)
+ Assert.assertEquals(expected, SchemaUtils.convertToStruct(schema, null, "bitmap,hll"))
+
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org