Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/08/16 07:31:54 UTC

[GitHub] [inlong] thesumery opened a new pull request, #5568: [INLONG-5564][Sort] Support read pulsar source without adminUrl

thesumery opened a new pull request, #5568:
URL: https://github.com/apache/inlong/pull/5568

   ### Prepare a Pull Request
   
   [INLONG-5564][Sort] Support read pulsar source without adminUrl
   
   - Fixes #5564
   
   ### Motivation
   
   To support the TDMQ product, this PR adds the ability to consume a Pulsar topic without providing an admin URL. Because of security restrictions, the admin URL is sometimes not exposed externally.
   
   ### Modifications
   
   admin-url is now an optional rather than a required option for the Pulsar connector. If no admin-url is given, the Pulsar topic is consumed with the TDMQ source.
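
A minimal sketch of the selection rule described under Modifications, assuming plain string options instead of the connector's real option classes. The class name, the enum, and the `service-url` key are hypothetical stand-ins; only `admin-url` is named in the description. The diff under review checks `adminUrl != null`; treating a blank value as absent is an extra assumption made here.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for the connector's option handling; not InLong code.
final class SourceSelectionSketch {

    // "admin-url" is named in the PR description; "service-url" is an assumed key.
    static final String ADMIN_URL = "admin-url";
    static final String SERVICE_URL = "service-url";

    enum SourceKind { WITH_ADMIN, WITHOUT_ADMIN }

    // Chooses the source variant from the presence of a non-blank admin-url.
    static SourceKind select(Map<String, String> options) {
        String serviceUrl = options.get(SERVICE_URL);
        if (serviceUrl == null || serviceUrl.trim().isEmpty()) {
            throw new IllegalArgumentException("'" + SERVICE_URL + "' is required");
        }
        // Assumption: a blank admin-url is treated the same as a missing one.
        Optional<String> adminUrl = Optional.ofNullable(options.get(ADMIN_URL))
                .map(String::trim)
                .filter(s -> !s.isEmpty());
        return adminUrl.isPresent() ? SourceKind.WITH_ADMIN : SourceKind.WITHOUT_ADMIN;
    }
}
```

With this rule, existing configurations that set admin-url keep their behavior, and only configurations that omit it take the new path.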
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] gong commented on a diff in pull request #5568: [INLONG-5564][Sort] Support read pulsar source without adminUrl

Posted by GitBox <gi...@apache.org>.
gong commented on code in PR #5568:
URL: https://github.com/apache/inlong/pull/5568#discussion_r948659833


##########
inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/tdmq/TDMQReaderThread.java:
##########
@@ -0,0 +1,261 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */

Review Comment:
   Duplicate license header.





[GitHub] [inlong] Oneal65 commented on a diff in pull request #5568: [INLONG-5564][Sort] Support read pulsar source without adminUrl

Posted by GitBox <gi...@apache.org>.
Oneal65 commented on code in PR #5568:
URL: https://github.com/apache/inlong/pull/5568#discussion_r949963816


##########
inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java:
##########
@@ -377,6 +377,7 @@ protected PulsarDynamicTableSource createPulsarTableSource(
                 properties,
                 startupOptions,
                 false,
-            inLongMetric, auditHostAndPorts);
+                inLongMetric,

Review Comment:
   Rename to inlongMetric.





[GitHub] [inlong] EMsnap merged pull request #5568: [INLONG-5564][Sort] Support read Pulsar source without adminUrl

Posted by GitBox <gi...@apache.org>.
EMsnap merged PR #5568:
URL: https://github.com/apache/inlong/pull/5568




[GitHub] [inlong] thesumery commented on pull request #5568: [INLONG-5564][Sort] Support read pulsar source without adminUrl

Posted by GitBox <gi...@apache.org>.
thesumery commented on PR #5568:
URL: https://github.com/apache/inlong/pull/5568#issuecomment-1220348472

   > I understand this is meant to support accessing a Pulsar source without the admin API. It is recommended to weaken the concept of TDMQ (and to explain the reason for copying the code). For example:
   > 1. tdmq -> source (package name replacement)
   > 2. FlinkTDMQSource -> FlinkPulsarSource
   > 3. TDMQFetcher -> PulsarFetcher
   > 4. TDMQMetadataReader -> PulsarMetadataReader
   > 5. TDMQReaderThread -> PulsarReaderThread
   
   Thanks for your advice. I will rename the 'tdmq' package to 'withoutadmin' to indicate that it consumes data without admin privileges.




[GitHub] [inlong] EMsnap commented on a diff in pull request #5568: [INLONG-5564][Sort] Support read pulsar source without adminUrl

Posted by GitBox <gi...@apache.org>.
EMsnap commented on code in PR #5568:
URL: https://github.com/apache/inlong/pull/5568#discussion_r946431863


##########
inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/tdmq/TDMQMetadataReader.java:
##########
@@ -0,0 +1,251 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.pulsar.tdmq;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions;
+import org.apache.flink.streaming.connectors.pulsar.internal.SerializableRange;
+import org.apache.flink.streaming.connectors.pulsar.internal.SourceSinkUtils;
+import org.apache.flink.streaming.connectors.pulsar.internal.TopicRange;
+import org.apache.flink.util.StringUtils;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Range;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.shade.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions.ENABLE_KEY_HASH_RANGE_KEY;
+
+/**
+ * Copy from io.streamnative.connectors:pulsar-flink-connector_2.11:1.13.6.1-rc9,
+ * From {@link org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader}
+ * A Helper class that talks to Pulsar Admin API.

Review Comment:
   Add a license.





[GitHub] [inlong] yunqingmoswu commented on pull request #5568: [INLONG-5564][Sort] Support read pulsar source without adminUrl

Posted by GitBox <gi...@apache.org>.
yunqingmoswu commented on PR #5568:
URL: https://github.com/apache/inlong/pull/5568#issuecomment-1220199809

   I understand this is meant to support accessing a Pulsar source without the admin API. It is recommended to weaken the concept of TDMQ (and to explain the reason for copying the code). For example:
   1. tdmq -> source (package name replacement)
   2. FlinkTDMQSource -> FlinkPulsarSource
   3. TDMQFetcher -> PulsarFetcher
   4. TDMQMetadataReader -> PulsarMetadataReader
   5. TDMQReaderThread -> PulsarReaderThread




[GitHub] [inlong] gong commented on a diff in pull request #5568: [INLONG-5564][Sort] Support read pulsar source without adminUrl

Posted by GitBox <gi...@apache.org>.
gong commented on code in PR #5568:
URL: https://github.com/apache/inlong/pull/5568#discussion_r948658654


##########
inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/tdmq/FlinkTDMQSource.java:
##########
@@ -0,0 +1,1020 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */

Review Comment:
   Duplicate license header.





[GitHub] [inlong] thesumery commented on a diff in pull request #5568: [INLONG-5564][Sort] Support read pulsar source without adminUrl

Posted by GitBox <gi...@apache.org>.
thesumery commented on code in PR #5568:
URL: https://github.com/apache/inlong/pull/5568#discussion_r950011190


##########
inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java:
##########
@@ -377,6 +377,7 @@ protected PulsarDynamicTableSource createPulsarTableSource(
                 properties,
                 startupOptions,
                 false,
-            inLongMetric, auditHostAndPorts);
+                inLongMetric,

Review Comment:
   Formatting problem.





[GitHub] [inlong] thesumery commented on a diff in pull request #5568: [INLONG-5564][Sort] Support read pulsar source without adminUrl

Posted by GitBox <gi...@apache.org>.
thesumery commented on code in PR #5568:
URL: https://github.com/apache/inlong/pull/5568#discussion_r948596140


##########
inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java:
##########
@@ -233,35 +234,61 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
                 valueDeserialization,
                 producedTypeInfo);
         final ClientConfigurationData clientConfigurationData = PulsarClientUtils.newClientConf(serviceUrl, properties);
-        FlinkPulsarSource<RowData> source = new FlinkPulsarSource<>(
-                adminUrl,
-                clientConfigurationData,
-                deserializationSchema,
-                properties
-        );
-
-        if (watermarkStrategy != null) {
-            source.assignTimestampsAndWatermarks(watermarkStrategy);
-        }
-
-        switch (startupOptions.startupMode) {
-            case EARLIEST:
-                source.setStartFromEarliest();
-                break;
-            case LATEST:
-                source.setStartFromLatest();
-                break;
-            case SPECIFIC_OFFSETS:
-                source.setStartFromSpecificOffsets(startupOptions.specificOffsets);
-                break;
-            case EXTERNAL_SUBSCRIPTION:
-                MessageId subscriptionPosition = MessageId.latest;
-                if (CONNECTOR_STARTUP_MODE_VALUE_EARLIEST.equals(startupOptions.externalSubStartOffset)) {
-                    subscriptionPosition = MessageId.earliest;
-                }
-                source.setStartFromSubscription(startupOptions.externalSubscriptionName, subscriptionPosition);
+        if (adminUrl != null) {
+            FlinkPulsarSource<RowData> source = new FlinkPulsarSource<>(
+                    adminUrl,
+                    clientConfigurationData,
+                    deserializationSchema,
+                    properties
+            );
+
+            if (watermarkStrategy != null) {
+                source.assignTimestampsAndWatermarks(watermarkStrategy);
+            }
+
+            switch (startupOptions.startupMode) {
+                case EARLIEST:
+                    source.setStartFromEarliest();
+                    break;
+                case LATEST:
+                    source.setStartFromLatest();
+                    break;
+                case SPECIFIC_OFFSETS:
+                    source.setStartFromSpecificOffsets(startupOptions.specificOffsets);
+                    break;
+                case EXTERNAL_SUBSCRIPTION:
+                    MessageId subscriptionPosition = MessageId.latest;
+                    if (CONNECTOR_STARTUP_MODE_VALUE_EARLIEST.equals(startupOptions.externalSubStartOffset)) {
+                        subscriptionPosition = MessageId.earliest;
+                    }
+                    source.setStartFromSubscription(startupOptions.externalSubscriptionName, subscriptionPosition);
+            }
+            return SourceFunctionProvider.of(source, false);
+        } else {
+            FlinkTDMQSource<RowData> source = new FlinkTDMQSource<>(

Review Comment:
   OK, I will add an option to enable the tdmq source.





[GitHub] [inlong] EMsnap commented on a diff in pull request #5568: [INLONG-5564][Sort] Support read pulsar source without adminUrl

Posted by GitBox <gi...@apache.org>.
EMsnap commented on code in PR #5568:
URL: https://github.com/apache/inlong/pull/5568#discussion_r949977335


##########
inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java:
##########
@@ -233,35 +233,62 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
                 valueDeserialization,
                 producedTypeInfo);
         final ClientConfigurationData clientConfigurationData = PulsarClientUtils.newClientConf(serviceUrl, properties);
-        FlinkPulsarSource<RowData> source = new FlinkPulsarSource<>(
-                adminUrl,
-                clientConfigurationData,
-                deserializationSchema,
-                properties
-        );
-
-        if (watermarkStrategy != null) {
-            source.assignTimestampsAndWatermarks(watermarkStrategy);
-        }
-
-        switch (startupOptions.startupMode) {
-            case EARLIEST:
-                source.setStartFromEarliest();
-                break;
-            case LATEST:
-                source.setStartFromLatest();
-                break;
-            case SPECIFIC_OFFSETS:
-                source.setStartFromSpecificOffsets(startupOptions.specificOffsets);
-                break;
-            case EXTERNAL_SUBSCRIPTION:
-                MessageId subscriptionPosition = MessageId.latest;
-                if (CONNECTOR_STARTUP_MODE_VALUE_EARLIEST.equals(startupOptions.externalSubStartOffset)) {
-                    subscriptionPosition = MessageId.earliest;
-                }
-                source.setStartFromSubscription(startupOptions.externalSubscriptionName, subscriptionPosition);
+        if (adminUrl != null) {
+            org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource source =

Review Comment:
   This method is a little too long. I suggest extracting a separate method for the adminUrl case.
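
One way the suggested extraction could look, as a sketch only. `RecordingSource`, `StartupMode`, and every method name below are hypothetical stand-ins so the example runs without Flink or Pulsar on the classpath. It also assumes both source variants expose the same start-position setters, which the diff suggests but this thread does not confirm.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the refactoring; not the code that was merged.
final class ExtractMethodSketch {

    enum StartupMode { EARLIEST, LATEST }

    // Records which setters were called, standing in for the real source classes.
    static final class RecordingSource {
        final String kind;
        final List<String> calls = new ArrayList<>();

        RecordingSource(String kind) {
            this.kind = kind;
        }

        void setStartFromEarliest() {
            calls.add("earliest");
        }

        void setStartFromLatest() {
            calls.add("latest");
        }
    }

    // The top-level method stays short: it only picks a branch.
    static RecordingSource createSource(String adminUrl, StartupMode mode) {
        RecordingSource source = adminUrl != null
                ? createSourceWithAdmin(adminUrl)
                : createSourceWithoutAdmin();
        applyStartupMode(source, mode);
        return source;
    }

    static RecordingSource createSourceWithAdmin(String adminUrl) {
        return new RecordingSource("with-admin:" + adminUrl);
    }

    static RecordingSource createSourceWithoutAdmin() {
        return new RecordingSource("without-admin");
    }

    // Startup handling is written once instead of being copied into both branches.
    static void applyStartupMode(RecordingSource source, StartupMode mode) {
        switch (mode) {
            case EARLIEST:
                source.setStartFromEarliest();
                break;
            case LATEST:
                source.setStartFromLatest();
                break;
            default:
                throw new IllegalArgumentException("Unsupported startup mode: " + mode);
        }
    }
}
```

If the two real source classes share no common type, the startup helper would need an overload per class or a small adapter; the branch-per-method split still applies.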





[GitHub] [inlong] gong commented on a diff in pull request #5568: [INLONG-5564][Sort] Support read pulsar source without adminUrl

Posted by GitBox <gi...@apache.org>.
gong commented on code in PR #5568:
URL: https://github.com/apache/inlong/pull/5568#discussion_r948660353


##########
inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/tdmq/TDMQMetadataReader.java:
##########
@@ -0,0 +1,557 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */

Review Comment:
   Duplicate license header.





[GitHub] [inlong] gong commented on a diff in pull request #5568: [INLONG-5564][Sort] Support read pulsar source without adminUrl

Posted by GitBox <gi...@apache.org>.
gong commented on code in PR #5568:
URL: https://github.com/apache/inlong/pull/5568#discussion_r948660670


##########
inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/tdmq/TDMQFetcher.java:
##########
@@ -0,0 +1,772 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */

Review Comment:
   Duplicate license header.





[GitHub] [inlong] EMsnap commented on a diff in pull request #5568: [INLONG-5564][Sort] Support read pulsar source without adminUrl

Posted by GitBox <gi...@apache.org>.
EMsnap commented on code in PR #5568:
URL: https://github.com/apache/inlong/pull/5568#discussion_r946438686


##########
inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java:
##########
@@ -233,35 +234,61 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
                 valueDeserialization,
                 producedTypeInfo);
         final ClientConfigurationData clientConfigurationData = PulsarClientUtils.newClientConf(serviceUrl, properties);
-        FlinkPulsarSource<RowData> source = new FlinkPulsarSource<>(
-                adminUrl,
-                clientConfigurationData,
-                deserializationSchema,
-                properties
-        );
-
-        if (watermarkStrategy != null) {
-            source.assignTimestampsAndWatermarks(watermarkStrategy);
-        }
-
-        switch (startupOptions.startupMode) {
-            case EARLIEST:
-                source.setStartFromEarliest();
-                break;
-            case LATEST:
-                source.setStartFromLatest();
-                break;
-            case SPECIFIC_OFFSETS:
-                source.setStartFromSpecificOffsets(startupOptions.specificOffsets);
-                break;
-            case EXTERNAL_SUBSCRIPTION:
-                MessageId subscriptionPosition = MessageId.latest;
-                if (CONNECTOR_STARTUP_MODE_VALUE_EARLIEST.equals(startupOptions.externalSubStartOffset)) {
-                    subscriptionPosition = MessageId.earliest;
-                }
-                source.setStartFromSubscription(startupOptions.externalSubscriptionName, subscriptionPosition);
+        if (adminUrl != null) {
+            FlinkPulsarSource<RowData> source = new FlinkPulsarSource<>(
+                    adminUrl,
+                    clientConfigurationData,
+                    deserializationSchema,
+                    properties
+            );
+
+            if (watermarkStrategy != null) {
+                source.assignTimestampsAndWatermarks(watermarkStrategy);
+            }
+
+            switch (startupOptions.startupMode) {
+                case EARLIEST:
+                    source.setStartFromEarliest();
+                    break;
+                case LATEST:
+                    source.setStartFromLatest();
+                    break;
+                case SPECIFIC_OFFSETS:
+                    source.setStartFromSpecificOffsets(startupOptions.specificOffsets);
+                    break;
+                case EXTERNAL_SUBSCRIPTION:
+                    MessageId subscriptionPosition = MessageId.latest;
+                    if (CONNECTOR_STARTUP_MODE_VALUE_EARLIEST.equals(startupOptions.externalSubStartOffset)) {
+                        subscriptionPosition = MessageId.earliest;
+                    }
+                    source.setStartFromSubscription(startupOptions.externalSubscriptionName, subscriptionPosition);
+            }
+            return SourceFunctionProvider.of(source, false);
+        } else {
+            FlinkTDMQSource<RowData> source = new FlinkTDMQSource<>(

Review Comment:
   An empty admin-url doesn't mean the user wants to create a tdmqSource.
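
A sketch of the alternative this comment points toward: the user opts in explicitly, and a missing admin-url without that opt-in is reported as a configuration error instead of silently switching sources. The option key `example.without-admin` and the class name are hypothetical; this thread does not say what the option was eventually called.

```java
import java.util.Map;

// Hypothetical illustration of an explicit opt-in; not the merged implementation.
final class ExplicitOptInSketch {

    static final String ADMIN_URL = "admin-url";
    // Assumed option key, chosen only for this example.
    static final String WITHOUT_ADMIN = "example.without-admin";

    // Returns true when the source that needs no admin API should be used.
    static boolean useSourceWithoutAdmin(Map<String, String> options) {
        // Boolean.parseBoolean(null) is false, so the opt-in defaults to off.
        boolean optIn = Boolean.parseBoolean(options.get(WITHOUT_ADMIN));
        if (optIn) {
            return true;
        }
        String adminUrl = options.get(ADMIN_URL);
        boolean hasAdminUrl = adminUrl != null && !adminUrl.trim().isEmpty();
        if (!hasAdminUrl) {
            // A missing admin-url alone is a mistake, not a request for the other source.
            throw new IllegalArgumentException(
                    "'" + ADMIN_URL + "' is required unless '" + WITHOUT_ADMIN + "' is true");
        }
        return false;
    }
}
```

In this sketch the opt-in wins even when an admin-url is also present; rejecting that combination would be an equally reasonable choice.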



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org