You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2021/12/20 08:20:06 UTC

[GitHub] [incubator-inlong] imvan opened a new pull request #2036: Feature/inlong 1892

imvan opened a new pull request #2036:
URL: https://github.com/apache/incubator-inlong/pull/2036


   ### Title Name: [INLONG-1892][Inlong-sort-standalone] Sort-standalone support consume events from Pulsar.
   
   Fixes #1892
   
   ### Motivation
   
   Integrate SortSdk into SortSdkSource, and support to consume events from Pulsar cache cluster.
   
   ### Modifications
   
   Same as the motivation.
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads (10MB)*
     - *Extended integration test for recovery after broker failure*
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (JavaDocs)
     - If a feature is not applicable for documentation, explain why?
     - If a feature is not documented yet in this PR, please create a followup issue for adding the documentation
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] doleyzi commented on a change in pull request #2036: [INLONG-1892][Inlong-sort-standalone] Sort-standalone support consume events from Pulsar.

Posted by GitBox <gi...@apache.org>.
doleyzi commented on a change in pull request #2036:
URL: https://github.com/apache/incubator-inlong/pull/2036#discussion_r772283112



##########
File path: inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/SourceContext.java
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.source;
+
+import org.apache.flume.Context;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.NotNull;
+
+/**
+ * Base source context <b>WITHOUT</b> metric reporter.
+ * The derived classes of SourceContext may implement {@link org.apache.inlong.commons.config.metrics.MetricItem} and
+ * realize methods to report customized metrics.
+ */
+public class SourceContext {
+
+    /** The reload interval of source.*/
+    private static final String KEY_RELOAD_INTERVAL = "reloadInterval";
+
+    /** The configured source context.*/
+    private final Context sourceContext;
+
+    /** Name of source. Usually the class name of source.*/
+    private final String sourceName;
+
+    /** Cluster Id of source.*/
+    @NotNull
+    private final String clusterId;
+
+    /**
+     * Constructor of {@link SourceContext}.
+     *
+     * @param sourceName Name of source. Usually the class name of source.
+     * @param context The configured source context.
+     */
+    public SourceContext(
+            @NotBlank(message = "sourceName should not be empty or null") final String sourceName,
+            @NotNull(message = "context should not be null") final Context context) {
+
+        this.sourceName = sourceName;
+        this.sourceContext = context;
+        this.clusterId = context.getString(CommonPropertiesHolder.KEY_CLUSTER_ID);
+    }
+
+    /**
+     * Obtain the reload interval of source.
+     * @return Reload interval of source.
+     */
+    public final long getReloadInterval() {
+        return sourceContext.getLong(SourceContext.KEY_RELOAD_INTERVAL, 60000L);

Review comment:
       magic number  60000L

##########
File path: inlong-sort-standalone/sort-standalone-source/pom.xml
##########
@@ -42,5 +42,39 @@
             <artifactId>sort-standalone-common</artifactId>
             <version>${project.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-sdk</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>jakarta.validation</groupId>
+            <artifactId>jakarta.validation-api</artifactId>
+            <version>2.0.2</version>
+            <scope>compile</scope>
+        </dependency>
+
+        <dependency>
+            <artifactId>powermock-module-junit4</artifactId>
+            <groupId>org.powermock</groupId>
+            <scope>test</scope>
+            <version>2.0.2</version>

Review comment:
       ditto

##########
File path: inlong-sort-standalone/sort-standalone-source/pom.xml
##########
@@ -42,5 +42,39 @@
             <artifactId>sort-standalone-common</artifactId>
             <version>${project.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-sdk</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>jakarta.validation</groupId>
+            <artifactId>jakarta.validation-api</artifactId>
+            <version>2.0.2</version>

Review comment:
       abstract version to properties




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] wardlican commented on a change in pull request #2036: [INLONG-1892][Inlong-sort-standalone] Sort-standalone support consume events from Pulsar.

Posted by GitBox <gi...@apache.org>.
wardlican commented on a change in pull request #2036:
URL: https://github.com/apache/incubator-inlong/pull/2036#discussion_r772314565



##########
File path: inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/SourceContext.java
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.source;
+
+import org.apache.flume.Context;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.NotNull;
+
+/**
+ * Base source context <b>WITHOUT</b> metric reporter.
+ * The derived classes of SourceContext may implement {@link org.apache.inlong.commons.config.metrics.MetricItem} and
+ * realize methods to report customized metrics.
+ */
+public class SourceContext {
+
+    /** The reload interval of source.*/

Review comment:
       it`s better for using "//" to replase "/**/" from single line comments




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] wardlican commented on a change in pull request #2036: [INLONG-1892][Inlong-sort-standalone] Sort-standalone support consume events from Pulsar.

Posted by GitBox <gi...@apache.org>.
wardlican commented on a change in pull request #2036:
URL: https://github.com/apache/incubator-inlong/pull/2036#discussion_r772314565



##########
File path: inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/SourceContext.java
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.source;
+
+import org.apache.flume.Context;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.NotNull;
+
+/**
+ * Base source context <b>WITHOUT</b> metric reporter.
+ * The derived classes of SourceContext may implement {@link org.apache.inlong.commons.config.metrics.MetricItem} and
+ * realize methods to report customized metrics.
+ */
+public class SourceContext {
+
+    /** The reload interval of source.*/

Review comment:
       it`s better for use "//" to replase "/**/" from single line comments




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] luchunliang commented on a change in pull request #2036: [INLONG-1892][Inlong-sort-standalone] Sort-standalone support consume events from Pulsar.

Posted by GitBox <gi...@apache.org>.
luchunliang commented on a change in pull request #2036:
URL: https://github.com/apache/incubator-inlong/pull/2036#discussion_r772303656



##########
File path: inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
##########
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.source.sortsdk;
+
+import com.google.common.base.Preconditions;
+import org.apache.flume.Context;
+import org.apache.flume.EventDrivenSource;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.source.AbstractSource;
+import org.apache.inlong.sdk.sort.api.SortClient;
+import org.apache.inlong.sdk.sort.api.SortClientConfig;
+import org.apache.inlong.sdk.sort.api.SortClientFactory;
+import org.apache.inlong.sdk.sort.entity.MessageRecord;
+import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
+import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Default Source implementation of InLong.
+ *
+ * <p> SortSdkSource acquired msg from different upstream data store by register {@link SortClient} for each
+ * sort task. The only things SortSdkSource should do is to get one client by the sort task id, or remove one client
+ * when the task is finished or schedule to other source instance. </p>
+ *
+ * <p> The Default Manager of InLong will schedule the partition and topic automatically. </p>
+ *
+ * <p> Because all sources should implement {@link Configurable}, the SortSdkSource should have
+ * default constructor <b>WITHOUT</b> any arguments, and parameters will be configured by
+ * {@link Configurable#configure(Context)}. </p>
+ */
+public final class SortSdkSource extends AbstractSource implements Configurable, Runnable, EventDrivenSource {
+
+    /** Log of {@link SortSdkSource}. */
+    private static final Logger LOG = LoggerFactory.getLogger(SortSdkSource.class);
+
+    /** Default pool of {@link ScheduledExecutorService}. */
+    private static final int CORE_POOL_SIZE = 1;
+
+    /** Default consume strategy of {@link SortClient}. */
+    private static final  SortClientConfig.ConsumeStrategy defaultStrategy = SortClientConfig.ConsumeStrategy.lastest;
+
+    /** Map of {@link SortClient}. */
+    private ConcurrentHashMap<String, SortClient> clients;
+
+    /** The cluster name of sort. */
+    private String sortClusterName;
+
+    /** Reload config interval. */
+    private long reloadInterval;
+
+    /** Context of SortSdkSource. */
+    private SortSdkSourceContext context;
+
+    /** Executor for config reloading. */
+    private ScheduledExecutorService pool;
+
+    /**
+     * Start SortSdkSource.
+     */
+    @Override
+    public synchronized void start() {
+        this.reload();
+    }
+
+    /**
+     * Stop {@link #pool} and close all {@link SortClient}.
+     */
+    @Override
+    public void stop() {
+        pool.shutdownNow();
+        clients.forEach((sortId, client) -> client.close());
+    }
+
+    /**
+     * Entrance of {@link #pool} to reload clients with fix rate {@link #reloadInterval}.
+     */
+    @Override
+    public void run() {
+        this.reload();
+    }
+
+    /**
+     * Configure parameters.
+     *
+     * @param context Context of source.
+     */
+    @Override
+    public void configure(Context context) {
+        this.clients = new ConcurrentHashMap<>();
+        this.sortClusterName = SortClusterConfigHolder.getClusterConfig().getClusterName();
+        Preconditions.checkState(context != null, "No context, configure failed");
+        this.context = new SortSdkSourceContext(getName(), context);
+        this.reloadInterval = this.context.getReloadInterval();
+        this.initReloadExecutor();
+    }
+
+    /**
+     * Init ScheduledExecutorService with fix reload rate {@link #reloadInterval}.
+     */
+    private void initReloadExecutor() {
+        this.pool = Executors.newScheduledThreadPool(CORE_POOL_SIZE);
+        pool.scheduleAtFixedRate(this, reloadInterval, reloadInterval, TimeUnit.SECONDS);

Review comment:
       It is not good for creating a thread pool for a period reload task.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] dockerzhang merged pull request #2036: [INLONG-1892][Inlong-sort-standalone] Sort-standalone support consume events from Pulsar.

Posted by GitBox <gi...@apache.org>.
dockerzhang merged pull request #2036:
URL: https://github.com/apache/incubator-inlong/pull/2036


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] imvan commented on a change in pull request #2036: [INLONG-1892][Inlong-sort-standalone] Sort-standalone support consume events from Pulsar.

Posted by GitBox <gi...@apache.org>.
imvan commented on a change in pull request #2036:
URL: https://github.com/apache/incubator-inlong/pull/2036#discussion_r775218469



##########
File path: inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/FetchCallback.java
##########
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.source.sortsdk;
+
+import com.google.common.base.Preconditions;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.inlong.sdk.sort.api.ReadCallback;
+import org.apache.inlong.sdk.sort.api.SortClient;
+import org.apache.inlong.sdk.sort.entity.MessageRecord;
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.NotNull;
+
+/**
+ * Implementation of {@link ReadCallback}.
+ *
+ * TODO: Sort sdk should deliver one object which is held by {@link ProfileEvent} and used to ack upstream data store
+ * The code should be like :
+ *
+ *        public void onFinished(final MessageRecord messageRecord, ACKer acker) {
+ *            doSomething();
+ *            final ProfileEvent profileEvent = new ProfileEvent(result.getBody(), result.getHeaders(), acker);
+ *             channelProcessor.processEvent(profileEvent);
+ *        }
+ *
+ * The ACKer will be used to <b>ACK</b> upstream after that the downstream <b>ACKed</b> sort-standalone.
+ * This process seems like <b>transaction</b> of the whole sort-standalone, and which
+ * ensure <b>At Least One</b> semantics.
+ */
+public class FetchCallback implements ReadCallback {
+
+    // Logger of {@link FetchCallback}.
+    private static final Logger LOG = LoggerFactory.getLogger(FetchCallback.class);
+
+    // SortId of fetch message.
+    private final String sortId;
+
+    // ChannelProcessor that put message in specific channel.
+    private final ChannelProcessor channelProcessor;
+
+    // Context of source, used to report fetch results.
+    private final SortSdkSourceContext context;
+
+    // Temporary usage for ACK. The {@link SortClient} and Callback should not circular reference each other.
+    private SortClient client;
+
+    /**
+     * Private constructor of {@link FetchCallback}.
+     * <p> The construction of FetchCallback should be initiated by {@link FetchCallback.Factory}.</p>
+     *
+     * @param sortId SortId of fetch message.
+     * @param channelProcessor ChannelProcessor that message put in.
+     * @param context The context to report fetch results.
+     */
+    private FetchCallback(
+            final String sortId,
+            final ChannelProcessor channelProcessor,
+            final SortSdkSourceContext context) {
+        this.sortId = sortId;
+        this.channelProcessor = channelProcessor;
+        this.context = context;
+    }
+
+    /**
+     * Set client for ack.
+     * @param client client for ack.
+     */
+    public void setClient(@NotNull SortClient client) {
+        this.client = client;
+    }
+
+    /**
+     * The callback function that SortSDK invoke when fetch messages.
+     *
+     * <p> In order to ACK the fetched msg, {@link FetchCallback} has to hold the {@link SortClient} which results in
+     * <b>Cycle Reference</b>. The {@link SortClient} should deliver one object to ACK after invoke the callback method
+     * {@link ReadCallback#onFinished(MessageRecord)}. </p>
+     *
+     * @param messageRecord message
+     */
+    @Override
+    public void onFinished(final MessageRecord messageRecord) {
+        try {
+            Preconditions.checkState(messageRecord != null, "Fetched msg is null.");
+            final SubscribeFetchResult result = SubscribeFetchResult.Factory.create(sortId, messageRecord);
+            final ProfileEvent profileEvent = new ProfileEvent(result.getBody(), result.getHeaders());
+            channelProcessor.processEvent(profileEvent);
+            context.reportToMetric(profileEvent, sortId, "-", SortSdkSourceContext.FetchResult.SUCCESS);
+            client.ack(messageRecord.getMsgKey(), messageRecord.getMsgKey());

Review comment:
       As the explanation in javadoc of FetchCallback.java and callback method "onFinished", the ACKing operation is done by SortSdk client, which means that the FetchCallback class must held one instance of corresponding client while the client hold callback object.  In order to avoid cycle reference, the SDK client should pass though one object for acking, and this object will be held by ProfileEvent and used to ack after receiving the ACK from downstream. The current acking operation is just a temporary usage.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] imvan commented on a change in pull request #2036: [INLONG-1892][Inlong-sort-standalone] Sort-standalone support consume events from Pulsar.

Posted by GitBox <gi...@apache.org>.
imvan commented on a change in pull request #2036:
URL: https://github.com/apache/incubator-inlong/pull/2036#discussion_r774602568



##########
File path: inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/SourceContext.java
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.source;
+
+import org.apache.flume.Context;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.NotNull;
+
+/**
+ * Base source context <b>WITHOUT</b> metric reporter.
+ * The derived classes of SourceContext may implement {@link org.apache.inlong.commons.config.metrics.MetricItem} and
+ * realize methods to report customized metrics.
+ */
+public class SourceContext {
+
+    /** The reload interval of source.*/

Review comment:
       thanks, done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] luchunliang commented on a change in pull request #2036: [INLONG-1892][Inlong-sort-standalone] Sort-standalone support consume events from Pulsar.

Posted by GitBox <gi...@apache.org>.
luchunliang commented on a change in pull request #2036:
URL: https://github.com/apache/incubator-inlong/pull/2036#discussion_r775193153



##########
File path: inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/FetchCallback.java
##########
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.source.sortsdk;
+
+import com.google.common.base.Preconditions;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.inlong.sdk.sort.api.ReadCallback;
+import org.apache.inlong.sdk.sort.api.SortClient;
+import org.apache.inlong.sdk.sort.entity.MessageRecord;
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.NotNull;
+
+/**
+ * Implementation of {@link ReadCallback}.
+ *
+ * TODO: Sort sdk should deliver one object which is held by {@link ProfileEvent} and used to ack upstream data store
+ * The code should be like :
+ *
+ *        public void onFinished(final MessageRecord messageRecord, ACKer acker) {
+ *            doSomething();
+ *            final ProfileEvent profileEvent = new ProfileEvent(result.getBody(), result.getHeaders(), acker);
+ *             channelProcessor.processEvent(profileEvent);
+ *        }
+ *
+ * The ACKer will be used to <b>ACK</b> upstream after that the downstream <b>ACKed</b> sort-standalone.
+ * This process seems like <b>transaction</b> of the whole sort-standalone, and which
+ * ensure <b>At Least One</b> semantics.
+ */
+public class FetchCallback implements ReadCallback {
+
+    // Logger of {@link FetchCallback}.
+    private static final Logger LOG = LoggerFactory.getLogger(FetchCallback.class);
+
+    // SortId of fetch message.
+    private final String sortId;
+
+    // ChannelProcessor that put message in specific channel.
+    private final ChannelProcessor channelProcessor;
+
+    // Context of source, used to report fetch results.
+    private final SortSdkSourceContext context;
+
+    // Temporary usage for ACK. The {@link SortClient} and Callback should not circular reference each other.
+    private SortClient client;
+
+    /**
+     * Private constructor of {@link FetchCallback}.
+     * <p> The construction of FetchCallback should be initiated by {@link FetchCallback.Factory}.</p>
+     *
+     * @param sortId SortId of fetch message.
+     * @param channelProcessor ChannelProcessor that message put in.
+     * @param context The context to report fetch results.
+     */
+    private FetchCallback(
+            final String sortId,
+            final ChannelProcessor channelProcessor,
+            final SortSdkSourceContext context) {
+        this.sortId = sortId;
+        this.channelProcessor = channelProcessor;
+        this.context = context;
+    }
+
+    /**
+     * Set client for ack.
+     * @param client client for ack.
+     */
+    public void setClient(@NotNull SortClient client) {
+        this.client = client;
+    }
+
+    /**
+     * The callback function that SortSDK invoke when fetch messages.
+     *
+     * <p> In order to ACK the fetched msg, {@link FetchCallback} has to hold the {@link SortClient} which results in
+     * <b>Cycle Reference</b>. The {@link SortClient} should deliver one object to ACK after invoke the callback method
+     * {@link ReadCallback#onFinished(MessageRecord)}. </p>
+     *
+     * @param messageRecord message
+     */
+    @Override
+    public void onFinished(final MessageRecord messageRecord) {
+        try {
+            Preconditions.checkState(messageRecord != null, "Fetched msg is null.");
+            final SubscribeFetchResult result = SubscribeFetchResult.Factory.create(sortId, messageRecord);
+            final ProfileEvent profileEvent = new ProfileEvent(result.getBody(), result.getHeaders());
+            channelProcessor.processEvent(profileEvent);
+            context.reportToMetric(profileEvent, sortId, "-", SortSdkSourceContext.FetchResult.SUCCESS);
+            client.ack(messageRecord.getMsgKey(), messageRecord.getMsgKey());

Review comment:
       The acking operation need to move to ProfileEvent. 
   In the future, execute acking operation after sending successfully.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] imvan commented on a change in pull request #2036: [INLONG-1892][Inlong-sort-standalone] Sort-standalone support consume events from Pulsar.

Posted by GitBox <gi...@apache.org>.
imvan commented on a change in pull request #2036:
URL: https://github.com/apache/incubator-inlong/pull/2036#discussion_r774599237



##########
File path: inlong-sort-standalone/sort-standalone-source/pom.xml
##########
@@ -42,5 +42,39 @@
             <artifactId>sort-standalone-common</artifactId>
             <version>${project.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-sdk</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>jakarta.validation</groupId>
+            <artifactId>jakarta.validation-api</artifactId>
+            <version>2.0.2</version>

Review comment:
       It seems to be a coincidence that some dependecies have the same version.  In fact, not like dependencies of inlong, they are not associated with each other.  Hence, I don't think it's necessary to abstract version "2.0.2" to properties.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] imvan commented on a change in pull request #2036: [INLONG-1892][Inlong-sort-standalone] Sort-standalone support consume events from Pulsar.

Posted by GitBox <gi...@apache.org>.
imvan commented on a change in pull request #2036:
URL: https://github.com/apache/incubator-inlong/pull/2036#discussion_r774603073



##########
File path: inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/SourceContext.java
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.source;
+
+import org.apache.flume.Context;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.NotNull;
+
+/**
+ * Base source context <b>WITHOUT</b> metric reporter.
+ * The derived classes of SourceContext may implement {@link org.apache.inlong.commons.config.metrics.MetricItem} and
+ * realize methods to report customized metrics.
+ */
+public class SourceContext {
+
+    /** The reload interval of source.*/
+    private static final String KEY_RELOAD_INTERVAL = "reloadInterval";
+
+    /** The configured source context.*/
+    private final Context sourceContext;
+
+    /** Name of source. Usually the class name of source.*/
+    private final String sourceName;
+
+    /** Cluster Id of source.*/
+    @NotNull
+    private final String clusterId;
+
+    /**
+     * Constructor of {@link SourceContext}.
+     *
+     * @param sourceName Name of source. Usually the class name of source.
+     * @param context The configured source context.
+     */
+    public SourceContext(
+            @NotBlank(message = "sourceName should not be empty or null") final String sourceName,
+            @NotNull(message = "context should not be null") final Context context) {
+
+        this.sourceName = sourceName;
+        this.sourceContext = context;
+        this.clusterId = context.getString(CommonPropertiesHolder.KEY_CLUSTER_ID);
+    }
+
+    /**
+     * Obtain the reload interval of source.
+     * @return Reload interval of source.
+     */
+    public final long getReloadInterval() {
+        return sourceContext.getLong(SourceContext.KEY_RELOAD_INTERVAL, 60000L);

Review comment:
       thanks, modified.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] imvan commented on a change in pull request #2036: [INLONG-1892][Inlong-sort-standalone] Sort-standalone support consume events from Pulsar.

Posted by GitBox <gi...@apache.org>.
imvan commented on a change in pull request #2036:
URL: https://github.com/apache/incubator-inlong/pull/2036#discussion_r774611638



##########
File path: inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
##########
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.source.sortsdk;
+
+import com.google.common.base.Preconditions;
+import org.apache.flume.Context;
+import org.apache.flume.EventDrivenSource;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.source.AbstractSource;
+import org.apache.inlong.sdk.sort.api.SortClient;
+import org.apache.inlong.sdk.sort.api.SortClientConfig;
+import org.apache.inlong.sdk.sort.api.SortClientFactory;
+import org.apache.inlong.sdk.sort.entity.MessageRecord;
+import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
+import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Default Source implementation of InLong.
+ *
+ * <p> SortSdkSource acquired msg from different upstream data store by register {@link SortClient} for each
+ * sort task. The only things SortSdkSource should do is to get one client by the sort task id, or remove one client
+ * when the task is finished or schedule to other source instance. </p>
+ *
+ * <p> The Default Manager of InLong will schedule the partition and topic automatically. </p>
+ *
+ * <p> Because all sources should implement {@link Configurable}, the SortSdkSource should have
+ * default constructor <b>WITHOUT</b> any arguments, and parameters will be configured by
+ * {@link Configurable#configure(Context)}. </p>
+ */
+public final class SortSdkSource extends AbstractSource implements Configurable, Runnable, EventDrivenSource {
+
+    /** Log of {@link SortSdkSource}. */
+    private static final Logger LOG = LoggerFactory.getLogger(SortSdkSource.class);
+
+    /** Default pool of {@link ScheduledExecutorService}. */
+    private static final int CORE_POOL_SIZE = 1;
+
+    /** Default consume strategy of {@link SortClient}. */
+    private static final  SortClientConfig.ConsumeStrategy defaultStrategy = SortClientConfig.ConsumeStrategy.lastest;
+
+    /** Map of {@link SortClient}. */
+    private ConcurrentHashMap<String, SortClient> clients;
+
+    /** The cluster name of sort. */
+    private String sortClusterName;
+
+    /** Reload config interval. */
+    private long reloadInterval;
+
+    /** Context of SortSdkSource. */
+    private SortSdkSourceContext context;
+
+    /** Executor for config reloading. */
+    private ScheduledExecutorService pool;
+
+    /**
+     * Start SortSdkSource.
+     */
+    @Override
+    public synchronized void start() {
+        this.reload();
+    }
+
+    /**
+     * Stop {@link #pool} and close all {@link SortClient}.
+     */
+    @Override
+    public void stop() {
+        pool.shutdownNow();
+        clients.forEach((sortId, client) -> client.close());
+    }
+
+    /**
+     * Entrance of {@link #pool} to reload clients with fix rate {@link #reloadInterval}.
+     */
+    @Override
+    public void run() {
+        this.reload();
+    }
+
+    /**
+     * Configure parameters.
+     *
+     * @param context Context of source.
+     */
+    @Override
+    public void configure(Context context) {
+        this.clients = new ConcurrentHashMap<>();
+        this.sortClusterName = SortClusterConfigHolder.getClusterConfig().getClusterName();
+        Preconditions.checkState(context != null, "No context, configure failed");
+        this.context = new SortSdkSourceContext(getName(), context);
+        this.reloadInterval = this.context.getReloadInterval();
+        this.initReloadExecutor();
+    }
+
+    /**
+     * Init ScheduledExecutorService with fix reload rate {@link #reloadInterval}.
+     */
+    private void initReloadExecutor() {
+        this.pool = Executors.newScheduledThreadPool(CORE_POOL_SIZE);
+        pool.scheduleAtFixedRate(this, reloadInterval, reloadInterval, TimeUnit.SECONDS);

Review comment:
       The Alibaba Java Coding Guidelines suggest to replace <java.util.Timer> with <java.util.concurrent.ScheduledExecutorService>.    The reason is that the running of timer thread is based on the system time .Which means thatthe timer task will be suspended if the time of system has be set to the past.
   Details can be seen at https://www.jianshu.com/p/588ab4bd6ed2




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] wardlican commented on a change in pull request #2036: [INLONG-1892][Inlong-sort-standalone] Sort-standalone support consume events from Pulsar.

Posted by GitBox <gi...@apache.org>.
wardlican commented on a change in pull request #2036:
URL: https://github.com/apache/incubator-inlong/pull/2036#discussion_r772314678



##########
File path: inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/SourceContext.java
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.source;
+
+import org.apache.flume.Context;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.NotNull;
+
+/**
+ * Base source context <b>WITHOUT</b> metric reporter.
+ * The derived classes of SourceContext may implement {@link org.apache.inlong.commons.config.metrics.MetricItem} and
+ * realize methods to report customized metrics.
+ */
+public class SourceContext {
+
+    /** The reload interval of source.*/
+    private static final String KEY_RELOAD_INTERVAL = "reloadInterval";
+
+    /** The configured source context.*/

Review comment:
       ditto




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] codecov-commenter commented on pull request #2036: [INLONG-1892][Inlong-sort-standalone] Sort-standalone support consume events from Pulsar.

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #2036:
URL: https://github.com/apache/incubator-inlong/pull/2036#issuecomment-1000610200


   # [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/2036?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#2036](https://codecov.io/gh/apache/incubator-inlong/pull/2036?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (e237c29) into [master](https://codecov.io/gh/apache/incubator-inlong/commit/78400294c870c0a42abd43c7111f7ebdb53570d8?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (7840029) will **decrease** coverage by `0.03%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-inlong/pull/2036/graphs/tree.svg?width=650&height=150&src=pr&token=1EUK92O9K2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/incubator-inlong/pull/2036?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #2036      +/-   ##
   ============================================
   - Coverage     12.28%   12.24%   -0.04%     
   + Complexity     1159     1155       -4     
   ============================================
     Files           413      413              
     Lines         35225    35215      -10     
     Branches       5542     5542              
   ============================================
   - Hits           4328     4313      -15     
   - Misses        30127    30131       +4     
   - Partials        770      771       +1     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-inlong/pull/2036?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [.../client/producer/qltystats/BrokerStatsItemSet.java](https://codecov.io/gh/apache/incubator-inlong/pull/2036/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW5sb25nLXR1YmVtcS90dWJlbXEtY2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9pbmxvbmcvdHViZW1xL2NsaWVudC9wcm9kdWNlci9xbHR5c3RhdHMvQnJva2VyU3RhdHNJdGVtU2V0LmphdmE=) | `67.44% <0.00%> (-4.66%)` | :arrow_down: |
   | [.../java/org/apache/flume/sink/tubemq/TubemqSink.java](https://codecov.io/gh/apache/incubator-inlong/pull/2036/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW5sb25nLXR1YmVtcS90dWJlbXEtY29ubmVjdG9ycy90dWJlbXEtY29ubmVjdG9yLWZsdW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9mbHVtZS9zaW5rL3R1YmVtcS9UdWJlbXFTaW5rLmphdmE=) | `51.42% <0.00%> (-4.00%)` | :arrow_down: |
   | [.../producer/qltystats/DefaultBrokerRcvQltyStats.java](https://codecov.io/gh/apache/incubator-inlong/pull/2036/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW5sb25nLXR1YmVtcS90dWJlbXEtY2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9pbmxvbmcvdHViZW1xL2NsaWVudC9wcm9kdWNlci9xbHR5c3RhdHMvRGVmYXVsdEJyb2tlclJjdlFsdHlTdGF0cy5qYXZh) | `44.14% <0.00%> (-1.18%)` | :arrow_down: |
   | [.../inlong/tubemq/corebase/policies/FlowCtrlItem.java](https://codecov.io/gh/apache/incubator-inlong/pull/2036/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW5sb25nLXR1YmVtcS90dWJlbXEtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvaW5sb25nL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybEl0ZW0uamF2YQ==) | `38.88% <0.00%> (-1.12%)` | :arrow_down: |
   | [.../tubemq/corebase/policies/FlowCtrlRuleHandler.java](https://codecov.io/gh/apache/incubator-inlong/pull/2036/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW5sb25nLXR1YmVtcS90dWJlbXEtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvaW5sb25nL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybFJ1bGVIYW5kbGVyLmphdmE=) | `34.07% <0.00%> (-0.45%)` | :arrow_down: |
   | [.../flink/connectors/tubemq/TubemqSourceFunction.java](https://codecov.io/gh/apache/incubator-inlong/pull/2036/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW5sb25nLXR1YmVtcS90dWJlbXEtY29ubmVjdG9ycy90dWJlbXEtY29ubmVjdG9yLWZsaW5rL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9mbGluay9jb25uZWN0b3JzL3R1YmVtcS9UdWJlbXFTb3VyY2VGdW5jdGlvbi5qYXZh) | `0.00% <0.00%> (ø)` | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/2036?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/2036?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [7840029...e237c29](https://codecov.io/gh/apache/incubator-inlong/pull/2036?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org