You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2021/12/20 11:53:26 UTC

[GitHub] [incubator-inlong] luchunliang commented on a change in pull request #2036: [INLONG-1892][Inlong-sort-standalone] Sort-standalone support consume events from Pulsar.

luchunliang commented on a change in pull request #2036:
URL: https://github.com/apache/incubator-inlong/pull/2036#discussion_r772303656



##########
File path: inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
##########
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.source.sortsdk;
+
+import com.google.common.base.Preconditions;
+import org.apache.flume.Context;
+import org.apache.flume.EventDrivenSource;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.source.AbstractSource;
+import org.apache.inlong.sdk.sort.api.SortClient;
+import org.apache.inlong.sdk.sort.api.SortClientConfig;
+import org.apache.inlong.sdk.sort.api.SortClientFactory;
+import org.apache.inlong.sdk.sort.entity.MessageRecord;
+import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
+import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Default Source implementation of InLong.
+ *
+ * <p> SortSdkSource acquired msg from different upstream data store by register {@link SortClient} for each
+ * sort task. The only things SortSdkSource should do is to get one client by the sort task id, or remove one client
+ * when the task is finished or schedule to other source instance. </p>
+ *
+ * <p> The Default Manager of InLong will schedule the partition and topic automatically. </p>
+ *
+ * <p> Because all sources should implement {@link Configurable}, the SortSdkSource should have
+ * default constructor <b>WITHOUT</b> any arguments, and parameters will be configured by
+ * {@link Configurable#configure(Context)}. </p>
+ */
+public final class SortSdkSource extends AbstractSource implements Configurable, Runnable, EventDrivenSource {
+
+    /** Log of {@link SortSdkSource}. */
+    private static final Logger LOG = LoggerFactory.getLogger(SortSdkSource.class);
+
+    /** Default pool of {@link ScheduledExecutorService}. */
+    private static final int CORE_POOL_SIZE = 1;
+
+    /** Default consume strategy of {@link SortClient}. */
+    private static final  SortClientConfig.ConsumeStrategy defaultStrategy = SortClientConfig.ConsumeStrategy.lastest;
+
+    /** Map of {@link SortClient}. */
+    private ConcurrentHashMap<String, SortClient> clients;
+
+    /** The cluster name of sort. */
+    private String sortClusterName;
+
+    /** Reload config interval. */
+    private long reloadInterval;
+
+    /** Context of SortSdkSource. */
+    private SortSdkSourceContext context;
+
+    /** Executor for config reloading. */
+    private ScheduledExecutorService pool;
+
+    /**
+     * Start SortSdkSource.
+     */
+    @Override
+    public synchronized void start() {
+        this.reload();
+    }
+
+    /**
+     * Stop {@link #pool} and close all {@link SortClient}.
+     */
+    @Override
+    public void stop() {
+        pool.shutdownNow();
+        clients.forEach((sortId, client) -> client.close());
+    }
+
+    /**
+     * Entrance of {@link #pool} to reload clients with fix rate {@link #reloadInterval}.
+     */
+    @Override
+    public void run() {
+        this.reload();
+    }
+
+    /**
+     * Configure parameters.
+     *
+     * @param context Context of source.
+     */
+    @Override
+    public void configure(Context context) {
+        this.clients = new ConcurrentHashMap<>();
+        this.sortClusterName = SortClusterConfigHolder.getClusterConfig().getClusterName();
+        Preconditions.checkState(context != null, "No context, configure failed");
+        this.context = new SortSdkSourceContext(getName(), context);
+        this.reloadInterval = this.context.getReloadInterval();
+        this.initReloadExecutor();
+    }
+
+    /**
+     * Init ScheduledExecutorService with fix reload rate {@link #reloadInterval}.
+     */
+    private void initReloadExecutor() {
+        this.pool = Executors.newScheduledThreadPool(CORE_POOL_SIZE);
+        pool.scheduleAtFixedRate(this, reloadInterval, reloadInterval, TimeUnit.SECONDS);

Review comment:
       It is not good for creating a thread pool for a period reload task.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org