You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/08/03 03:58:52 UTC
[incubator-doris] branch master updated: [RoutineLoad] Support
alter broker list and topic for kafka routine load (#6335)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 748604f [RoutineLoad] Support alter broker list and topic for kafka routine load (#6335)
748604f is described below
commit 748604ff4f3d8693d1f91cdb67f6637491fde5b8
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Tue Aug 3 11:58:38 2021 +0800
[RoutineLoad] Support alter broker list and topic for kafka routine load (#6335)
```
alter routine load for cmy2 from kafka("kafka_broker_list" = "ip2:9094", "kafka_topic" = "my_topic");
```
This is useful when the kafka broker list or topic has been changed.
Also modify `show create routine load`, support showing "kafka_partitions" and "kafka_offsets".
---
.../Data Manipulation/SHOW CREATE ROUTINE LOAD.md | 7 +++--
.../Data Manipulation/alter-routine-load.md | 4 ++-
.../Data Manipulation/SHOW CREATE ROUTINE LOAD.md | 6 ++--
.../Data Manipulation/alter-routine-load.md | 4 ++-
.../analysis/RoutineLoadDataSourceProperties.java | 2 ++
.../java/org/apache/doris/analysis/Separator.java | 4 +++
.../doris/analysis/ShowCreateRoutineLoadStmt.java | 6 ++--
.../doris/load/routineload/KafkaProgress.java | 27 ++++++++++++++++--
.../load/routineload/KafkaRoutineLoadJob.java | 18 ++++++++----
.../doris/load/routineload/RoutineLoadJob.java | 32 ++++++++++++++++------
.../load/routineload/RoutineLoadProgress.java | 2 +-
.../doris/analysis/AlterRoutineLoadStmtTest.java | 4 +--
.../RoutineLoadDataSourcePropertiesTest.java | 4 +--
.../doris/load/routineload/RoutineLoadJobTest.java | 20 ++++++++------
14 files changed, 101 insertions(+), 39 deletions(-)
diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/SHOW CREATE ROUTINE LOAD.md b/docs/en/sql-reference/sql-statements/Data Manipulation/SHOW CREATE ROUTINE LOAD.md
index ce783c5..499b82c 100644
--- a/docs/en/sql-reference/sql-statements/Data Manipulation/SHOW CREATE ROUTINE LOAD.md
+++ b/docs/en/sql-reference/sql-statements/Data Manipulation/SHOW CREATE ROUTINE LOAD.md
@@ -26,7 +26,10 @@ under the License.
# SHOW CREATE ROUTINE LOAD
## description
- The statement is used to show the routine load job creation statement of user-defined
+ The statement is used to show the routine load job creation statement of user-defined.
+
+ The kafka partition and offset in the result show the currently consumed partition and the corresponding offset to be consumed.
+
grammar:
SHOW [ALL] CREATE ROUTINE LOAD for load_name;
@@ -39,4 +42,4 @@ under the License.
SHOW CREATE ROUTINE LOAD for test_load
## keyword
- SHOW,CREATE,ROUTINE,LOAD
\ No newline at end of file
+ SHOW,CREATE,ROUTINE,LOAD
diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/alter-routine-load.md b/docs/en/sql-reference/sql-statements/Data Manipulation/alter-routine-load.md
index 41b5e0e..bc22697 100644
--- a/docs/en/sql-reference/sql-statements/Data Manipulation/alter-routine-load.md
+++ b/docs/en/sql-reference/sql-statements/Data Manipulation/alter-routine-load.md
@@ -72,7 +72,9 @@ Syntax:
1. `kafka_partitions`
2. `kafka_offsets`
- 3. Custom property, such as `property.group.id`
+ 3. `kafka_broker_list`
+ 4. `kafka_topic`
+ 5. Custom property, such as `property.group.id`
Notice:
diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/SHOW CREATE ROUTINE LOAD.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/SHOW CREATE ROUTINE LOAD.md
index 922db38..4809962 100644
--- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/SHOW CREATE ROUTINE LOAD.md
+++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/SHOW CREATE ROUTINE LOAD.md
@@ -26,7 +26,9 @@ under the License.
# SHOW CREATE ROUTINE LOAD
## description
- 该语句用于展示例行导入作业的创建语句
+ 该语句用于展示例行导入作业的创建语句。
+ 结果中的 kafka partition 和 offset 展示的当前消费的 partition,以及对应的待消费的 offset。
+
语法:
SHOW [ALL] CREATE ROUTINE LOAD for load_name;
@@ -39,4 +41,4 @@ under the License.
SHOW CREATE ROUTINE LOAD for test_load
## keyword
- SHOW,CREATE,ROUTINE,LOAD
\ No newline at end of file
+ SHOW,CREATE,ROUTINE,LOAD
diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/alter-routine-load.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/alter-routine-load.md
index 7541136..52544a5 100644
--- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/alter-routine-load.md
+++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/alter-routine-load.md
@@ -76,7 +76,9 @@ under the License.
1. `kafka_partitions`
2. `kafka_offsets`
- 3. 自定义 property,如 `property.group.id`
+ 3. `kafka_broker_list`
+ 4. `kafka_topic`
+ 5. 自定义 property,如 `property.group.id`
注:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/RoutineLoadDataSourceProperties.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/RoutineLoadDataSourceProperties.java
index 1a10d66..41d3a7b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/RoutineLoadDataSourceProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/RoutineLoadDataSourceProperties.java
@@ -51,6 +51,8 @@ public class RoutineLoadDataSourceProperties {
.build();
private static final ImmutableSet<String> CONFIGURABLE_DATA_SOURCE_PROPERTIES_SET = new ImmutableSet.Builder<String>()
+ .add(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY)
+ .add(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY)
.add(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY)
.add(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY)
.add(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Separator.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Separator.java
index 11404b5..217c06f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Separator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Separator.java
@@ -34,6 +34,10 @@ public class Separator implements ParseNode {
this.separator = null;
}
+ public String getOriSeparator() {
+ return oriSeparator;
+ }
+
public String getSeparator() {
return separator;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowCreateRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowCreateRoutineLoadStmt.java
index d0ae92b..7450c08 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowCreateRoutineLoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowCreateRoutineLoadStmt.java
@@ -27,9 +27,9 @@ public class ShowCreateRoutineLoadStmt extends ShowStmt {
private static final ShowResultSetMetaData META_DATA =
ShowResultSetMetaData.builder()
- .addColumn(new Column("Routine Load Id", ScalarType.createVarchar(20)))
- .addColumn(new Column("Routine Load Name", ScalarType.createVarchar(20)))
- .addColumn(new Column("Create Routine Load", ScalarType.createVarchar(30)))
+ .addColumn(new Column("JobId", ScalarType.createVarchar(128)))
+ .addColumn(new Column("JobName", ScalarType.createVarchar(128)))
+ .addColumn(new Column("CreateStmt", ScalarType.createVarchar(65535)))
.build();
private final LabelName labelName;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
index 6d704d9..6c88b5b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java
@@ -22,13 +22,14 @@ import org.apache.doris.common.Pair;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.thrift.TKafkaRLTaskProgress;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@@ -128,6 +129,26 @@ public class KafkaProgress extends RoutineLoadProgress {
}
}
+ public List<Pair<Integer, String>> getPartitionOffsetPairs(boolean alreadyConsumed) {
+ List<Pair<Integer, String>> pairs = Lists.newArrayList();
+ for (Map.Entry<Integer, Long> entry : partitionIdToOffset.entrySet()) {
+ if (entry.getValue() == 0) {
+ pairs.add(Pair.create(entry.getKey(), OFFSET_ZERO));
+ } else if (entry.getValue() == -1) {
+ pairs.add(Pair.create(entry.getKey(), OFFSET_END));
+ } else if (entry.getValue() == -2) {
+ pairs.add(Pair.create(entry.getKey(), OFFSET_BEGINNING));
+ } else {
+ long offset = entry.getValue();
+ if (alreadyConsumed) {
+ offset -= 1;
+ }
+ pairs.add(Pair.create(entry.getKey(), "" + offset));
+ }
+ }
+ return pairs;
+ }
+
@Override
public String toString() {
Map<Integer, String> showPartitionIdToOffset = Maps.newHashMap();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 0496173..ead99ea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -55,15 +55,16 @@ import com.google.gson.GsonBuilder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import org.apache.parquet.Strings;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.HashMap;
import java.util.TimeZone;
import java.util.UUID;
@@ -554,7 +555,6 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
throw new DdlException("Only supports modification of PAUSED jobs");
}
-
modifyPropertiesInternal(jobProperties, dataSourceProperties);
AlterRoutineLoadJobOperationLog log = new AlterRoutineLoadJobOperationLog(this.id,
@@ -593,15 +593,23 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
((KafkaProgress) progress).modifyOffset(kafkaPartitionOffsets);
}
+ if (!customKafkaProperties.isEmpty()) {
+ this.customProperties.putAll(customKafkaProperties);
+ convertCustomProperties(true);
+ }
+
if (!jobProperties.isEmpty()) {
Map<String, String> copiedJobProperties = Maps.newHashMap(jobProperties);
modifyCommonJobProperties(copiedJobProperties);
this.jobProperties.putAll(copiedJobProperties);
}
- if (!customKafkaProperties.isEmpty()) {
- this.customProperties.putAll(customKafkaProperties);
- convertCustomProperties(true);
+ // modify broker list and topic
+ if (!Strings.isNullOrEmpty(dataSourceProperties.getKafkaBrokerList())) {
+ this.brokerList = dataSourceProperties.getKafkaBrokerList();
+ }
+ if (!Strings.isNullOrEmpty(dataSourceProperties.getKafkaTopic())) {
+ this.topic = dataSourceProperties.getKafkaTopic();
}
LOG.info("modify the properties of kafka routine load job: {}, jobProperties: {}, datasource properties: {}",
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index a9b7083..a758d65 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -39,6 +39,7 @@ import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
@@ -1325,10 +1326,10 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
// 4.load_properties
// 4.1.column_separator
if (columnSeparator != null) {
- sb.append("COLUMNS TERMINATED BY \"").append(columnSeparator.getSeparator()).append("\",\n");
+ sb.append("COLUMNS TERMINATED BY \"").append(columnSeparator.getOriSeparator()).append("\",\n");
}
// 4.2.columns_mapping
- if (columnDescs != null) {
+ if (columnDescs != null && !columnDescs.descs.isEmpty()) {
sb.append("COLUMNS(").append(Joiner.on(",").join(columnDescs.descs)).append("),\n");
}
// 4.3.where_predicates
@@ -1352,22 +1353,25 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
sb.append("PRECEDING FILTER ").append(precedingFilter.toSql()).append(",\n");
}
// remove the last ,
- if (",".equals(sb.charAt(sb.length() - 2))) {
+ if (sb.charAt(sb.length() - 2) == ',') {
sb.replace(sb.length() - 2, sb.length() - 1, "");
}
- // 5.job_properties
+ // 5.job_properties. See PROPERTIES_SET of CreateRoutineLoadStmt
sb.append("PROPERTIES\n(\n");
appendProperties(sb, CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, desireTaskConcurrentNum, false);
+ appendProperties(sb, CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, maxErrorNum, false);
appendProperties(sb, CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY, maxBatchIntervalS, false);
appendProperties(sb, CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY, maxBatchRows, false);
appendProperties(sb, CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY, maxBatchSizeBytes, false);
- appendProperties(sb, CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, maxErrorNum, false);
- appendProperties(sb, LoadStmt.STRICT_MODE, isStrictMode(), false);
- appendProperties(sb, LoadStmt.TIMEZONE, getTimezone(), false);
appendProperties(sb, PROPS_FORMAT, getFormat(), false);
appendProperties(sb, PROPS_JSONPATHS, getJsonPaths(), false);
appendProperties(sb, PROPS_STRIP_OUTER_ARRAY, isStripOuterArray(), false);
+ appendProperties(sb, PROPS_NUM_AS_STRING, isNumAsString(), false);
+ appendProperties(sb, PROPS_FUZZY_PARSE, isFuzzyParse(), false);
appendProperties(sb, PROPS_JSONROOT, getJsonRoot(), true);
+ appendProperties(sb, LoadStmt.STRICT_MODE, isStrictMode(), false);
+ appendProperties(sb, LoadStmt.TIMEZONE, getTimezone(), false);
+ appendProperties(sb, LoadStmt.EXEC_MEM_LIMIT, getMemLimit(), true);
sb.append(")\n");
// 6. data_source
sb.append("FROM ").append(dataSourceType).append("\n");
@@ -1375,13 +1379,25 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
sb.append("(\n");
getDataSourceProperties().forEach((k, v) -> appendProperties(sb, k, v, false));
getCustomProperties().forEach((k, v) -> appendProperties(sb, k, v, false));
- // remove the last ,
+ if (progress instanceof KafkaProgress) {
+ // append partitions and offsets.
+ // the offsets is the next offset to be consumed.
+ List<Pair<Integer, String>> pairs = ((KafkaProgress) progress).getPartitionOffsetPairs(false);
+ appendProperties(sb, CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY,
+ Joiner.on(", ").join(pairs.stream().map(p -> p.first).toArray()), false);
+ appendProperties(sb, CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY,
+ Joiner.on(", ").join(pairs.stream().map(p -> p.second).toArray()), false);
+ }
+ // remove the last ","
sb.replace(sb.length() - 2, sb.length() - 1, "");
sb.append(");");
return sb.toString();
}
private static void appendProperties(StringBuilder sb, String key, Object value, boolean end) {
+ if (value == null || Strings.isNullOrEmpty(value.toString())) {
+ return;
+ }
sb.append("\"").append(key).append("\"").append(" = ").append("\"").append(value).append("\"");
if (!end) {
sb.append(",\n");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java
index bf746a6..224dd54 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java
@@ -40,7 +40,7 @@ public abstract class RoutineLoadProgress implements Writable {
abstract void update(RLTaskTxnCommitAttachment attachment);
abstract String toJsonString();
-
+
public static RoutineLoadProgress read(DataInput in) throws IOException {
RoutineLoadProgress progress = null;
LoadDataSourceType type = LoadDataSourceType.valueOf(Text.readString(in));
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/AlterRoutineLoadStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/AlterRoutineLoadStmtTest.java
index f9ddb688..0af88cd 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/AlterRoutineLoadStmtTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/AlterRoutineLoadStmtTest.java
@@ -125,6 +125,7 @@ public class AlterRoutineLoadStmtTest {
}
}
+ // alter topic is now supported
{
Map<String, String> jobProperties = Maps.newHashMap();
jobProperties.put(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, "100");
@@ -138,9 +139,8 @@ public class AlterRoutineLoadStmtTest {
try {
stmt.analyze(analyzer);
- Assert.fail();
} catch (AnalysisException e) {
- Assert.assertTrue(e.getMessage().contains("kafka_topic is invalid kafka property"));
+ Assert.fail();
} catch (UserException e) {
e.printStackTrace();
Assert.fail();
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/RoutineLoadDataSourcePropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/RoutineLoadDataSourcePropertiesTest.java
index bcadc90..0acdf04 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/RoutineLoadDataSourcePropertiesTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/RoutineLoadDataSourcePropertiesTest.java
@@ -291,7 +291,7 @@ public class RoutineLoadDataSourcePropertiesTest {
@Test
public void testAlterAbnormal() {
- // can not set KAFKA_BROKER_LIST_PROPERTY
+ // now support set KAFKA_BROKER_LIST_PROPERTY
Map<String, String> properties = Maps.newHashMap();
properties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, "127.0.0.1:8080");
properties.put("property." + CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS, "-1");
@@ -301,7 +301,7 @@ public class RoutineLoadDataSourcePropertiesTest {
dsProperties.analyze();
Assert.fail();
} catch (UserException e) {
- Assert.assertTrue(e.getMessage().contains("kafka_broker_list is invalid kafka property"));
+ Assert.assertTrue(e.getMessage().contains("kafka_default_offsets can only be set to OFFSET_BEGINNING, OFFSET_END or date time"));
}
// can not set datetime formatted offset and integer offset together
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
index 23aeb17..7f0676f 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
@@ -31,14 +31,14 @@ import org.apache.doris.thrift.TKafkaRLTaskProgress;
import org.apache.doris.transaction.TransactionException;
import org.apache.doris.transaction.TransactionState;
-import org.apache.kafka.common.PartitionInfo;
-import org.junit.Assert;
-import org.junit.Test;
-
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import org.apache.kafka.common.PartitionInfo;
+import org.junit.Assert;
+import org.junit.Test;
+
import java.util.List;
import java.util.Map;
@@ -328,22 +328,24 @@ public class RoutineLoadJobTest {
"PROPERTIES\n" +
"(\n" +
"\"desired_concurrent_number\" = \"0\",\n" +
+ "\"max_error_number\" = \"10\",\n" +
"\"max_batch_interval\" = \"10\",\n" +
"\"max_batch_rows\" = \"10\",\n" +
"\"max_batch_size\" = \"104857600\",\n" +
- "\"max_error_number\" = \"10\",\n" +
- "\"strict_mode\" = \"false\",\n" +
- "\"timezone\" = \"Asia/Shanghai\",\n" +
"\"format\" = \"csv\",\n" +
- "\"jsonpaths\" = \"\",\n" +
"\"strip_outer_array\" = \"false\",\n" +
- "\"json_root\" = \"\"\n" +
+ "\"num_as_string\" = \"false\",\n" +
+ "\"fuzzy_parse\" = \"false\",\n" +
+ "\"strict_mode\" = \"false\",\n" +
+ "\"timezone\" = \"Asia/Shanghai\",\n" +
+ "\"exec_mem_limit\" = \"2147483648\"\n" +
")\n" +
"FROM KAFKA\n" +
"(\n" +
"\"kafka_broker_list\" = \"localhost:9092\",\n" +
"\"kafka_topic\" = \"test_topic\"\n" +
");";
+ System.out.println(showCreateInfo);
Assert.assertEquals(expect, showCreateInfo);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org