You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/21 07:21:51 UTC
[incubator-inlong] branch master updated: [INLONG-3243][Sort-Standalone] Support multiple scenes to request configs (#3244)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f378682 [INLONG-3243][Sort-Standalone] Support multiple scenes to request configs (#3244)
f378682 is described below
commit f37868249e17c2ac1b3449416a06c4e905b8335c
Author: vernedeng <de...@pku.edu.cn>
AuthorDate: Mon Mar 21 15:21:48 2022 +0800
[INLONG-3243][Sort-Standalone] Support multiple scenes to request configs (#3244)
---
.../config/holder/CommonPropertiesHolder.java | 17 +---
.../config/holder/ManagerUrlHandler.java | 91 ++++++++++++++++++++++
.../loader/ClassResourceManagerUrlLoader.java | 76 ++++++++++++++++++
.../loader/ManagerSortClusterConfigLoader.java | 3 +-
.../standalone/config/loader/ManagerUrlLoader.java | 40 ++++++++++
.../standalone/source/sortsdk/SortSdkSource.java | 53 ++++++++++++-
.../source/sortsdk/SortSdkSourceContext.java | 4 +
7 files changed, 266 insertions(+), 18 deletions(-)
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
index 422b1cd..997b893 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
@@ -35,10 +35,9 @@ import org.slf4j.Logger;
public class CommonPropertiesHolder {
public static final Logger LOG = InlongLoggerFactory.getLogger(CommonPropertiesHolder.class);
- public static final String KEY_COMMON_PROPERTIES = "common_properties_loader";
public static final String DEFAULT_LOADER = ClassResourceCommonPropertiesLoader.class.getName();
+ public static final String KEY_COMMON_PROPERTIES = "common_properties_loader";
public static final String KEY_CLUSTER_ID = "clusterId";
- public static final String KEY_SOURCE_CONFIG_MANAGER_URL = "sortSourceConfig.managerUrl";
private static Map<String, String> props;
private static Context context;
@@ -89,7 +88,7 @@ public class CommonPropertiesHolder {
/**
* get context
- *
+ *
* @return the context
*/
public static Context getContext() {
@@ -198,20 +197,12 @@ public class CommonPropertiesHolder {
}
/**
- * Get manager URL
- *
- * @return Manager URL
- */
- public static String getSourceConfigManagerUrl() {
- return getString(KEY_SOURCE_CONFIG_MANAGER_URL);
- }
-
- /**
* getAuditFormatInterval
- *
+ *
* @return
*/
public static long getAuditFormatInterval() {
return auditFormatInterval;
}
+
}
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/ManagerUrlHandler.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/ManagerUrlHandler.java
new file mode 100644
index 0000000..d14e6da
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/ManagerUrlHandler.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.config.holder;
+
+import org.apache.commons.lang.ClassUtils;
+import org.apache.flume.Context;
+import org.apache.inlong.sort.standalone.config.loader.ClassResourceManagerUrlLoader;
+import org.apache.inlong.sort.standalone.config.loader.ManagerUrlLoader;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+import java.util.Optional;
+
+/**
+ * Manager address get handler.
+ *
+ * <p> Used to acquire the ip and port of manager, which sort sdk and sort-standalone request config from. </p>
+ * <p> The default implementation {@link ClassResourceManagerUrlLoader}
+ * is base on {@link CommonPropertiesHolder} to acquire properties. </p>
+ */
+public class ManagerUrlHandler {
+
+ private static final Logger LOG = InlongLoggerFactory.getLogger(ManagerUrlHandler.class);
+ private static final String KEY_MANAGER_URL_LOADER_TYPE = "managerUrlLoaderType";
+
+ private static ManagerUrlLoader instance;
+
+ /**
+ * Delete no argument constructor.
+ */
+ private ManagerUrlHandler() {
+
+ }
+
+ /**
+ * Get URL where SortSdk request SortSourceConfig.
+ *
+ * @return URL to get SortSourceConfig.
+ */
+ public static String getSortSourceConfigUrl() {
+ return get().acquireSortSourceConfigUrl();
+ }
+
+ /**
+ * Get URL where Sort-Standalone request SortClusterConfig.
+ *
+ * @return URL to get SortClusterConfig.
+ */
+ public static String getSortClusterConfigUrl() {
+ return get().acquireSortClusterConfigUrl();
+ }
+
+ private static ManagerUrlLoader get() {
+ if (instance != null) {
+ return instance;
+ }
+ synchronized (ManagerUrlLoader.class) {
+ String loaderType = CommonPropertiesHolder
+ .getString(KEY_MANAGER_URL_LOADER_TYPE, ClassResourceManagerUrlLoader.class.getName());
+ LOG.info("Start to load ManagerUrlLoader, type is {}.", loaderType);
+ try {
+ Class<?> handlerClass = ClassUtils.getClass(loaderType);
+ Object handlerObj = handlerClass.getDeclaredConstructor().newInstance();
+ if (handlerObj instanceof ManagerUrlLoader) {
+ instance = (ManagerUrlLoader) handlerObj;
+ }
+ } catch (Throwable t) {
+ LOG.error("Got exception when load ManagerAddrGetHandler, type is {}, err is {}",
+ loaderType, t.getMessage());
+ }
+ Optional.ofNullable(instance).ifPresent(inst -> inst.configure(new Context(CommonPropertiesHolder.get())));
+ }
+ return instance;
+ }
+
+}
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceManagerUrlLoader.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceManagerUrlLoader.java
new file mode 100644
index 0000000..49ff79e
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceManagerUrlLoader.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.config.loader;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.flume.Context;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+import java.util.Optional;
+
+/**
+ * Default ManagerUrlLoader. acquire URLs from common.properties.
+ */
+public class ClassResourceManagerUrlLoader implements ManagerUrlLoader {
+
+ private static final Logger LOG = InlongLoggerFactory.getLogger(ClassResourceManagerUrlLoader.class);
+ private static final String KEY_SORT_CLUSTER_CONFIG_MANAGER_URL = "sortClusterConfig.managerUrl";
+ private static final String KEY_SORT_SOURCE_CONFIG_MANAGER_URL = "sortSourceConfig.managerUrl";
+
+ private String sortSourceConfigUrl;
+ private String sortClusterConfigUrl;
+ public Context context;
+
+ @Override
+ public String acquireSortSourceConfigUrl() {
+ if (sortSourceConfigUrl != null) {
+ return sortSourceConfigUrl;
+ }
+ sortSourceConfigUrl = context.getString(KEY_SORT_SOURCE_CONFIG_MANAGER_URL);
+ if (StringUtils.isBlank(sortSourceConfigUrl)) {
+ String warnMsg = "Get key" + KEY_SORT_SOURCE_CONFIG_MANAGER_URL
+ + " from CommonPropertiesHolder failed, it's a optional property use to specify "
+ + "the url where SortSdk request SortSourceConfig.";
+ LOG.warn(warnMsg);
+ sortSourceConfigUrl = warnMsg;
+ }
+ return sortSourceConfigUrl;
+ }
+
+ @Override
+ public String acquireSortClusterConfigUrl() {
+ if (sortClusterConfigUrl != null) {
+ return sortClusterConfigUrl;
+ }
+ sortClusterConfigUrl = context.getString(KEY_SORT_CLUSTER_CONFIG_MANAGER_URL);
+ if (StringUtils.isBlank(sortClusterConfigUrl)) {
+ String warnMsg = "Get key" + KEY_SORT_CLUSTER_CONFIG_MANAGER_URL
+ + " from CommonPropertiesHolder failed, it's a optional property use to specify "
+ + "the url where Sort-Standalone request SortSourceConfig.";
+ LOG.warn(warnMsg);
+ sortClusterConfigUrl = warnMsg;
+ }
+ return sortClusterConfigUrl;
+ }
+
+ @Override
+ public void configure(Context context) {
+ Optional.ofNullable(context).ifPresent(c -> this.context = c);
+ }
+}
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerSortClusterConfigLoader.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerSortClusterConfigLoader.java
index eb9e8c1..77c87c0 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerSortClusterConfigLoader.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerSortClusterConfigLoader.java
@@ -31,6 +31,7 @@ import org.apache.http.util.EntityUtils;
import org.apache.inlong.common.pojo.sortstandalone.SortClusterConfig;
import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.config.holder.ManagerUrlHandler;
import org.slf4j.Logger;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
@@ -85,7 +86,7 @@ public class ManagerSortClusterConfigLoader implements SortClusterConfigLoader {
HttpGet httpGet = null;
try {
String clusterName = this.context.getString(CommonPropertiesHolder.KEY_CLUSTER_ID);
- String url = this.context.getString(SORT_CLUSTER_CONFIG_MANAGER) + "?apiVersion=1.0&clusterName="
+ String url = ManagerUrlHandler.getSortClusterConfigUrl() + "?apiVersion=1.0&clusterName="
+ clusterName + "&md5=";
if (StringUtils.isNotBlank(this.md5)) {
url += this.md5;
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerUrlLoader.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerUrlLoader.java
new file mode 100644
index 0000000..0f1f059
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerUrlLoader.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.config.loader;
+
+import org.apache.flume.conf.Configurable;
+
+/**
+ * Interface of ManagerUrlLoader.
+ */
+public interface ManagerUrlLoader extends Configurable {
+
+ /**
+ * Acquire SortSourceConfigUrl
+ *
+ * @return SortSourceConfigUrl
+ */
+ String acquireSortSourceConfigUrl();
+
+ /**
+ * Acquire SortClusterConfigUrl
+ *
+ * @return SortClusterConfigUrl
+ */
+ String acquireSortClusterConfigUrl();
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
index 1b4eeef..daa6f92 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
@@ -30,15 +30,21 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import org.apache.commons.lang.ClassUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.source.AbstractSource;
import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
+import org.apache.inlong.sdk.sort.api.QueryConsumeConfig;
import org.apache.inlong.sdk.sort.api.SortClient;
import org.apache.inlong.sdk.sort.api.SortClientConfig;
import org.apache.inlong.sdk.sort.api.SortClientFactory;
+import org.apache.inlong.sdk.sort.impl.ManagerReportHandlerImpl;
+import org.apache.inlong.sdk.sort.impl.MetricReporterImpl;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.config.holder.ManagerUrlHandler;
import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,6 +67,10 @@ public final class SortSdkSource extends AbstractSource implements Configurable,
// Log of {@link SortSdkSource}.
private static final Logger LOG = LoggerFactory.getLogger(SortSdkSource.class);
+ // KEY of QueryConsumeConfig Type
+ private static final String KEY_QUERY_CONSUME_CONFIG_TYPE =
+ "sortSourceConfig.QueryConsumeConfigType";
+
// Default pool of {@link ScheduledExecutorService}.
private static final int CORE_POOL_SIZE = 1;
@@ -120,6 +130,7 @@ public final class SortSdkSource extends AbstractSource implements Configurable,
this.context = new SortSdkSourceContext(getName(), context);
this.reloadInterval = this.context.getReloadInterval();
this.initReloadExecutor();
+
}
/**
@@ -169,7 +180,7 @@ public final class SortSdkSource extends AbstractSource implements Configurable,
/**
* Stop an expiry client from SortTaskConfig.
* <p>
- * If the sortId is not in active clients, but not in configs, stop it.
+ * If the sortId is in active clients, but not in configs, stop it.
* </p>
*
* @param configs Updated SortTaskConfig
@@ -180,9 +191,10 @@ public final class SortSdkSource extends AbstractSource implements Configurable,
.collect(Collectors.toSet());
clients.keySet().stream()
- .filter(updatedSortIds::contains)
+ .filter(sortId -> !updatedSortIds.contains(sortId))
.forEach(sortId -> {
final SortClient client = clients.get(sortId);
+ LOG.info("Close sort client {}.", sortId);
try {
client.close();
} catch (Throwable th) {
@@ -207,7 +219,7 @@ public final class SortSdkSource extends AbstractSource implements Configurable,
* @param config The config to be updated.
*/
private void updateClientConfig(SortClientConfig config) {
- config.setManagerApiUrl(CommonPropertiesHolder.getSourceConfigManagerUrl());
+ config.setManagerApiUrl(ManagerUrlHandler.getSortSourceConfigUrl());
}
/**
@@ -228,7 +240,19 @@ public final class SortSdkSource extends AbstractSource implements Configurable,
final FetchCallback callback = FetchCallback.Factory.create(sortId, getChannelProcessor(), context);
clientConfig.setCallback(callback);
this.updateClientConfig(clientConfig);
- SortClient client = SortClientFactory.createSortClient(clientConfig);
+ SortClient client;
+ QueryConsumeConfig queryConsumeConfigImpl = this.getQueryConfigImpl();
+ if (queryConsumeConfigImpl != null) {
+ // if it specifies the type of QueryConsumeConfig.
+ LOG.info("Create sort sdk client in custom way.");
+ client = SortClientFactory.createSortClient(clientConfig,
+ queryConsumeConfigImpl,
+ new MetricReporterImpl(clientConfig),
+ new ManagerReportHandlerImpl());
+ } else {
+ LOG.info("Create sort sdk client in default way.");
+ client = SortClientFactory.createSortClient(clientConfig);
+ }
client.init();
// temporary use to ACK fetched msg.
callback.setClient(client);
@@ -241,4 +265,25 @@ public final class SortSdkSource extends AbstractSource implements Configurable,
return null;
}
+ private QueryConsumeConfig getQueryConfigImpl() {
+ String className = CommonPropertiesHolder.getString(KEY_QUERY_CONSUME_CONFIG_TYPE);
+ if (StringUtils.isBlank(className)) {
+ LOG.info("There is no property of {}, use default implementation.", KEY_QUERY_CONSUME_CONFIG_TYPE);
+ return null;
+ }
+ LOG.info("Start to load QueryConfig class {}.", className);
+ try {
+ Class<?> queryConfigType = ClassUtils.getClass(className);
+ Object obj = queryConfigType.getDeclaredConstructor().newInstance();
+ if (obj instanceof QueryConsumeConfig) {
+ LOG.info("Load {} successfully.", className);
+ return (QueryConsumeConfig) obj;
+ }
+ } catch (Throwable t) {
+ LOG.error("Got exception when load QueryConfigImpl, class name is " + className + ". Exception is "
+ + t.getMessage(), t);
+ }
+ return null;
+ }
+
}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSourceContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSourceContext.java
index 363e3b8..bdf540f 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSourceContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSourceContext.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.standalone.source.sortsdk;
import org.apache.flume.Context;
import org.apache.inlong.common.metric.MetricRegister;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
import org.apache.inlong.sort.standalone.metrics.SortMetricItemSet;
import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
@@ -152,6 +153,9 @@ public final class SortSdkSourceContext extends SourceContext {
dimensions.put(SortMetricItem.KEY_SOURCE_DATA_ID, topic);
if (event != null) {
SortMetricItem.fillInlongId(event, dimensions);
+ long msgTime = event.getRawLogTime();
+ long auditFormatTime = msgTime - msgTime % CommonPropertiesHolder.getAuditFormatInterval();
+ dimensions.put(SortMetricItem.KEY_MESSAGE_TIME, String.valueOf(auditFormatTime));
}
return dimensions;
}