Posted to commits@doris.apache.org by mo...@apache.org on 2021/08/03 03:58:52 UTC

[incubator-doris] branch master updated: [RoutineLoad] Support alter broker list and topic for kafka routine load (#6335)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 748604f  [RoutineLoad] Support alter broker list and topic for kafka routine load (#6335)
748604f is described below

commit 748604ff4f3d8693d1f91cdb67f6637491fde5b8
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Tue Aug 3 11:58:38 2021 +0800

    [RoutineLoad] Support alter broker list and topic for kafka routine load (#6335)
    
    ```
    alter routine load for cmy2 from kafka("kafka_broker_list" = "ip2:9094", "kafka_topic" = "my_topic");
    ```
    
    This is useful when the kafka broker list or topic has been changed.
    
    Also modify `SHOW CREATE ROUTINE LOAD` to support showing "kafka_partitions" and "kafka_offsets".
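
    After this change, `SHOW CREATE ROUTINE LOAD` also prints the consumed kafka
    partitions and the next offsets to be consumed, for example (illustrative values):

    ```
    "kafka_partitions" = "0, 1, 2",
    "kafka_offsets" = "10, 10, 10"
    ```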
---
 .../Data Manipulation/SHOW CREATE ROUTINE LOAD.md  |  7 +++--
 .../Data Manipulation/alter-routine-load.md        |  4 ++-
 .../Data Manipulation/SHOW CREATE ROUTINE LOAD.md  |  6 ++--
 .../Data Manipulation/alter-routine-load.md        |  4 ++-
 .../analysis/RoutineLoadDataSourceProperties.java  |  2 ++
 .../java/org/apache/doris/analysis/Separator.java  |  4 +++
 .../doris/analysis/ShowCreateRoutineLoadStmt.java  |  6 ++--
 .../doris/load/routineload/KafkaProgress.java      | 27 ++++++++++++++++--
 .../load/routineload/KafkaRoutineLoadJob.java      | 18 ++++++++----
 .../doris/load/routineload/RoutineLoadJob.java     | 32 ++++++++++++++++------
 .../load/routineload/RoutineLoadProgress.java      |  2 +-
 .../doris/analysis/AlterRoutineLoadStmtTest.java   |  4 +--
 .../RoutineLoadDataSourcePropertiesTest.java       |  4 +--
 .../doris/load/routineload/RoutineLoadJobTest.java | 20 ++++++++------
 14 files changed, 101 insertions(+), 39 deletions(-)

diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/SHOW CREATE ROUTINE LOAD.md b/docs/en/sql-reference/sql-statements/Data Manipulation/SHOW CREATE ROUTINE LOAD.md
index ce783c5..499b82c 100644
--- a/docs/en/sql-reference/sql-statements/Data Manipulation/SHOW CREATE ROUTINE LOAD.md	
+++ b/docs/en/sql-reference/sql-statements/Data Manipulation/SHOW CREATE ROUTINE LOAD.md	
@@ -26,7 +26,10 @@ under the License.
 
 # SHOW CREATE ROUTINE LOAD
 ## description
-    The statement is used to show the routine load job creation statement of user-defined
+    This statement is used to show the creation statement of a user-defined routine load job.
+
+    The kafka_partitions and kafka_offsets in the result show the currently consumed partitions and the corresponding next offsets to be consumed.
+
     grammar:
         SHOW [ALL] CREATE ROUTINE LOAD for load_name;
         
@@ -39,4 +42,4 @@ under the License.
         SHOW CREATE ROUTINE LOAD for test_load
 
 ## keyword
-    SHOW,CREATE,ROUTINE,LOAD
\ No newline at end of file
+    SHOW,CREATE,ROUTINE,LOAD
diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/alter-routine-load.md b/docs/en/sql-reference/sql-statements/Data Manipulation/alter-routine-load.md
index 41b5e0e..bc22697 100644
--- a/docs/en/sql-reference/sql-statements/Data Manipulation/alter-routine-load.md	
+++ b/docs/en/sql-reference/sql-statements/Data Manipulation/alter-routine-load.md	
@@ -72,7 +72,9 @@ Syntax:
     
     1. `kafka_partitions`
     2. `kafka_offsets`
-    3. Custom property, such as `property.group.id`
+    3. `kafka_broker_list`
+    4. `kafka_topic`
+    5. Custom property, such as `property.group.id`
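+
+    For example, to point a job at a new broker list and topic:
+
+    `alter routine load for cmy2 from kafka("kafka_broker_list" = "ip2:9094", "kafka_topic" = "my_topic");`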
     
     Notice:
     
diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/SHOW CREATE ROUTINE LOAD.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/SHOW CREATE ROUTINE LOAD.md
index 922db38..4809962 100644
--- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/SHOW CREATE ROUTINE LOAD.md	
+++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/SHOW CREATE ROUTINE LOAD.md	
@@ -26,7 +26,9 @@ under the License.
 
 # SHOW CREATE ROUTINE LOAD
 ## description
-    该语句用于展示例行导入作业的创建语句
+    该语句用于展示例行导入作业的创建语句。
+    结果中的 kafka partition 和 offset 展示的是当前消费的 partition,以及对应的待消费的 offset。
+
     语法:
         SHOW [ALL] CREATE ROUTINE LOAD for load_name;
         
@@ -39,4 +41,4 @@ under the License.
         SHOW CREATE ROUTINE LOAD for test_load
 
 ## keyword
-    SHOW,CREATE,ROUTINE,LOAD
\ No newline at end of file
+    SHOW,CREATE,ROUTINE,LOAD
diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/alter-routine-load.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/alter-routine-load.md
index 7541136..52544a5 100644
--- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/alter-routine-load.md	
+++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/alter-routine-load.md	
@@ -76,7 +76,9 @@ under the License.
     
     1. `kafka_partitions`
     2. `kafka_offsets`
-    3. 自定义 property,如 `property.group.id`
+    3. `kafka_broker_list`
+    4. `kafka_topic`
+    5. 自定义 property,如 `property.group.id`
     
     注:
     
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/RoutineLoadDataSourceProperties.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/RoutineLoadDataSourceProperties.java
index 1a10d66..41d3a7b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/RoutineLoadDataSourceProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/RoutineLoadDataSourceProperties.java
@@ -51,6 +51,8 @@ public class RoutineLoadDataSourceProperties {
             .build();
 
     private static final ImmutableSet<String> CONFIGURABLE_DATA_SOURCE_PROPERTIES_SET = new ImmutableSet.Builder<String>()
+            .add(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY)
+            .add(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY)
             .add(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY)
             .add(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY)
             .add(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Separator.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Separator.java
index 11404b5..217c06f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Separator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Separator.java
@@ -34,6 +34,10 @@ public class Separator implements ParseNode {
         this.separator = null;
     }
 
+    public String getOriSeparator() {
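+        // the separator exactly as the user wrote it, used when re-printing the create statement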
+        return oriSeparator;
+    }
+
     public String getSeparator() {
         return separator;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowCreateRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowCreateRoutineLoadStmt.java
index d0ae92b..7450c08 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowCreateRoutineLoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowCreateRoutineLoadStmt.java
@@ -27,9 +27,9 @@ public class ShowCreateRoutineLoadStmt extends ShowStmt {
 
     private static final ShowResultSetMetaData META_DATA =
             ShowResultSetMetaData.builder()
-                    .addColumn(new Column("Routine Load Id", ScalarType.createVarchar(20)))
-                    .addColumn(new Column("Routine Load Name", ScalarType.createVarchar(20)))
-                    .addColumn(new Column("Create Routine Load", ScalarType.createVarchar(30)))
+                    .addColumn(new Column("JobId", ScalarType.createVarchar(128)))
+                    .addColumn(new Column("JobName", ScalarType.createVarchar(128)))
+                    .addColumn(new Column("CreateStmt", ScalarType.createVarchar(65535)))
                     .build();
 
     private final LabelName labelName;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
index 6d704d9..6c88b5b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
@@ -22,13 +22,14 @@ import org.apache.doris.common.Pair;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.thrift.TKafkaRLTaskProgress;
 
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
 import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.gson.Gson;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -128,6 +129,26 @@ public class KafkaProgress extends RoutineLoadProgress {
         }
     }
 
+    public List<Pair<Integer, String>> getPartitionOffsetPairs(boolean alreadyConsumed) {
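+        // The stored offset is the next offset to be consumed; 0, -1 and -2 are
+        // sentinels for OFFSET_ZERO, OFFSET_END and OFFSET_BEGINNING. When alreadyConsumed
+        // is true, return the last consumed offset (stored offset - 1) instead.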
+        List<Pair<Integer, String>> pairs = Lists.newArrayList();
+        for (Map.Entry<Integer, Long> entry : partitionIdToOffset.entrySet()) {
+            if (entry.getValue() == 0) {
+                pairs.add(Pair.create(entry.getKey(), OFFSET_ZERO));
+            } else if (entry.getValue() == -1) {
+                pairs.add(Pair.create(entry.getKey(), OFFSET_END));
+            } else if (entry.getValue() == -2) {
+                pairs.add(Pair.create(entry.getKey(), OFFSET_BEGINNING));
+            } else {
+                long offset = entry.getValue();
+                if (alreadyConsumed) {
+                    offset -= 1;
+                }
+                pairs.add(Pair.create(entry.getKey(), String.valueOf(offset)));
+            }
+        }
+        return pairs;
+    }
+
     @Override
     public String toString() {
         Map<Integer, String> showPartitionIdToOffset = Maps.newHashMap();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 0496173..ead99ea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -55,15 +55,16 @@ import com.google.gson.GsonBuilder;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.parquet.Strings;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.HashMap;
 import java.util.TimeZone;
 import java.util.UUID;
 
@@ -554,7 +555,6 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
                 throw new DdlException("Only supports modification of PAUSED jobs");
             }
 
-
             modifyPropertiesInternal(jobProperties, dataSourceProperties);
 
             AlterRoutineLoadJobOperationLog log = new AlterRoutineLoadJobOperationLog(this.id,
@@ -593,15 +593,23 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
             ((KafkaProgress) progress).modifyOffset(kafkaPartitionOffsets);
         }
 
+        if (!customKafkaProperties.isEmpty()) {
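+            // merge user-defined "property.*" settings and rebuild the converted kafka properties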
+            this.customProperties.putAll(customKafkaProperties);
+            convertCustomProperties(true);
+        }
+
         if (!jobProperties.isEmpty()) {
             Map<String, String> copiedJobProperties = Maps.newHashMap(jobProperties);
             modifyCommonJobProperties(copiedJobProperties);
             this.jobProperties.putAll(copiedJobProperties);
         }
 
-        if (!customKafkaProperties.isEmpty()) {
-            this.customProperties.putAll(customKafkaProperties);
-            convertCustomProperties(true);
+        // modify broker list and topic
+        if (!Strings.isNullOrEmpty(dataSourceProperties.getKafkaBrokerList())) {
+            this.brokerList = dataSourceProperties.getKafkaBrokerList();
+        }
+        if (!Strings.isNullOrEmpty(dataSourceProperties.getKafkaTopic())) {
+            this.topic = dataSourceProperties.getKafkaTopic();
         }
 
         LOG.info("modify the properties of kafka routine load job: {}, jobProperties: {}, datasource properties: {}",
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index a9b7083..a758d65 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -39,6 +39,7 @@ import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.InternalErrorCode;
 import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
@@ -1325,10 +1326,10 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
         // 4.load_properties
         // 4.1.column_separator
         if (columnSeparator != null) {
-            sb.append("COLUMNS TERMINATED BY \"").append(columnSeparator.getSeparator()).append("\",\n");
+            sb.append("COLUMNS TERMINATED BY \"").append(columnSeparator.getOriSeparator()).append("\",\n");
         }
         // 4.2.columns_mapping
-        if (columnDescs != null) {
+        if (columnDescs != null && !columnDescs.descs.isEmpty()) {
             sb.append("COLUMNS(").append(Joiner.on(",").join(columnDescs.descs)).append("),\n");
         }
         // 4.3.where_predicates
@@ -1352,22 +1353,25 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
             sb.append("PRECEDING FILTER ").append(precedingFilter.toSql()).append(",\n");
         }
         // remove the last ,
-        if (",".equals(sb.charAt(sb.length() - 2))) {
+        if (sb.charAt(sb.length() - 2) == ',') {
             sb.replace(sb.length() - 2, sb.length() - 1, "");
         }
-        // 5.job_properties
+        // 5.job_properties. See PROPERTIES_SET of CreateRoutineLoadStmt
         sb.append("PROPERTIES\n(\n");
         appendProperties(sb, CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, desireTaskConcurrentNum, false);
+        appendProperties(sb, CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, maxErrorNum, false);
         appendProperties(sb, CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY, maxBatchIntervalS, false);
         appendProperties(sb, CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY, maxBatchRows, false);
         appendProperties(sb, CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY, maxBatchSizeBytes, false);
-        appendProperties(sb, CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, maxErrorNum, false);
-        appendProperties(sb, LoadStmt.STRICT_MODE, isStrictMode(), false);
-        appendProperties(sb, LoadStmt.TIMEZONE, getTimezone(), false);
         appendProperties(sb, PROPS_FORMAT, getFormat(), false);
         appendProperties(sb, PROPS_JSONPATHS, getJsonPaths(), false);
         appendProperties(sb, PROPS_STRIP_OUTER_ARRAY, isStripOuterArray(), false);
+        appendProperties(sb, PROPS_NUM_AS_STRING, isNumAsString(), false);
+        appendProperties(sb, PROPS_FUZZY_PARSE, isFuzzyParse(), false);
         appendProperties(sb, PROPS_JSONROOT, getJsonRoot(), true);
+        appendProperties(sb, LoadStmt.STRICT_MODE, isStrictMode(), false);
+        appendProperties(sb, LoadStmt.TIMEZONE, getTimezone(), false);
+        appendProperties(sb, LoadStmt.EXEC_MEM_LIMIT, getMemLimit(), true);
         sb.append(")\n");
         // 6. data_source
         sb.append("FROM ").append(dataSourceType).append("\n");
@@ -1375,13 +1379,25 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
         sb.append("(\n");
         getDataSourceProperties().forEach((k, v) -> appendProperties(sb, k, v, false));
         getCustomProperties().forEach((k, v) -> appendProperties(sb, k, v, false));
-        // remove the last ,
+        if (progress instanceof KafkaProgress) {
+            // append partitions and offsets.
+            // the offsets is the next offset to be consumed.
+            List<Pair<Integer, String>> pairs = ((KafkaProgress) progress).getPartitionOffsetPairs(false);
+            appendProperties(sb, CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY,
+                    Joiner.on(", ").join(pairs.stream().map(p -> p.first).toArray()), false);
+            appendProperties(sb, CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY,
+                    Joiner.on(", ").join(pairs.stream().map(p -> p.second).toArray()), false);
+        }
+        // remove the last ","
         sb.replace(sb.length() - 2, sb.length() - 1, "");
         sb.append(");");
         return sb.toString();
     }
 
     private static void appendProperties(StringBuilder sb, String key, Object value, boolean end) {
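+        // skip null or empty values so optional properties are omitted from the generated statement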
+        if (value == null || Strings.isNullOrEmpty(value.toString())) {
+            return;
+        }
         sb.append("\"").append(key).append("\"").append(" = ").append("\"").append(value).append("\"");
         if (!end) {
             sb.append(",\n");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java
index bf746a6..224dd54 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java
@@ -40,7 +40,7 @@ public abstract class RoutineLoadProgress implements Writable {
     abstract void update(RLTaskTxnCommitAttachment attachment);
 
     abstract String toJsonString();
-
+    
     public static RoutineLoadProgress read(DataInput in) throws IOException {
         RoutineLoadProgress progress = null;
         LoadDataSourceType type = LoadDataSourceType.valueOf(Text.readString(in));
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/AlterRoutineLoadStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/AlterRoutineLoadStmtTest.java
index f9ddb688..0af88cd 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/AlterRoutineLoadStmtTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/AlterRoutineLoadStmtTest.java
@@ -125,6 +125,7 @@ public class AlterRoutineLoadStmtTest {
             }
         }
 
+        // altering kafka_topic is now supported, so analyze() should not throw
         {
             Map<String, String> jobProperties = Maps.newHashMap();
             jobProperties.put(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, "100");
@@ -138,9 +139,8 @@ public class AlterRoutineLoadStmtTest {
 
             try {
                 stmt.analyze(analyzer);
-                Assert.fail();
             } catch (AnalysisException e) {
-                Assert.assertTrue(e.getMessage().contains("kafka_topic is invalid kafka property"));
+                Assert.fail();
             } catch (UserException e) {
                 e.printStackTrace();
                 Assert.fail();
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/RoutineLoadDataSourcePropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/RoutineLoadDataSourcePropertiesTest.java
index bcadc90..0acdf04 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/RoutineLoadDataSourcePropertiesTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/RoutineLoadDataSourcePropertiesTest.java
@@ -291,7 +291,7 @@ public class RoutineLoadDataSourcePropertiesTest {
 
     @Test
     public void testAlterAbnormal() {
-        // can not set KAFKA_BROKER_LIST_PROPERTY
+        // setting KAFKA_BROKER_LIST_PROPERTY is now supported; the failure below
+        // comes from the invalid kafka_default_offsets value
         Map<String, String> properties = Maps.newHashMap();
         properties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, "127.0.0.1:8080");
         properties.put("property." + CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS, "-1");
@@ -301,7 +301,7 @@ public class RoutineLoadDataSourcePropertiesTest {
             dsProperties.analyze();
             Assert.fail();
         } catch (UserException e) {
-            Assert.assertTrue(e.getMessage().contains("kafka_broker_list is invalid kafka property"));
+            Assert.assertTrue(e.getMessage().contains("kafka_default_offsets can only be set to OFFSET_BEGINNING, OFFSET_END or date time"));
         }
 
         // can not set datetime formatted offset and integer offset together
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
index 23aeb17..7f0676f 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
@@ -31,14 +31,14 @@ import org.apache.doris.thrift.TKafkaRLTaskProgress;
 import org.apache.doris.transaction.TransactionException;
 import org.apache.doris.transaction.TransactionState;
 
-import org.apache.kafka.common.PartitionInfo;
-import org.junit.Assert;
-import org.junit.Test;
-
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+import org.apache.kafka.common.PartitionInfo;
+import org.junit.Assert;
+import org.junit.Test;
+
 import java.util.List;
 import java.util.Map;
 
@@ -328,22 +328,24 @@ public class RoutineLoadJobTest {
                 "PROPERTIES\n" +
                 "(\n" +
                 "\"desired_concurrent_number\" = \"0\",\n" +
+                "\"max_error_number\" = \"10\",\n" +
                 "\"max_batch_interval\" = \"10\",\n" +
                 "\"max_batch_rows\" = \"10\",\n" +
                 "\"max_batch_size\" = \"104857600\",\n" +
-                "\"max_error_number\" = \"10\",\n" +
-                "\"strict_mode\" = \"false\",\n" +
-                "\"timezone\" = \"Asia/Shanghai\",\n" +
                 "\"format\" = \"csv\",\n" +
-                "\"jsonpaths\" = \"\",\n" +
                 "\"strip_outer_array\" = \"false\",\n" +
-                "\"json_root\" = \"\"\n" +
+                "\"num_as_string\" = \"false\",\n" +
+                "\"fuzzy_parse\" = \"false\",\n" +
+                "\"strict_mode\" = \"false\",\n" +
+                "\"timezone\" = \"Asia/Shanghai\",\n" +
+                "\"exec_mem_limit\" = \"2147483648\"\n" +
                 ")\n" +
                 "FROM KAFKA\n" +
                 "(\n" +
                 "\"kafka_broker_list\" = \"localhost:9092\",\n" +
                 "\"kafka_topic\" = \"test_topic\"\n" +
                 ");";
+        System.out.println(showCreateInfo);
         Assert.assertEquals(expect, showCreateInfo);
     }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org