You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/07/07 02:51:26 UTC

[GitHub] [inlong] healchow commented on a diff in pull request #4891: [INLONG-4890][Sort] Support TubeMQ connector

healchow commented on code in PR #4891:
URL: https://github.com/apache/inlong/pull/4891#discussion_r915403103


##########
inlong-sort/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java:
##########
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.tubemq;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.inlong.sort.tubemq.table.TubeMQOptions;
+import org.apache.inlong.tubemq.client.config.ConsumerConfig;
+import org.apache.inlong.tubemq.client.consumer.ConsumePosition;
+import org.apache.inlong.tubemq.client.consumer.ConsumerResult;
+import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer;
+import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
+import org.apache.inlong.tubemq.corebase.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.TimeUtils.parseDuration;
+
+/**
+ * The Flink TubeMQ Consumer.
+ *
+ * @param <T> The type of records produced by this data source
+ */
+public class FlinkTubeMQConsumer<T>
+        extends RichParallelSourceFunction<T> implements CheckpointedFunction {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(FlinkTubeMQConsumer.class);
+
+    private static final String TUBE_OFFSET_STATE = "tube-offset-state";
+
+    private static final String SPLIT_COMMA = ",";
+    private static final String SPLIT_COLON = ":";
+
+    /**
+     * The address of TubeMQ master, format eg: 127.0.0.1:8715,127.0.0.2:8715.
+     */
+    private final String masterAddress;
+
+    /**
+     * The topic name.
+     */
+    private final String topic;
+
+    /**
+     * The TubeMQ consumers use this tid set to filter the records read from the server.
+     */
+    private final TreeSet<String> tidSet;
+
+    /**
+     * The consumer group name.
+     */
+    private final String consumerGroup;
+
+    /**
+     * The deserializer for records.
+     */
+    private final DeserializationSchema<T> deserializationSchema;
+
+    /**
+     * The random key for the TubeMQ consumer group at startup.
+     */
+    private final String sessionKey;
+
+    /**
+     * True if consuming message from max offset.
+     */
+    private final boolean consumeFromMax;
+
+    /**
+     * The time to wait if tubemq broker returns message not found.
+     */
+    private final Duration messageNotFoundWaitPeriod;
+
+    /**
+     * The max idle time before the source is marked as idle.
+     */
+    private final Duration maxIdleTime;
+
+    /**
+     * Flag indicating whether the consumer is still running.
+     **/
+    private volatile boolean running;
+
+    /**
+     * The state for the offsets of queues.
+     */
+    private transient ListState<Tuple2<String, Long>> offsetsState;
+
+    /**
+     * The current offsets of partitions which are stored in {@link #offsetsState}
+     * once a checkpoint is triggered.
+     *
+     * <p>NOTE: The offsets are populated in the main thread and saved in the
+     * checkpoint thread. Their usage must be guarded by the checkpoint lock.</p>
+     */
+    private transient Map<String, Long> currentOffsets;
+
+    /**
+     * The TubeMQ session factory.
+     */
+    private transient TubeSingleSessionFactory messageSessionFactory;
+
+    /**
+     * The TubeMQ pull consumer.
+     */
+    private transient PullMessageConsumer messagePullConsumer;
+
+    /**
+     * Build a TubeMQ source function
+     *
+     * @param masterAddress the master address of TubeMQ
+     * @param topic the topic name
+     * @param tidSet the topic's filter condition items
+     * @param consumerGroup the consumer group name
+     * @param deserializationSchema the deserialize schema
+     * @param configuration the configuration
+     */
+    public FlinkTubeMQConsumer(
+            String masterAddress,
+            String topic,
+            TreeSet<String> tidSet,
+            String consumerGroup,
+            DeserializationSchema<T> deserializationSchema,
+            Configuration configuration,
+            String sessionKey
+    ) {
+        checkNotNull(masterAddress,

Review Comment:
   Those lines can be merged into one line.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org