You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/08/24 11:18:42 UTC
[rocketmq-flink] 29/33: [#715] Support the RocketMQ TableSource
based on the legacy Source implementation (#779)
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
commit 4edc4daf5e7f6dca981744a6f3f963103916d397
Author: SteNicholas <pr...@163.com>
AuthorDate: Wed Aug 18 15:03:04 2021 +0800
[#715] Support the RocketMQ TableSource based on the legacy Source implementation (#779)
---
pom.xml | 4 +-
.../rocketmq/flink/legacy/RocketMQConfig.java | 22 +-
.../apache/rocketmq/flink/legacy/RocketMQSink.java | 22 +-
...etMQSource.java => RocketMQSourceFunction.java} | 28 +-
.../rocketmq/flink/legacy/RunningChecker.java | 22 +-
.../common/selector/DefaultTopicSelector.java | 22 +-
.../common/selector/SimpleTopicSelector.java | 22 +-
.../legacy/common/selector/TopicSelector.java | 22 +-
.../KeyValueDeserializationSchema.java | 22 +-
.../serialization/KeyValueSerializationSchema.java | 22 +-
.../RowKeyValueDeserializationSchema.java | 407 +++++++++++++++++++++
.../SimpleKeyValueDeserializationSchema.java | 22 +-
.../SimpleKeyValueSerializationSchema.java | 22 +-
.../flink/legacy/common/util/MetricUtils.java | 22 +-
.../flink/legacy/common/util/RetryUtil.java | 22 +-
.../flink/legacy/common/util/RocketMQUtils.java | 22 +-
.../flink/legacy/common/util/TestUtils.java | 22 +-
.../flink/legacy/example/RocketMQFlinkExample.java | 5 +-
.../flink/source/common/RocketMQOptions.java | 3 +
.../deserializer/RowDeserializationSchema.java | 2 +-
.../table/RocketMQDynamicTableSourceFactory.java | 6 +-
.../source/table/RocketMQScanTableSource.java | 65 +++-
.../rocketmq/flink/legacy/RocketMQSinkTest.java | 24 +-
.../rocketmq/flink/legacy/RocketMQSourceTest.java | 28 +-
.../common/selector/DefaultTopicSelectorTest.java | 24 +-
.../common/selector/SimpleTopicSelectorTest.java | 24 +-
.../RowKeyValueDeserializationSchemaTest.java | 50 +++
.../SimpleKeyValueSerializationSchemaTest.java | 24 +-
.../RocketMQDynamicTableSourceFactoryTest.java | 111 ++++++
29 files changed, 887 insertions(+), 226 deletions(-)
diff --git a/pom.xml b/pom.xml
index abec905..0675bb7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -34,7 +34,7 @@
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<rocketmq.version>4.7.1</rocketmq.version>
- <flink.version>1.13.0</flink.version>
+ <flink.version>1.13.1</flink.version>
<commons-lang.version>2.5</commons-lang.version>
<scala.binary.version>2.11</scala.binary.version>
<spotless.version>2.4.2</spotless.version>
@@ -45,11 +45,13 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQConfig.java b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQConfig.java
index 5c19b7a..fc257a1 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQConfig.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQConfig.java
@@ -1,15 +1,17 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * <p>Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing permissions and
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.flink.legacy;
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSink.java b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSink.java
index f91a684..b6e1793 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSink.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSink.java
@@ -1,15 +1,17 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * <p>Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing permissions and
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.flink.legacy;
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSource.java b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
similarity index 96%
rename from src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSource.java
rename to src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
index 84260b6..8821a6d 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSource.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
@@ -1,15 +1,17 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * <p>Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing permissions and
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.flink.legacy;
@@ -82,12 +84,12 @@ import static org.apache.rocketmq.flink.legacy.common.util.RocketMQUtils.getLong
* guarantees when checkpoints are enabled. Otherwise, the source doesn't provide any reliability
* guarantees.
*/
-public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
+public class RocketMQSourceFunction<OUT> extends RichParallelSourceFunction<OUT>
implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<OUT> {
private static final long serialVersionUID = 1L;
- private static final Logger log = LoggerFactory.getLogger(RocketMQSource.class);
+ private static final Logger log = LoggerFactory.getLogger(RocketMQSourceFunction.class);
private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";
private RunningChecker runningChecker;
private transient DefaultMQPullConsumer consumer;
@@ -115,7 +117,7 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
private Meter tpsMetric;
- public RocketMQSource(KeyValueDeserializationSchema<OUT> schema, Properties props) {
+ public RocketMQSourceFunction(KeyValueDeserializationSchema<OUT> schema, Properties props) {
this.schema = schema;
this.props = props;
}
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/RunningChecker.java b/src/main/java/org/apache/rocketmq/flink/legacy/RunningChecker.java
index c48361a..f36b727 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/RunningChecker.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/RunningChecker.java
@@ -1,15 +1,17 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * <p>Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing permissions and
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.flink.legacy;
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/DefaultTopicSelector.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/DefaultTopicSelector.java
index 6be5218..128b19e 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/DefaultTopicSelector.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/DefaultTopicSelector.java
@@ -1,15 +1,17 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * <p>Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing permissions and
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.flink.legacy.common.selector;
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/SimpleTopicSelector.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/SimpleTopicSelector.java
index 674b5a0..dcdaa2f 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/SimpleTopicSelector.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/SimpleTopicSelector.java
@@ -1,15 +1,17 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * <p>Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing permissions and
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.flink.legacy.common.selector;
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/TopicSelector.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/TopicSelector.java
index 581dadc..a70c599 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/TopicSelector.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/selector/TopicSelector.java
@@ -1,15 +1,17 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * <p>Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing permissions and
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.flink.legacy.common.selector;
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/KeyValueDeserializationSchema.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/KeyValueDeserializationSchema.java
index 4cc8c61..8d0c778 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/KeyValueDeserializationSchema.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/KeyValueDeserializationSchema.java
@@ -1,15 +1,17 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * <p>Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing permissions and
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.flink.legacy.common.serialization;
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/KeyValueSerializationSchema.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/KeyValueSerializationSchema.java
index 66b2e29..0000772 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/KeyValueSerializationSchema.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/KeyValueSerializationSchema.java
@@ -1,15 +1,17 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * <p>Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing permissions and
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.flink.legacy.common.serialization;
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/RowKeyValueDeserializationSchema.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/RowKeyValueDeserializationSchema.java
new file mode 100644
index 0000000..bc43b1c
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/RowKeyValueDeserializationSchema.java
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.flink.legacy.common.serialization;
+
+import org.apache.rocketmq.flink.source.reader.deserializer.DirtyDataStrategy;
+import org.apache.rocketmq.flink.source.util.ByteSerializer;
+import org.apache.rocketmq.flink.source.util.StringSerializer;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+/**
+ * The row based implementation of {@link KeyValueDeserializationSchema} for the deserialization
+ * of message key and value.
+ */
+public class RowKeyValueDeserializationSchema implements KeyValueDeserializationSchema<RowData> {
+
+ private static final long serialVersionUID = -1L;
+ private static final Logger logger =
+ LoggerFactory.getLogger(RowKeyValueDeserializationSchema.class);
+
+ private transient TableSchema tableSchema;
+ private final DirtyDataStrategy formatErrorStrategy;
+ private final DirtyDataStrategy fieldMissingStrategy;
+ private final DirtyDataStrategy fieldIncrementStrategy;
+ private final String encoding;
+ private final String fieldDelimiter;
+ private final boolean columnErrorDebug;
+ private final int columnSize;
+ private final ByteSerializer.ValueType[] fieldTypes;
+ private final transient DataType[] fieldDataTypes;
+ private final Map<String, Integer> columnIndexMapping;
+ private long lastLogExceptionTime;
+ private long lastLogHandleFieldTime;
+
+ private static final int DEFAULT_LOG_INTERVAL_MS = 60 * 1000;
+
+ public RowKeyValueDeserializationSchema(
+ TableSchema tableSchema,
+ DirtyDataStrategy formatErrorStrategy,
+ DirtyDataStrategy fieldMissingStrategy,
+ DirtyDataStrategy fieldIncrementStrategy,
+ String encoding,
+ String fieldDelimiter,
+ boolean columnErrorDebug,
+ Map<String, String> properties) {
+ this.tableSchema = tableSchema;
+ this.formatErrorStrategy = formatErrorStrategy;
+ this.fieldMissingStrategy = fieldMissingStrategy;
+ this.fieldIncrementStrategy = fieldIncrementStrategy;
+ this.columnErrorDebug = columnErrorDebug;
+ this.encoding = encoding;
+ this.fieldDelimiter = StringEscapeUtils.unescapeJava(fieldDelimiter);
+ this.columnSize = tableSchema.getFieldNames().length;
+ this.fieldTypes = new ByteSerializer.ValueType[columnSize];
+ this.columnIndexMapping = new HashMap<>();
+ for (int index = 0; index < columnSize; index++) {
+ this.columnIndexMapping.put(tableSchema.getFieldNames()[index], index);
+ }
+ for (int index = 0; index < columnSize; index++) {
+ ByteSerializer.ValueType type =
+ ByteSerializer.getTypeIndex(tableSchema.getFieldTypes()[index].getTypeClass());
+ this.fieldTypes[index] = type;
+ }
+
+ DescriptorProperties descriptorProperties = new DescriptorProperties();
+ descriptorProperties.putProperties(properties);
+ this.fieldDataTypes = tableSchema.getFieldDataTypes();
+ this.lastLogExceptionTime = System.currentTimeMillis();
+ this.lastLogHandleFieldTime = System.currentTimeMillis();
+ }
+
+    /**
+     * Converts a message into a row; the message key is not used.
+     *
+     * <p>When the table consists of a single byte-array column, the raw value is stored in that
+     * column as-is. Otherwise the value is decoded and split into fields; a {@code null} value is
+     * logged and ignored.
+     *
+     * @param key message key (unused)
+     * @param value message body
+     * @return the deserialized row, or {@code null} if the message is ignored or skipped
+     */
+    @Override
+    public RowData deserializeKeyAndValue(byte[] key, byte[] value) {
+        if (!isOnlyHaveVarbinaryDataField()) {
+            if (value != null) {
+                return deserializeValue(value);
+            }
+            logger.info("Deserialize empty BytesMessage body, ignore the empty message.");
+            return null;
+        }
+        // Single byte-array column: pass the body through without decoding.
+        GenericRowData binaryRow = new GenericRowData(columnSize);
+        binaryRow.setField(0, value);
+        return binaryRow;
+    }
+
+ @Override
+ public TypeInformation<RowData> getProducedType() {
+ return InternalTypeInfo.of((RowType) tableSchema.toRowDataType().getLogicalType());
+ }
+
+    /** Returns {@code true} when the table has exactly one column and it is a byte-array column. */
+    private boolean isOnlyHaveVarbinaryDataField() {
+        return columnSize == 1 && isByteArrayType(tableSchema.getFieldNames()[0]);
+    }
+
+ private RowData deserializeValue(byte[] value) {
+ String body;
+ try {
+ body = new String(value, encoding);
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e);
+ }
+ String[] data = StringUtils.splitPreserveAllTokens(body, fieldDelimiter);
+ if (columnSize == 1) {
+ data = new String[1];
+ data[0] = body;
+ }
+ if (data.length < columnSize) {
+ data = handleFieldMissing(data);
+ } else if (data.length > columnSize) {
+ data = handleFieldIncrement(data);
+ }
+ if (data == null) {
+ return null;
+ }
+ GenericRowData rowData = new GenericRowData(columnSize);
+ boolean skip = false;
+ for (int index = 0; index < columnSize; index++) {
+ try {
+ String fieldValue = getValue(data, body, index);
+ rowData.setField(
+ index,
+ StringSerializer.deserialize(
+ fieldValue,
+ fieldTypes[index],
+ fieldDataTypes[index],
+ new HashSet<>()));
+ } catch (Exception e) {
+ skip = handleException(rowData, index, data, e);
+ }
+ }
+ if (skip) {
+ return null;
+ }
+ return rowData;
+ }
+
+    /**
+     * Picks the raw text for a column.
+     *
+     * @param data split fields of the record
+     * @param line the whole decoded record
+     * @param index column index
+     * @return {@code line} for a single-column table; otherwise {@code data[index]}, or {@code
+     *     null} when the record has no field at that index
+     */
+    private String getValue(String[] data, String line, int index) {
+        if (columnSize == 1) {
+            return line;
+        }
+        return index < data.length ? data[index] : null;
+    }
+
+    /**
+     * Tells whether the named column maps to {@link ByteSerializer.ValueType#V_ByteArray}.
+     *
+     * @param fieldName column name; looked up in the column index mapping
+     * @return {@code true} for a byte-array column, {@code false} otherwise or when the schema has
+     *     no type information for the column
+     */
+    private boolean isByteArrayType(String fieldName) {
+        TypeInformation<?> columnType =
+                tableSchema.getFieldTypes()[columnIndexMapping.get(fieldName)];
+        if (columnType == null) {
+            return false;
+        }
+        return ByteSerializer.getTypeIndex(columnType.getTypeClass())
+                == ByteSerializer.ValueType.V_ByteArray;
+    }
+
+    /**
+     * Applies the format-error strategy after a single field failed to deserialize.
+     *
+     * <ul>
+     *   <li>SKIP: log a warning (rate limited unless column error debugging is on) and skip the
+     *       record.
+     *   <li>SKIP_SILENT: skip the record without logging.
+     *   <li>EXCEPTION: rethrow the failure wrapped in a {@link RuntimeException}.
+     *   <li>CUT, NULL, PAD and any other value: set the field to {@code null} and keep the record.
+     * </ul>
+     *
+     * @param row row being populated
+     * @param index index of the column that failed
+     * @param data split fields of the record; may hold fewer entries than there are columns
+     * @param e the deserialization failure
+     * @return {@code true} if the whole record must be skipped
+     */
+    private boolean handleException(GenericRowData row, int index, Object[] data, Exception e) {
+        boolean skip = false;
+        switch (formatErrorStrategy) {
+            case SKIP:
+                long now = System.currentTimeMillis();
+                if (columnErrorDebug || now - lastLogExceptionTime > DEFAULT_LOG_INTERVAL_MS) {
+                    // The field-missing handler can let short records through, so the failing
+                    // column may have no entry in data; never index past the end while logging.
+                    Object fieldData = data != null && index < data.length ? data[index] : null;
+                    logger.warn(
+                            "Data format error, field type: "
+                                    + fieldTypes[index]
+                                    + ", field data: "
+                                    + fieldData
+                                    + ", index: "
+                                    + index
+                                    + ", data: ["
+                                    + StringUtils.join(data, ",")
+                                    + "]",
+                            e);
+                    lastLogExceptionTime = now;
+                }
+                skip = true;
+                break;
+            case SKIP_SILENT:
+                skip = true;
+                break;
+            case EXCEPTION:
+                throw new RuntimeException(e);
+            case CUT:
+            case NULL:
+            case PAD:
+            default:
+                row.setField(index, null);
+                break;
+        }
+
+        return skip;
+    }
+
+ private String[] handleFieldMissing(String[] data) {
+ String fieldMissingMessage =
+ String.format(
+ "Field missing exception, table column number: %d, data column number: %d, data field number: %d, data: [%s].",
+ columnSize, columnSize, data.length, StringUtils.join(data, ","));
+ switch (fieldMissingStrategy) {
+ default:
+ case SKIP:
+ long now = System.currentTimeMillis();
+ if (columnErrorDebug || now - lastLogHandleFieldTime > DEFAULT_LOG_INTERVAL_MS) {
+ logger.warn(fieldMissingMessage);
+ lastLogHandleFieldTime = now;
+ }
+ return null;
+ case SKIP_SILENT:
+ return null;
+ case CUT:
+ case NULL:
+ case PAD:
+ return data;
+ case EXCEPTION:
+ logger.error(fieldMissingMessage);
+ throw new RuntimeException(fieldMissingMessage);
+ }
+ }
+
+ /**
+  * Applies {@code fieldIncrementStrategy} to a record and decides whether it is kept.
+  *
+  * <p>CUT, NULL, PAD and any unlisted strategy return {@code data} unchanged. SKIP_SILENT returns
+  * {@code null}. SKIP returns {@code null} after a warning that is rate limited by
+  * {@code DEFAULT_LOG_INTERVAL_MS} unless {@code columnErrorDebug} is set. EXCEPTION logs an
+  * error and throws.
+  *
+  * <p>NOTE(review): the message fills both "table column number" and "data column number" with
+  * {@code columnSize}; confirm whether a different value was intended for one of them.
+  *
+  * @param data raw field values of the record
+  * @return {@code data} when the record is kept, {@code null} when it is dropped
+  * @throws RuntimeException when the strategy is EXCEPTION
+  */
+ private String[] handleFieldIncrement(String[] data) {
+     final String message =
+             String.format(
+                     "Field increment exception, table column number: %d, data column number: %d, data field number: %d, data: [%s].",
+                     columnSize, columnSize, data.length, StringUtils.join(data, ","));
+     switch (fieldIncrementStrategy) {
+         case EXCEPTION:
+             logger.error(message);
+             throw new RuntimeException(message);
+         case SKIP_SILENT:
+             return null;
+         case SKIP:
+             final long currentTime = System.currentTimeMillis();
+             final boolean intervalElapsed =
+                     currentTime - lastLogHandleFieldTime > DEFAULT_LOG_INTERVAL_MS;
+             if (columnErrorDebug || intervalElapsed) {
+                 logger.warn(message);
+                 lastLogHandleFieldTime = currentTime;
+             }
+             return null;
+         case CUT:
+         case NULL:
+         case PAD:
+         default:
+             return data;
+     }
+ }
+
+ /**
+  * Builder of {@link RowKeyValueDeserializationSchema}.
+  *
+  * <p>Without further configuration, format errors and missing fields use
+  * {@link DirtyDataStrategy#SKIP}, extra fields use {@link DirtyDataStrategy#CUT}, the encoding
+  * is UTF-8, the field delimiter is U+0001 and column error debugging is off.
+  */
+ public static class Builder {
+
+     private TableSchema schema;
+     private DirtyDataStrategy formatErrorStrategy = DirtyDataStrategy.SKIP;
+     private DirtyDataStrategy fieldMissingStrategy = DirtyDataStrategy.SKIP;
+     private DirtyDataStrategy fieldIncrementStrategy = DirtyDataStrategy.CUT;
+     private String encoding = "UTF-8";
+     private String fieldDelimiter = "\u0001";
+     private boolean columnErrorDebug = false;
+     private Map<String, String> properties;
+
+     public Builder() {}
+
+     /** Sets the table schema handed to the deserialization schema. */
+     public Builder setTableSchema(TableSchema tableSchema) {
+         this.schema = tableSchema;
+         return this;
+     }
+
+     /** Sets the strategy applied when a field raises a format error. */
+     public Builder setFormatErrorStrategy(DirtyDataStrategy formatErrorStrategy) {
+         this.formatErrorStrategy = formatErrorStrategy;
+         return this;
+     }
+
+     /** Sets the strategy applied by the field-missing handling. */
+     public Builder setFieldMissingStrategy(DirtyDataStrategy fieldMissingStrategy) {
+         this.fieldMissingStrategy = fieldMissingStrategy;
+         return this;
+     }
+
+     /** Sets the strategy applied by the field-increment handling. */
+     public Builder setFieldIncrementStrategy(DirtyDataStrategy fieldIncrementStrategy) {
+         this.fieldIncrementStrategy = fieldIncrementStrategy;
+         return this;
+     }
+
+     /** Sets the encoding name handed to the deserialization schema. */
+     public Builder setEncoding(String encoding) {
+         this.encoding = encoding;
+         return this;
+     }
+
+     /** Sets the field delimiter handed to the deserialization schema. */
+     public Builder setFieldDelimiter(String fieldDelimiter) {
+         this.fieldDelimiter = fieldDelimiter;
+         return this;
+     }
+
+     /** Enables or disables logging of every column error instead of rate-limited logging. */
+     public Builder setColumnErrorDebug(boolean columnErrorDebug) {
+         this.columnErrorDebug = columnErrorDebug;
+         return this;
+     }
+
+     /**
+      * Stores the raw properties and, when they are non-null, derives the builder settings from
+      * them using the options declared in {@link CollectorOption}.
+      *
+      * <p>The length-check value is matched case-insensitively against SKIP, PAD, EXCEPTION and
+      * SKIP_SILENT; any other value (including the option default) leaves the three strategies
+      * untouched. Encoding, field delimiter and column error debugging are always overwritten,
+      * falling back to the option defaults, so call the individual setters after this method to
+      * override them.
+      *
+      * <p>NOTE(review): the {@link CollectorOption} keys are lower-cased while the property
+      * keys are copied unchanged, so a mixed-case key such as "fieldDelimiter" presumably does
+      * not match its option; confirm how callers normalise keys.
+      *
+      * @param properties raw options, may be {@code null}
+      * @return this builder
+      */
+     public Builder setProperties(Map<String, String> properties) {
+         this.properties = properties;
+         if (null == properties) {
+             return this;
+         }
+         Configuration configuration = new Configuration();
+         for (Map.Entry<String, String> entry : properties.entrySet()) {
+             configuration.setString(entry.getKey(), entry.getValue());
+         }
+         String lengthCheck = configuration.get(CollectorOption.LENGTH_CHECK);
+         // Locale.ROOT keeps the match independent of the JVM default locale (for example the
+         // Turkish dotted capital I would otherwise break "skip_silent").
+         switch (lengthCheck.toUpperCase(java.util.Locale.ROOT)) {
+             case "SKIP":
+                 this.setFormatErrorStrategy(DirtyDataStrategy.SKIP);
+                 this.setFieldMissingStrategy(DirtyDataStrategy.SKIP);
+                 this.setFieldIncrementStrategy(DirtyDataStrategy.SKIP);
+                 break;
+             case "PAD":
+                 this.setFormatErrorStrategy(DirtyDataStrategy.SKIP);
+                 this.setFieldMissingStrategy(DirtyDataStrategy.PAD);
+                 this.setFieldIncrementStrategy(DirtyDataStrategy.CUT);
+                 break;
+             case "EXCEPTION":
+                 this.setFormatErrorStrategy(DirtyDataStrategy.EXCEPTION);
+                 this.setFieldMissingStrategy(DirtyDataStrategy.EXCEPTION);
+                 this.setFieldIncrementStrategy(DirtyDataStrategy.EXCEPTION);
+                 break;
+             case "SKIP_SILENT":
+                 this.setFormatErrorStrategy(DirtyDataStrategy.SKIP_SILENT);
+                 this.setFieldMissingStrategy(DirtyDataStrategy.SKIP_SILENT);
+                 this.setFieldIncrementStrategy(DirtyDataStrategy.SKIP_SILENT);
+                 break;
+             default:
+                 // Unknown value: keep whatever strategies are currently configured.
+                 break;
+         }
+         this.setEncoding(configuration.getString(CollectorOption.ENCODING));
+         this.setFieldDelimiter(configuration.getString(CollectorOption.FIELD_DELIMITER));
+         this.setColumnErrorDebug(configuration.getBoolean(CollectorOption.COLUMN_ERROR_DEBUG));
+         return this;
+     }
+
+     /** Creates the deserialization schema from the current builder state. */
+     public RowKeyValueDeserializationSchema build() {
+         return new RowKeyValueDeserializationSchema(
+                 schema,
+                 formatErrorStrategy,
+                 fieldMissingStrategy,
+                 fieldIncrementStrategy,
+                 encoding,
+                 fieldDelimiter,
+                 columnErrorDebug,
+                 properties);
+     }
+ }
+
+ /**
+  * Options for {@link RowKeyValueDeserializationSchema}.
+  *
+  * <p>Keys are the lower-cased forms of {@code encoding}, {@code fieldDelimiter},
+  * {@code columnErrorDebug} and {@code lengthCheck}. They are written as literals with the typed
+  * option builders, matching the style used by {@code RocketMQOptions}.
+  */
+ public static class CollectorOption {
+     /** Encoding name read by {@link Builder#setProperties}; defaults to UTF-8. */
+     public static final ConfigOption<String> ENCODING =
+             ConfigOptions.key("encoding").stringType().defaultValue("UTF-8");
+     /** Field delimiter read by {@link Builder#setProperties}; defaults to U+0001. */
+     public static final ConfigOption<String> FIELD_DELIMITER =
+             ConfigOptions.key("fielddelimiter").stringType().defaultValue("\u0001");
+     /**
+      * Column error debug flag read by {@link Builder#setProperties}; defaults to {@code true}.
+      *
+      * <p>NOTE(review): the {@link Builder} field itself defaults to {@code false}; confirm
+      * which default is intended.
+      */
+     public static final ConfigOption<Boolean> COLUMN_ERROR_DEBUG =
+             ConfigOptions.key("columnerrordebug").booleanType().defaultValue(true);
+     /**
+      * Strategy preset read by {@link Builder#setProperties}; the default {@code NONE} is not
+      * one of the recognised presets and therefore changes no strategy.
+      */
+     public static final ConfigOption<String> LENGTH_CHECK =
+             ConfigOptions.key("lengthcheck").stringType().defaultValue("NONE");
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueDeserializationSchema.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueDeserializationSchema.java
index 7dada93..456f477 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueDeserializationSchema.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueDeserializationSchema.java
@@ -1,15 +1,17 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * <p>Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing permissions and
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.flink.legacy.common.serialization;
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchema.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchema.java
index 3e92ad2..c3ae600 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchema.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchema.java
@@ -1,15 +1,17 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * <p>Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing permissions and
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.flink.legacy.common.serialization;
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/MetricUtils.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/MetricUtils.java
index bb3baeb..a54a29a 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/MetricUtils.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/MetricUtils.java
@@ -1,15 +1,17 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * <p>Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing permissions and
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.flink.legacy.common.util;
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RetryUtil.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RetryUtil.java
index 7ec1dca..e53caf1 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RetryUtil.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RetryUtil.java
@@ -1,15 +1,17 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * <p>Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing permissions and
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.flink.legacy.common.util;
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RocketMQUtils.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RocketMQUtils.java
index 94a24a1..1f084a8 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RocketMQUtils.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/RocketMQUtils.java
@@ -1,15 +1,17 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * <p>Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing permissions and
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.flink.legacy.common.util;
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/TestUtils.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/TestUtils.java
index 407aec7..70c26d9 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/common/util/TestUtils.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/util/TestUtils.java
@@ -1,15 +1,17 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * <p>Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing permissions and
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.flink.legacy.common.util;
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/example/RocketMQFlinkExample.java b/src/main/java/org/apache/rocketmq/flink/legacy/example/RocketMQFlinkExample.java
index b435726..fc0d3cb 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/example/RocketMQFlinkExample.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/example/RocketMQFlinkExample.java
@@ -20,7 +20,7 @@ package org.apache.rocketmq.flink.legacy.example;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.flink.legacy.RocketMQConfig;
import org.apache.rocketmq.flink.legacy.RocketMQSink;
-import org.apache.rocketmq.flink.legacy.RocketMQSource;
+import org.apache.rocketmq.flink.legacy.RocketMQSourceFunction;
import org.apache.rocketmq.flink.legacy.common.serialization.SimpleTupleDeserializationSchema;
import org.apache.rocketmq.flink.legacy.function.SinkMapFunction;
import org.apache.rocketmq.flink.legacy.function.SourceMapFunction;
@@ -114,7 +114,8 @@ public class RocketMQFlinkExample {
SimpleTupleDeserializationSchema schema = new SimpleTupleDeserializationSchema();
DataStreamSource<Tuple2<String, String>> source =
- env.addSource(new RocketMQSource<>(schema, consumerProps)).setParallelism(2);
+ env.addSource(new RocketMQSourceFunction<>(schema, consumerProps))
+ .setParallelism(2);
source.print();
source.process(new SourceMapFunction())
diff --git a/src/main/java/org/apache/rocketmq/flink/source/common/RocketMQOptions.java b/src/main/java/org/apache/rocketmq/flink/source/common/RocketMQOptions.java
index 064e193..000e090 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/common/RocketMQOptions.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/common/RocketMQOptions.java
@@ -53,6 +53,9 @@ public class RocketMQOptions {
public static final ConfigOption<Long> OPTIONAL_PARTITION_DISCOVERY_INTERVAL_MS =
ConfigOptions.key("partitionDiscoveryIntervalMs").longType().defaultValue(30000L);
+ public static final ConfigOption<Boolean> OPTIONAL_USE_NEW_API =
+ ConfigOptions.key("useNewApi").booleanType().defaultValue(true);
+
public static final ConfigOption<String> OPTIONAL_ENCODING =
ConfigOptions.key("encoding").stringType().defaultValue("UTF-8");
diff --git a/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RowDeserializationSchema.java b/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RowDeserializationSchema.java
index f106693..b946016 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RowDeserializationSchema.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/reader/deserializer/RowDeserializationSchema.java
@@ -109,7 +109,7 @@ public class RowDeserializationSchema
this.fieldTypes = new ValueType[totalColumnSize];
this.columnIndexMapping = new HashMap<>();
this.dataIndexMapping = new HashMap<>();
- for (int index = 0; index < tableSchema.getFieldNames().length; index++) {
+ for (int index = 0; index < totalColumnSize; index++) {
this.columnIndexMapping.put(tableSchema.getFieldNames()[index], index);
}
for (int index = 0; index < totalColumnSize; index++) {
diff --git a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java
index ec41fc6..990e28b 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java
@@ -56,6 +56,7 @@ import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_S
import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_START_TIME_MILLS;
import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_TAG;
import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_TIME_ZONE;
+import static org.apache.rocketmq.flink.source.common.RocketMQOptions.OPTIONAL_USE_NEW_API;
import static org.apache.rocketmq.flink.source.common.RocketMQOptions.TOPIC;
/**
@@ -90,6 +91,7 @@ public class RocketMQDynamicTableSourceFactory implements DynamicTableSourceFact
optionalOptions.add(OPTIONAL_END_TIME);
optionalOptions.add(OPTIONAL_TIME_ZONE);
optionalOptions.add(OPTIONAL_PARTITION_DISCOVERY_INTERVAL_MS);
+ optionalOptions.add(OPTIONAL_USE_NEW_API);
optionalOptions.add(OPTIONAL_ENCODING);
optionalOptions.add(OPTIONAL_FIELD_DELIMITER);
optionalOptions.add(OPTIONAL_LINE_DELIMITER);
@@ -146,6 +148,7 @@ public class RocketMQDynamicTableSourceFactory implements DynamicTableSourceFact
}
long partitionDiscoveryIntervalMs =
configuration.getLong(OPTIONAL_PARTITION_DISCOVERY_INTERVAL_MS);
+ boolean useNewApi = configuration.getBoolean(OPTIONAL_USE_NEW_API);
DescriptorProperties descriptorProperties = new DescriptorProperties();
descriptorProperties.putProperties(rawProperties);
TableSchema physicalSchema =
@@ -161,7 +164,8 @@ public class RocketMQDynamicTableSourceFactory implements DynamicTableSourceFact
stopInMs,
startMessageOffset,
startMessageOffset < 0 ? startTime : -1L,
- partitionDiscoveryIntervalMs);
+ partitionDiscoveryIntervalMs,
+ useNewApi);
}
private void transformContext(
diff --git a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java
index 37ab6a5..2a0d28a 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java
@@ -17,6 +17,10 @@
package org.apache.rocketmq.flink.source.table;
+import org.apache.rocketmq.flink.legacy.RocketMQConfig;
+import org.apache.rocketmq.flink.legacy.RocketMQSourceFunction;
+import org.apache.rocketmq.flink.legacy.common.serialization.KeyValueDeserializationSchema;
+import org.apache.rocketmq.flink.legacy.common.serialization.RowKeyValueDeserializationSchema;
import org.apache.rocketmq.flink.source.RocketMQSource;
import org.apache.rocketmq.flink.source.reader.deserializer.BytesMessage;
import org.apache.rocketmq.flink.source.reader.deserializer.RocketMQDeserializationSchema;
@@ -28,6 +32,7 @@ import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.connector.source.SourceProvider;
import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
import org.apache.flink.table.data.RowData;
@@ -39,6 +44,7 @@ import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.stream.Stream;
import static org.apache.flink.api.connector.source.Boundedness.BOUNDED;
@@ -59,6 +65,7 @@ public class RocketMQScanTableSource implements ScanTableSource, SupportsReading
private final long partitionDiscoveryIntervalMs;
private final long startMessageOffset;
private final long startTime;
+ private final boolean useNewApi;
private List<String> metadataKeys;
@@ -72,7 +79,8 @@ public class RocketMQScanTableSource implements ScanTableSource, SupportsReading
long stopInMs,
long startMessageOffset,
long startTime,
- long partitionDiscoveryIntervalMs) {
+ long partitionDiscoveryIntervalMs,
+ boolean useNewApi) {
this.properties = properties;
this.schema = schema;
this.topic = topic;
@@ -83,6 +91,7 @@ public class RocketMQScanTableSource implements ScanTableSource, SupportsReading
this.startMessageOffset = startMessageOffset;
this.startTime = startTime;
this.partitionDiscoveryIntervalMs = partitionDiscoveryIntervalMs;
+ this.useNewApi = useNewApi;
this.metadataKeys = Collections.emptyList();
}
@@ -93,18 +102,25 @@ public class RocketMQScanTableSource implements ScanTableSource, SupportsReading
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
- return SourceProvider.of(
- new RocketMQSource<>(
- topic,
- consumerGroup,
- nameServerAddress,
- tag,
- stopInMs,
- startTime,
- startMessageOffset < 0 ? 0 : startMessageOffset,
- partitionDiscoveryIntervalMs,
- isBounded() ? BOUNDED : CONTINUOUS_UNBOUNDED,
- createDeserializationSchema()));
+ if (useNewApi) {
+ return SourceProvider.of(
+ new RocketMQSource<>(
+ topic,
+ consumerGroup,
+ nameServerAddress,
+ tag,
+ stopInMs,
+ startTime,
+ startMessageOffset < 0 ? 0 : startMessageOffset,
+ partitionDiscoveryIntervalMs,
+ isBounded() ? BOUNDED : CONTINUOUS_UNBOUNDED,
+ createRocketMQDeserializationSchema()));
+ } else {
+ return SourceFunctionProvider.of(
+ new RocketMQSourceFunction<>(
+ createKeyValueDeserializationSchema(), getConsumerProps()),
+ isBounded());
+ }
}
@Override
@@ -133,17 +149,18 @@ public class RocketMQScanTableSource implements ScanTableSource, SupportsReading
stopInMs,
startMessageOffset,
startTime,
- partitionDiscoveryIntervalMs);
+ partitionDiscoveryIntervalMs,
+ useNewApi);
tableSource.metadataKeys = metadataKeys;
return tableSource;
}
@Override
public String asSummaryString() {
- return "RocketMQScanTableSource";
+ return RocketMQScanTableSource.class.getName();
}
- private RocketMQDeserializationSchema<RowData> createDeserializationSchema() {
+ private RocketMQDeserializationSchema<RowData> createRocketMQDeserializationSchema() {
final MetadataConverter[] metadataConverters =
metadataKeys.stream()
.map(
@@ -162,6 +179,22 @@ public class RocketMQScanTableSource implements ScanTableSource, SupportsReading
return stopInMs != Long.MAX_VALUE;
}
+ /**
+  * Creates the key/value deserialization schema used with the legacy source function,
+  * configured from this table source's properties and schema.
+  */
+ private KeyValueDeserializationSchema<RowData> createKeyValueDeserializationSchema() {
+     final RowKeyValueDeserializationSchema.Builder schemaBuilder =
+             new RowKeyValueDeserializationSchema.Builder();
+     schemaBuilder.setProperties(properties.asMap());
+     schemaBuilder.setTableSchema(schema);
+     return schemaBuilder.build();
+ }
+
+ /**
+  * Builds the consumer properties handed to the legacy {@link RocketMQSourceFunction}.
+  *
+  * <p>Only topic, consumer group, name server address and, when present, the tag are
+  * forwarded; the offset, time and discovery-interval settings of this table source are not
+  * copied here.
+  *
+  * @return a new {@link Properties} instance with the consumer settings
+  */
+ private Properties getConsumerProps() {
+     Properties consumerProps = new Properties();
+     consumerProps.setProperty(RocketMQConfig.CONSUMER_TOPIC, topic);
+     consumerProps.setProperty(RocketMQConfig.CONSUMER_GROUP, consumerGroup);
+     consumerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, nameServerAddress);
+     // Properties rejects null values with a NullPointerException, so an unset tag is left
+     // out. The tag is presumably optional (it is read from OPTIONAL_TAG); confirm in the factory.
+     if (tag != null) {
+         consumerProps.setProperty(RocketMQConfig.CONSUMER_TAG, tag);
+     }
+     return consumerProps;
+ }
+
// --------------------------------------------------------------------------------------------
// Metadata handling
// --------------------------------------------------------------------------------------------
diff --git a/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSinkTest.java b/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSinkTest.java
index c45dbdf..ad3c0b1 100644
--- a/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSinkTest.java
+++ b/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSinkTest.java
@@ -1,17 +1,21 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * <p>Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing permissions and
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.rocketmq.flink.legacy;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
diff --git a/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSourceTest.java b/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSourceTest.java
index a863ddd..7ce124d 100644
--- a/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSourceTest.java
+++ b/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSourceTest.java
@@ -1,17 +1,21 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * <p>Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing permissions and
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.rocketmq.flink.legacy;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
@@ -50,7 +54,7 @@ import static org.mockito.Mockito.when;
@Ignore
public class RocketMQSourceTest {
- private RocketMQSource rocketMQSource;
+ private RocketMQSourceFunction rocketMQSource;
private MQPullConsumerScheduleService pullConsumerScheduleService;
private DefaultMQPullConsumer consumer;
private KeyValueDeserializationSchema deserializationSchema;
@@ -60,7 +64,7 @@ public class RocketMQSourceTest {
public void setUp() throws Exception {
deserializationSchema = new SimpleKeyValueDeserializationSchema();
Properties props = new Properties();
- rocketMQSource = new RocketMQSource(deserializationSchema, props);
+ rocketMQSource = new RocketMQSourceFunction(deserializationSchema, props);
setFieldValue(rocketMQSource, "topic", topic);
setFieldValue(rocketMQSource, "runningChecker", new SingleRunningCheck());
diff --git a/src/test/java/org/apache/rocketmq/flink/legacy/common/selector/DefaultTopicSelectorTest.java b/src/test/java/org/apache/rocketmq/flink/legacy/common/selector/DefaultTopicSelectorTest.java
index b235c63..aa1528a 100644
--- a/src/test/java/org/apache/rocketmq/flink/legacy/common/selector/DefaultTopicSelectorTest.java
+++ b/src/test/java/org/apache/rocketmq/flink/legacy/common/selector/DefaultTopicSelectorTest.java
@@ -1,17 +1,21 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * <p>Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing permissions and
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.rocketmq.flink.legacy.common.selector;
import org.junit.Test;
diff --git a/src/test/java/org/apache/rocketmq/flink/legacy/common/selector/SimpleTopicSelectorTest.java b/src/test/java/org/apache/rocketmq/flink/legacy/common/selector/SimpleTopicSelectorTest.java
index 5c0f755..dc93d14 100644
--- a/src/test/java/org/apache/rocketmq/flink/legacy/common/selector/SimpleTopicSelectorTest.java
+++ b/src/test/java/org/apache/rocketmq/flink/legacy/common/selector/SimpleTopicSelectorTest.java
@@ -1,17 +1,21 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * <p>Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing permissions and
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.rocketmq.flink.legacy.common.selector;
import org.junit.Test;
diff --git a/src/test/java/org/apache/rocketmq/flink/legacy/common/serialization/RowKeyValueDeserializationSchemaTest.java b/src/test/java/org/apache/rocketmq/flink/legacy/common/serialization/RowKeyValueDeserializationSchemaTest.java
new file mode 100644
index 0000000..2c1786a
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/flink/legacy/common/serialization/RowKeyValueDeserializationSchemaTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.legacy.common.serialization;
+
+import org.apache.rocketmq.common.message.MessageExt;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertEquals;
+
+/** Test for {@link RowKeyValueDeserializationSchema}. */
+public class RowKeyValueDeserializationSchemaTest {
+
+    /**
+     * Deserializes a message body (no key supplied) into a row with a single VARCHAR column and
+     * checks that the column holds the original text.
+     */
+    @Test
+    public void testDeserializeKeyAndValue() {
+        final String body = "test_deserialize_key_and_value";
+        TableSchema tableSchema =
+                new TableSchema.Builder().field("varchar", DataTypes.VARCHAR(100)).build();
+        RowKeyValueDeserializationSchema deserializationSchema =
+                new RowKeyValueDeserializationSchema.Builder()
+                        .setTableSchema(tableSchema)
+                        .setProperties(new HashMap<>())
+                        .build();
+        MessageExt messageExt = new MessageExt();
+        // Encode with an explicit charset so the bytes do not depend on the platform default.
+        messageExt.setBody(body.getBytes(StandardCharsets.UTF_8));
+        RowData rowData = deserializationSchema.deserializeKeyAndValue(null, messageExt.getBody());
+        // Compare against the source text itself rather than re-decoding the bytes.
+        assertEquals(body, rowData.getString(0).toString());
+    }
+}
diff --git a/src/test/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchemaTest.java b/src/test/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchemaTest.java
index 78baf20..7e2e0d9 100644
--- a/src/test/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchemaTest.java
+++ b/src/test/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchemaTest.java
@@ -1,17 +1,21 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * <p>Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing permissions and
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.rocketmq.flink.legacy.common.serialization;
import org.junit.Test;
diff --git a/src/test/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactoryTest.java b/src/test/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactoryTest.java
new file mode 100644
index 0000000..184a23f
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactoryTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.source.table;
+
+import org.apache.rocketmq.flink.source.common.RocketMQOptions;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link RocketMQDynamicTableSourceFactory}. */
+public class RocketMQDynamicTableSourceFactoryTest {
+
+    private static final ResolvedSchema SCHEMA =
+            new ResolvedSchema(
+                    Collections.singletonList(Column.physical("name", STRING().notNull())),
+                    new ArrayList<>(),
+                    null);
+
+    private static final String IDENTIFIER = "rocketmq";
+    private static final String TOPIC = "test_source";
+    private static final String CONSUMER_GROUP = "test_consumer";
+    private static final String NAME_SERVER_ADDRESS = "127.0.0.1:9876";
+
+    /** With topic, consumer group and name server address set, a scan source is created. */
+    @Test
+    public void testRocketMQDynamicTableSourceWithLegalOption() {
+        final Map<String, String> options = new HashMap<>();
+        options.put("connector", IDENTIFIER);
+        options.put(RocketMQOptions.TOPIC.key(), TOPIC);
+        options.put(RocketMQOptions.CONSUMER_GROUP.key(), CONSUMER_GROUP);
+        options.put(RocketMQOptions.NAME_SERVER_ADDRESS.key(), NAME_SERVER_ADDRESS);
+        final DynamicTableSource tableSource = createTableSource(options);
+        assertTrue(tableSource instanceof RocketMQScanTableSource);
+        assertEquals(RocketMQScanTableSource.class.getName(), tableSource.asSummaryString());
+    }
+
+    /** Omits the name server address option and expects a {@link ValidationException}. */
+    @Test(expected = ValidationException.class)
+    public void testRocketMQDynamicTableSourceWithoutRequiredOption() {
+        final Map<String, String> options = new HashMap<>();
+        options.put("connector", IDENTIFIER);
+        options.put(RocketMQOptions.TOPIC.key(), TOPIC);
+        options.put(RocketMQOptions.CONSUMER_GROUP.key(), CONSUMER_GROUP);
+        options.put(RocketMQOptions.OPTIONAL_TAG.key(), "test_tag");
+        createTableSource(options);
+    }
+
+    /** Adds the extra key {@code unknown} and expects a {@link ValidationException}. */
+    @Test(expected = ValidationException.class)
+    public void testRocketMQDynamicTableSourceWithUnknownOption() {
+        final Map<String, String> options = new HashMap<>();
+        // The connector identifier must be present; without it table creation is rejected for
+        // the missing 'connector' key before the unknown option is ever inspected.
+        options.put("connector", IDENTIFIER);
+        options.put(RocketMQOptions.TOPIC.key(), TOPIC);
+        options.put(RocketMQOptions.CONSUMER_GROUP.key(), CONSUMER_GROUP);
+        options.put(RocketMQOptions.NAME_SERVER_ADDRESS.key(), NAME_SERVER_ADDRESS);
+        options.put("unknown", "test_option");
+        createTableSource(options);
+    }
+
+    /**
+     * Builds a mock catalog table from {@link #SCHEMA} and the given options and asks {@link
+     * FactoryUtil} to create its table source with the supplied configuration.
+     */
+    private static DynamicTableSource createTableSource(
+            Map<String, String> options, Configuration conf) {
+        return FactoryUtil.createTableSource(
+                null,
+                ObjectIdentifier.of("default", "default", IDENTIFIER),
+                new ResolvedCatalogTable(
+                        CatalogTable.of(
+                                Schema.newBuilder().fromResolvedSchema(SCHEMA).build(),
+                                "mock source",
+                                Collections.emptyList(),
+                                options),
+                        SCHEMA),
+                conf,
+                RocketMQDynamicTableSourceFactory.class.getClassLoader(),
+                false);
+    }
+
+    /** Same as the two-argument overload, using an empty {@link Configuration}. */
+    private static DynamicTableSource createTableSource(Map<String, String> options) {
+        return createTableSource(options, new Configuration());
+    }
+}