You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/21 07:21:51 UTC

[incubator-inlong] branch master updated: [INLONG-3243][Sort-Standalone] Support multiple scenes to request configs (#3244)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f378682  [INLONG-3243][Sort-Standalone] Support multiple scenes to request configs (#3244)
f378682 is described below

commit f37868249e17c2ac1b3449416a06c4e905b8335c
Author: vernedeng <de...@pku.edu.cn>
AuthorDate: Mon Mar 21 15:21:48 2022 +0800

    [INLONG-3243][Sort-Standalone] Support multiple scenes to request configs (#3244)
---
 .../config/holder/CommonPropertiesHolder.java      | 17 +---
 .../config/holder/ManagerUrlHandler.java           | 91 ++++++++++++++++++++++
 .../loader/ClassResourceManagerUrlLoader.java      | 76 ++++++++++++++++++
 .../loader/ManagerSortClusterConfigLoader.java     |  3 +-
 .../standalone/config/loader/ManagerUrlLoader.java | 40 ++++++++++
 .../standalone/source/sortsdk/SortSdkSource.java   | 53 ++++++++++++-
 .../source/sortsdk/SortSdkSourceContext.java       |  4 +
 7 files changed, 266 insertions(+), 18 deletions(-)

diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
index 422b1cd..997b893 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
@@ -35,10 +35,9 @@ import org.slf4j.Logger;
 public class CommonPropertiesHolder {
 
     public static final Logger LOG = InlongLoggerFactory.getLogger(CommonPropertiesHolder.class);
-    public static final String KEY_COMMON_PROPERTIES = "common_properties_loader";
     public static final String DEFAULT_LOADER = ClassResourceCommonPropertiesLoader.class.getName();
+    public static final String KEY_COMMON_PROPERTIES = "common_properties_loader";
     public static final String KEY_CLUSTER_ID = "clusterId";
-    public static final String KEY_SOURCE_CONFIG_MANAGER_URL = "sortSourceConfig.managerUrl";
 
     private static Map<String, String> props;
     private static Context context;
@@ -89,7 +88,7 @@ public class CommonPropertiesHolder {
 
     /**
      * get context
-     * 
+     *
      * @return the context
      */
     public static Context getContext() {
@@ -198,20 +197,12 @@ public class CommonPropertiesHolder {
     }
 
     /**
-     * Get manager URL
-     *
-     * @return Manager URL
-     */
-    public static String getSourceConfigManagerUrl() {
-        return getString(KEY_SOURCE_CONFIG_MANAGER_URL);
-    }
-
-    /**
      * getAuditFormatInterval
-     * 
+     *
      * @return
      */
     public static long getAuditFormatInterval() {
         return auditFormatInterval;
     }
+
 }
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/ManagerUrlHandler.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/ManagerUrlHandler.java
new file mode 100644
index 0000000..d14e6da
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/ManagerUrlHandler.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.config.holder;
+
+import org.apache.commons.lang.ClassUtils;
+import org.apache.flume.Context;
+import org.apache.inlong.sort.standalone.config.loader.ClassResourceManagerUrlLoader;
+import org.apache.inlong.sort.standalone.config.loader.ManagerUrlLoader;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+import java.util.Optional;
+
+/**
+ * Manager address get handler.
+ *
+ * <p> Used to acquire the ip and port of manager, which sort sdk and sort-standalone request config from. </p>
+ * <p> The default implementation {@link ClassResourceManagerUrlLoader}
+ * is base on {@link CommonPropertiesHolder} to acquire properties. </p>
+ */
+public class ManagerUrlHandler {
+
+    private static final Logger LOG = InlongLoggerFactory.getLogger(ManagerUrlHandler.class);
+    private static final String KEY_MANAGER_URL_LOADER_TYPE = "managerUrlLoaderType";
+
+    private static ManagerUrlLoader instance;
+
+    /**
+     * Delete no argument constructor.
+     */
+    private ManagerUrlHandler() {
+
+    }
+
+    /**
+     * Get URL where SortSdk request SortSourceConfig.
+     *
+     * @return URL to get SortSourceConfig.
+     */
+    public static String getSortSourceConfigUrl() {
+        return get().acquireSortSourceConfigUrl();
+    }
+
+    /**
+     * Get URL where Sort-Standalone request SortClusterConfig.
+     *
+     * @return URL to get SortClusterConfig.
+     */
+    public static String getSortClusterConfigUrl() {
+        return get().acquireSortClusterConfigUrl();
+    }
+
+    private static ManagerUrlLoader get() {
+        if (instance != null) {
+            return instance;
+        }
+        synchronized (ManagerUrlLoader.class) {
+            String loaderType = CommonPropertiesHolder
+                    .getString(KEY_MANAGER_URL_LOADER_TYPE, ClassResourceManagerUrlLoader.class.getName());
+            LOG.info("Start to load ManagerUrlLoader, type is {}.", loaderType);
+            try {
+                Class<?> handlerClass = ClassUtils.getClass(loaderType);
+                Object handlerObj = handlerClass.getDeclaredConstructor().newInstance();
+                if (handlerObj instanceof ManagerUrlLoader) {
+                    instance = (ManagerUrlLoader) handlerObj;
+                }
+            } catch (Throwable t) {
+                LOG.error("Got exception when load ManagerAddrGetHandler, type is {}, err is {}",
+                        loaderType, t.getMessage());
+            }
+            Optional.ofNullable(instance).ifPresent(inst -> inst.configure(new Context(CommonPropertiesHolder.get())));
+        }
+        return instance;
+    }
+
+}
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceManagerUrlLoader.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceManagerUrlLoader.java
new file mode 100644
index 0000000..49ff79e
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceManagerUrlLoader.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.config.loader;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.flume.Context;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+import java.util.Optional;
+
+/**
+ * Default ManagerUrlLoader. acquire URLs from common.properties.
+ */
+public class ClassResourceManagerUrlLoader implements ManagerUrlLoader {
+
+    private static final Logger LOG = InlongLoggerFactory.getLogger(ClassResourceManagerUrlLoader.class);
+    private static final String KEY_SORT_CLUSTER_CONFIG_MANAGER_URL = "sortClusterConfig.managerUrl";
+    private static final String KEY_SORT_SOURCE_CONFIG_MANAGER_URL = "sortSourceConfig.managerUrl";
+
+    private String sortSourceConfigUrl;
+    private String sortClusterConfigUrl;
+    public Context context;
+
+    @Override
+    public String acquireSortSourceConfigUrl() {
+        if (sortSourceConfigUrl != null) {
+            return sortSourceConfigUrl;
+        }
+        sortSourceConfigUrl = context.getString(KEY_SORT_SOURCE_CONFIG_MANAGER_URL);
+        if (StringUtils.isBlank(sortSourceConfigUrl)) {
+            String warnMsg = "Get key" + KEY_SORT_SOURCE_CONFIG_MANAGER_URL
+                    + " from CommonPropertiesHolder failed, it's a optional property use to specify "
+                    + "the url where SortSdk request SortSourceConfig.";
+            LOG.warn(warnMsg);
+            sortSourceConfigUrl = warnMsg;
+        }
+        return sortSourceConfigUrl;
+    }
+
+    @Override
+    public String acquireSortClusterConfigUrl() {
+        if (sortClusterConfigUrl != null) {
+            return sortClusterConfigUrl;
+        }
+        sortClusterConfigUrl = context.getString(KEY_SORT_CLUSTER_CONFIG_MANAGER_URL);
+        if (StringUtils.isBlank(sortClusterConfigUrl)) {
+            String warnMsg = "Get key" + KEY_SORT_CLUSTER_CONFIG_MANAGER_URL
+                    + " from CommonPropertiesHolder failed, it's a optional property use to specify "
+                    + "the url where Sort-Standalone request SortSourceConfig.";
+            LOG.warn(warnMsg);
+            sortClusterConfigUrl = warnMsg;
+        }
+        return sortClusterConfigUrl;
+    }
+
+    @Override
+    public void configure(Context context) {
+        Optional.ofNullable(context).ifPresent(c -> this.context = c);
+    }
+}
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerSortClusterConfigLoader.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerSortClusterConfigLoader.java
index eb9e8c1..77c87c0 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerSortClusterConfigLoader.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerSortClusterConfigLoader.java
@@ -31,6 +31,7 @@ import org.apache.http.util.EntityUtils;
 import org.apache.inlong.common.pojo.sortstandalone.SortClusterConfig;
 import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.config.holder.ManagerUrlHandler;
 import org.slf4j.Logger;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 
@@ -85,7 +86,7 @@ public class ManagerSortClusterConfigLoader implements SortClusterConfigLoader {
         HttpGet httpGet = null;
         try {
             String clusterName = this.context.getString(CommonPropertiesHolder.KEY_CLUSTER_ID);
-            String url = this.context.getString(SORT_CLUSTER_CONFIG_MANAGER) + "?apiVersion=1.0&clusterName="
+            String url = ManagerUrlHandler.getSortClusterConfigUrl() + "?apiVersion=1.0&clusterName="
                     + clusterName + "&md5=";
             if (StringUtils.isNotBlank(this.md5)) {
                 url += this.md5;
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerUrlLoader.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerUrlLoader.java
new file mode 100644
index 0000000..0f1f059
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerUrlLoader.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.config.loader;
+
+import org.apache.flume.conf.Configurable;
+
+/**
+ * Interface of ManagerUrlLoader.
+ */
+public interface ManagerUrlLoader extends Configurable {
+
+    /**
+     * Acquire SortSourceConfigUrl
+     *
+     * @return SortSourceConfigUrl
+     */
+    String acquireSortSourceConfigUrl();
+
+    /**
+     * Acquire SortClusterConfigUrl
+     *
+     * @return SortClusterConfigUrl
+     */
+    String acquireSortClusterConfigUrl();
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
index 1b4eeef..daa6f92 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
@@ -30,15 +30,21 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import org.apache.commons.lang.ClassUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.flume.Context;
 import org.apache.flume.EventDrivenSource;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.source.AbstractSource;
 import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
+import org.apache.inlong.sdk.sort.api.QueryConsumeConfig;
 import org.apache.inlong.sdk.sort.api.SortClient;
 import org.apache.inlong.sdk.sort.api.SortClientConfig;
 import org.apache.inlong.sdk.sort.api.SortClientFactory;
+import org.apache.inlong.sdk.sort.impl.ManagerReportHandlerImpl;
+import org.apache.inlong.sdk.sort.impl.MetricReporterImpl;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.config.holder.ManagerUrlHandler;
 import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,6 +67,10 @@ public final class SortSdkSource extends AbstractSource implements Configurable,
     // Log of {@link SortSdkSource}.
     private static final Logger LOG = LoggerFactory.getLogger(SortSdkSource.class);
 
+    // KEY of QueryConsumeConfig Type
+    private static final String KEY_QUERY_CONSUME_CONFIG_TYPE =
+            "sortSourceConfig.QueryConsumeConfigType";
+
     // Default pool of {@link ScheduledExecutorService}.
     private static final int CORE_POOL_SIZE = 1;
 
@@ -120,6 +130,7 @@ public final class SortSdkSource extends AbstractSource implements Configurable,
         this.context = new SortSdkSourceContext(getName(), context);
         this.reloadInterval = this.context.getReloadInterval();
         this.initReloadExecutor();
+
     }
 
     /**
@@ -169,7 +180,7 @@ public final class SortSdkSource extends AbstractSource implements Configurable,
     /**
      * Stop an expiry client from SortTaskConfig.
      * <p>
-     *     If the sortId is not in active clients, but not in configs, stop it.
+     *     If the sortId is in active clients, but not in configs, stop it.
      * </p>
      *
      * @param configs Updated SortTaskConfig
@@ -180,9 +191,10 @@ public final class SortSdkSource extends AbstractSource implements Configurable,
                 .collect(Collectors.toSet());
 
         clients.keySet().stream()
-                .filter(updatedSortIds::contains)
+                .filter(sortId -> !updatedSortIds.contains(sortId))
                 .forEach(sortId -> {
                     final SortClient client = clients.get(sortId);
+                    LOG.info("Close sort client {}.", sortId);
                     try {
                         client.close();
                     } catch (Throwable th) {
@@ -207,7 +219,7 @@ public final class SortSdkSource extends AbstractSource implements Configurable,
      * @param config The config to be updated.
      */
     private void updateClientConfig(SortClientConfig config) {
-        config.setManagerApiUrl(CommonPropertiesHolder.getSourceConfigManagerUrl());
+        config.setManagerApiUrl(ManagerUrlHandler.getSortSourceConfigUrl());
     }
 
     /**
@@ -228,7 +240,19 @@ public final class SortSdkSource extends AbstractSource implements Configurable,
             final FetchCallback callback = FetchCallback.Factory.create(sortId, getChannelProcessor(), context);
             clientConfig.setCallback(callback);
             this.updateClientConfig(clientConfig);
-            SortClient client = SortClientFactory.createSortClient(clientConfig);
+            SortClient client;
+            QueryConsumeConfig queryConsumeConfigImpl = this.getQueryConfigImpl();
+            if (queryConsumeConfigImpl != null) {
+                // if it specifies the type of QueryConsumeConfig.
+                LOG.info("Create sort sdk client in custom way.");
+                client = SortClientFactory.createSortClient(clientConfig,
+                                queryConsumeConfigImpl,
+                                new MetricReporterImpl(clientConfig),
+                                new ManagerReportHandlerImpl());
+            } else {
+                LOG.info("Create sort sdk client in default way.");
+                client = SortClientFactory.createSortClient(clientConfig);
+            }
             client.init();
             // temporary use to ACK fetched msg.
             callback.setClient(client);
@@ -241,4 +265,25 @@ public final class SortSdkSource extends AbstractSource implements Configurable,
         return null;
     }
 
+    private QueryConsumeConfig getQueryConfigImpl() {
+        String className = CommonPropertiesHolder.getString(KEY_QUERY_CONSUME_CONFIG_TYPE);
+        if (StringUtils.isBlank(className)) {
+            LOG.info("There is no property of {}, use default implementation.", KEY_QUERY_CONSUME_CONFIG_TYPE);
+            return null;
+        }
+        LOG.info("Start to load QueryConfig class {}.", className);
+        try {
+            Class<?> queryConfigType = ClassUtils.getClass(className);
+            Object obj = queryConfigType.getDeclaredConstructor().newInstance();
+            if (obj instanceof QueryConsumeConfig) {
+                LOG.info("Load {} successfully.", className);
+                return (QueryConsumeConfig) obj;
+            }
+        } catch (Throwable t) {
+            LOG.error("Got exception when load QueryConfigImpl, class name is " + className + ". Exception is "
+                    + t.getMessage(), t);
+        }
+        return null;
+    }
+
 }
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSourceContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSourceContext.java
index 363e3b8..bdf540f 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSourceContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSourceContext.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.standalone.source.sortsdk;
 import org.apache.flume.Context;
 import org.apache.inlong.common.metric.MetricRegister;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
 import org.apache.inlong.sort.standalone.metrics.SortMetricItemSet;
 import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
@@ -152,6 +153,9 @@ public final class SortSdkSourceContext extends SourceContext {
         dimensions.put(SortMetricItem.KEY_SOURCE_DATA_ID, topic);
         if (event != null) {
             SortMetricItem.fillInlongId(event, dimensions);
+            long msgTime = event.getRawLogTime();
+            long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
+            dimensions.put(SortMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
         }
         return dimensions;
     }