You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/12/27 06:16:28 UTC
[incubator-inlong] branch master updated: [INLONG-1892][Inlong-sort-standalone] Sort-standalone support consume events from Pulsar. (#2036)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 1517148 [INLONG-1892][Inlong-sort-standalone] Sort-standalone support consume events from Pulsar. (#2036)
1517148 is described below
commit 15171482ac5c701d097e93fbe2045a4b65653908
Author: imvan <45...@users.noreply.github.com>
AuthorDate: Mon Dec 27 14:16:21 2021 +0800
[INLONG-1892][Inlong-sort-standalone] Sort-standalone support consume events from Pulsar. (#2036)
---
.../inlong/sort/standalone/utils/Constants.java | 3 +-
.../sort-standalone-source/pom.xml | 34 ++++
.../sink/pulsar/PulsarProducerCluster.java | 2 +-
.../sort/standalone/source/SourceContext.java | 88 ++++++++++
.../source/sortsdk/DefaultTopicChangeListener.java | 50 ++++++
.../standalone/source/sortsdk/FetchCallback.java | 138 +++++++++++++++
.../standalone/source/sortsdk/SortSdkSource.java | 193 +++++++++++++++++++++
.../source/sortsdk/SortSdkSourceContext.java | 155 +++++++++++++++++
.../source/sortsdk/SubscribeFetchResult.java | 105 +++++++++++
.../source/sortsdk/SortSdkSourceTest.java | 85 +++++++++
10 files changed, 851 insertions(+), 2 deletions(-)
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/Constants.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/Constants.java
index f67cb8a..05b5d42 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/Constants.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/Constants.java
@@ -29,7 +29,8 @@ public interface Constants {
String HEADER_KEY_MSG_TIME = "msgTime";
String HEADER_KEY_SOURCE_IP = "sourceIp";
String HEADER_KEY_SOURCE_TIME = "sourceTime";
- String MESSAGE_KEY = "messageKey";
+ String HEADER_KEY_MESSAGE_KEY = "messageKey";
+ String HEADER_KEY_MSG_OFFSET = "msgOffset";
String RELOAD_INTERVAL = "reloadInterval";
}
diff --git a/inlong-sort-standalone/sort-standalone-source/pom.xml b/inlong-sort-standalone/sort-standalone-source/pom.xml
index f128f97..73a25eb 100644
--- a/inlong-sort-standalone/sort-standalone-source/pom.xml
+++ b/inlong-sort-standalone/sort-standalone-source/pom.xml
@@ -42,5 +42,39 @@
<artifactId>sort-standalone-common</artifactId>
<version>${project.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-sdk</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>jakarta.validation</groupId>
+ <artifactId>jakarta.validation-api</artifactId>
+ <version>2.0.2</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <artifactId>powermock-module-junit4</artifactId>
+ <groupId>org.powermock</groupId>
+ <scope>test</scope>
+ <version>2.0.2</version>
+ </dependency>
+
+ <dependency>
+ <artifactId>powermock-api-mockito2</artifactId>
+ <groupId>org.powermock</groupId>
+ <scope>test</scope>
+ <version>2.0.2</version>
+ </dependency>
+
+ <dependency>
+ <artifactId>mockito-core</artifactId>
+ <groupId>org.mockito</groupId>
+ <scope>test</scope>
+ <version>2.23.0</version>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java
index 355abf1..993e97f 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java
@@ -226,7 +226,7 @@ public class PulsarProducerCluster implements LifecycleAware {
this.addMetric(event, topic, false, 0);
return false;
}
- String messageKey = headers.get(Constants.MESSAGE_KEY);
+ String messageKey = headers.get(Constants.HEADER_KEY_MESSAGE_KEY);
if (messageKey == null) {
messageKey = headers.get(Constants.HEADER_KEY_SOURCE_IP);
}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/SourceContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/SourceContext.java
new file mode 100644
index 0000000..af93c8d
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/SourceContext.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.source;
+
+import org.apache.flume.Context;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.NotNull;
+
+/**
+ * Base source context <b>WITHOUT</b> metric reporter.
+ * The derived classes of SourceContext may implement {@link org.apache.inlong.commons.config.metrics.MetricItem} and
+ * realize methods to report customized metrics.
+ */
+public class SourceContext {
+
+ // The key of reload interval of source.
+ private static final String KEY_RELOAD_INTERVAL = "reloadInterval";
+
+ // the default reload interval value in ms.
+ private static final long DEFAULT_RELOAD_INTERVAL_MS = 6000L;
+
+ // The configured source context.
+ private final Context sourceContext;
+
+ // Name of source. Usually the class name of source.
+ private final String sourceName;
+
+ // Cluster Id of source.
+ @NotNull
+ private final String clusterId;
+
+ /**
+ * Constructor of {@link SourceContext}.
+ *
+ * @param sourceName Name of source. Usually the class name of source.
+ * @param context The configured source context.
+ */
+ public SourceContext(
+ @NotBlank(message = "sourceName should not be empty or null") final String sourceName,
+ @NotNull(message = "context should not be null") final Context context) {
+
+ this.sourceName = sourceName;
+ this.sourceContext = context;
+ this.clusterId = context.getString(CommonPropertiesHolder.KEY_CLUSTER_ID);
+ }
+
+ /**
+ * Obtain the reload interval of source.
+ * @return Reload interval of source.
+ */
+ public final long getReloadInterval() {
+ return sourceContext.getLong(SourceContext.KEY_RELOAD_INTERVAL, DEFAULT_RELOAD_INTERVAL_MS);
+ }
+
+ /**
+ * Obtain the cluster Id of source.
+ * @return Cluster Id of source.
+ */
+ public final String getClusterId() {
+ return clusterId;
+ }
+
+ /**
+ * Obtain the name of source.
+ * @return Name of source.
+ */
+ public final String getSourceName() {
+ return sourceName;
+ }
+
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/DefaultTopicChangeListener.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/DefaultTopicChangeListener.java
new file mode 100644
index 0000000..f74b94e
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/DefaultTopicChangeListener.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.source.sortsdk;
+
+import org.apache.inlong.sdk.sort.api.InLongTopicChangeListener;
+import org.apache.inlong.sdk.sort.entity.InLongTopic;
+
+import java.util.Set;
+
+/**
+ * Default implementation of {@link InLongTopicChangeListener}.
+ *
+ * Do nothing when the topic assignments change.
+ *
+ */
+public class DefaultTopicChangeListener implements InLongTopicChangeListener {
+
+ @Override
+ public boolean onAssignmentsChange(
+ Set<InLongTopic> allInLongTopics,
+ Set<InLongTopic> newInLongTopics,
+ Set<InLongTopic> removedInLongTopics) {
+ return true;
+ }
+
+ @Override
+ public boolean requestAck(String sortTaskId) {
+ return true;
+ }
+
+ @Override
+ public String offlineAndGetAckOffset(String msgKey) {
+ return "";
+ }
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/FetchCallback.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/FetchCallback.java
new file mode 100644
index 0000000..87ac897
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/FetchCallback.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.source.sortsdk;
+
+import com.google.common.base.Preconditions;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.inlong.sdk.sort.api.ReadCallback;
+import org.apache.inlong.sdk.sort.api.SortClient;
+import org.apache.inlong.sdk.sort.entity.MessageRecord;
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.NotNull;
+
+/**
+ * Implementation of {@link ReadCallback}.
+ *
+ * TODO: Sort sdk should deliver one object which is held by {@link ProfileEvent} and used to ack upstream data store
+ * The code should be like :
+ *
+ * public void onFinished(final MessageRecord messageRecord, ACKer acker) {
+ * doSomething();
+ * final ProfileEvent profileEvent = new ProfileEvent(result.getBody(), result.getHeaders(), acker);
+ * channelProcessor.processEvent(profileEvent);
+ * }
+ *
+ * The ACKer will be used to <b>ACK</b> upstream after the downstream has <b>ACKed</b> sort-standalone.
+ * This process works like a <b>transaction</b> over the whole sort-standalone, which
+ * ensures <b>At Least Once</b> semantics.
+ */
+public class FetchCallback implements ReadCallback {
+
+ // Logger of {@link FetchCallback}.
+ private static final Logger LOG = LoggerFactory.getLogger(FetchCallback.class);
+
+ // SortId of fetch message.
+ private final String sortId;
+
+ // ChannelProcessor that put message in specific channel.
+ private final ChannelProcessor channelProcessor;
+
+ // Context of source, used to report fetch results.
+ private final SortSdkSourceContext context;
+
+ // Temporary usage for ACK. The {@link SortClient} and Callback should not circular reference each other.
+ private SortClient client;
+
+ /**
+ * Private constructor of {@link FetchCallback}.
+ * <p> The construction of FetchCallback should be initiated by {@link FetchCallback.Factory}.</p>
+ *
+ * @param sortId SortId of fetch message.
+ * @param channelProcessor ChannelProcessor that message put in.
+ * @param context The context to report fetch results.
+ */
+ private FetchCallback(
+ final String sortId,
+ final ChannelProcessor channelProcessor,
+ final SortSdkSourceContext context) {
+ this.sortId = sortId;
+ this.channelProcessor = channelProcessor;
+ this.context = context;
+ }
+
+ /**
+ * Set client for ack.
+ * @param client client for ack.
+ */
+ public void setClient(@NotNull SortClient client) {
+ this.client = client;
+ }
+
+ /**
+ * The callback function that SortSDK invokes when messages are fetched.
+ *
+ * <p> In order to ACK the fetched msg, {@link FetchCallback} has to hold the {@link SortClient} which results in
+ * <b>Cycle Reference</b>. The {@link SortClient} should deliver one object to ACK after invoke the callback method
+ * {@link ReadCallback#onFinished(MessageRecord)}. </p>
+ * <p> After the event is handed to the channel, the message is ACKed with its msgKey and its offset. </p>
+ * @param messageRecord message
+ */
+ @Override
+ public void onFinished(final MessageRecord messageRecord) {
+ try {
+ Preconditions.checkState(messageRecord != null, "Fetched msg is null.");
+ final SubscribeFetchResult result = SubscribeFetchResult.Factory.create(sortId, messageRecord);
+ final ProfileEvent profileEvent = new ProfileEvent(result.getBody(), result.getHeaders());
+ channelProcessor.processEvent(profileEvent);
+ context.reportToMetric(profileEvent, sortId, "-", SortSdkSourceContext.FetchResult.SUCCESS);
+ client.ack(messageRecord.getMsgKey(), messageRecord.getOffset());
+ } catch (NullPointerException npe) {
+ LOG.error("Got a null pointer exception for sortId " + sortId, npe);
+ context.reportToMetric(null, sortId, "-", SortSdkSourceContext.FetchResult.FAILURE);
+ } catch (Exception e) {
+ LOG.error("Ack failed for sortId " + sortId, e);
+ }
+ }
+
+ /**
+ * Factory of {@link FetchCallback}
+ */
+ public static class Factory {
+
+ /**
+ * Create one {@link FetchCallback}.
+ * <p> Validate sortId and channelProcessor before the construction of FetchCallback.</p>
+ *
+ * @param sortId The sortId of fetched message.
+ * @param channelProcessor The channelProcessor that put message in specific channel.
+ * @param context The context to report fetch results.
+ *
+ * @return One FetchCallback.
+ */
+ public static FetchCallback create(
+ @NotBlank(message = "sortId should not be null or empty.") final String sortId,
+ @NotNull(message = "channelProcessor should not be null.") final ChannelProcessor channelProcessor,
+ @NotNull(message = "context should not be null") final SortSdkSourceContext context) {
+ return new FetchCallback(sortId, channelProcessor, context);
+ }
+ }
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
new file mode 100644
index 0000000..abad948
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.source.sortsdk;
+
+import com.google.common.base.Preconditions;
+import org.apache.flume.Context;
+import org.apache.flume.EventDrivenSource;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.source.AbstractSource;
+import org.apache.inlong.sdk.sort.api.SortClient;
+import org.apache.inlong.sdk.sort.api.SortClientConfig;
+import org.apache.inlong.sdk.sort.api.SortClientFactory;
+import org.apache.inlong.sdk.sort.entity.MessageRecord;
+import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
+import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Default Source implementation of InLong.
+ *
+ * <p> SortSdkSource acquires msg from different upstream data stores by registering a {@link SortClient} for each
+ * sort task. The only thing SortSdkSource should do is to get one client by the sort task id, or remove one client
+ * when the task is finished or scheduled to another source instance. </p>
+ *
+ * <p> The Default Manager of InLong will schedule the partition and topic automatically. </p>
+ *
+ * <p> Because all sources should implement {@link Configurable}, the SortSdkSource should have
+ * default constructor <b>WITHOUT</b> any arguments, and parameters will be configured by
+ * {@link Configurable#configure(Context)}. </p>
+ */
+public final class SortSdkSource extends AbstractSource implements Configurable, Runnable, EventDrivenSource {
+
+ // Log of {@link SortSdkSource}.
+ private static final Logger LOG = LoggerFactory.getLogger(SortSdkSource.class);
+
+ // Default pool of {@link ScheduledExecutorService}.
+ private static final int CORE_POOL_SIZE = 1;
+
+ // Default consume strategy of {@link SortClient}.
+ private static final SortClientConfig.ConsumeStrategy defaultStrategy = SortClientConfig.ConsumeStrategy.lastest;
+
+ // Map of {@link SortClient}.
+ private Map<String, SortClient> clients;
+
+ // The cluster name of sort.
+ private String sortClusterName;
+
+ // Reload config interval.
+ private long reloadInterval;
+
+ // Context of SortSdkSource.
+ private SortSdkSourceContext context;
+
+ // Executor for config reloading.
+ private ScheduledExecutorService pool;
+
+ /**
+ * Start SortSdkSource.
+ */
+ @Override
+ public synchronized void start() {
+ this.reload();
+ }
+
+ /**
+ * Stop {@link #pool} and close all {@link SortClient}.
+ */
+ @Override
+ public void stop() {
+ pool.shutdownNow();
+ clients.forEach((sortId, client) -> client.close());
+ }
+
+ /**
+ * Entrance of {@link #pool} to reload clients at the fixed rate {@link #reloadInterval}.
+ */
+ @Override
+ public void run() {
+ this.reload();
+ }
+
+ /**
+ * Configure parameters.
+ *
+ * @param context Context of source.
+ */
+ @Override
+ public void configure(Context context) {
+ this.clients = new ConcurrentHashMap<>();
+ this.sortClusterName = SortClusterConfigHolder.getClusterConfig().getClusterName();
+ Preconditions.checkState(context != null, "No context, configure failed");
+ this.context = new SortSdkSourceContext(getName(), context);
+ this.reloadInterval = this.context.getReloadInterval();
+ this.initReloadExecutor();
+ }
+
+ /**
+ * Init ScheduledExecutorService with the fixed reload rate {@link #reloadInterval}, which is in milliseconds.
+ */
+ private void initReloadExecutor() {
+ this.pool = Executors.newScheduledThreadPool(CORE_POOL_SIZE);
+ pool.scheduleAtFixedRate(this, reloadInterval, reloadInterval, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Reload clients by current {@link SortTaskConfig}.
+ *
+ * <p> Create new clients with new sort task id, and remove the finished or scheduled ones. </p>
+ *
+ * <p> Current version of SortSdk does <b>NOT</b> support getting the corresponding sort id of {@link SortClient}.
+ * Hence, the mapping of &lt;SortId, SortClient&gt; must be maintained by the Source itself. As this is not elegant,
+ * the <b>REMOVE</b> of expired clients is <b>NOT</b> supported right now. </p>
+ */
+ private void reload() {
+
+ final List<SortTaskConfig> configs = SortClusterConfigHolder.getClusterConfig().getSortTasks();
+ LOG.info("start to reload SortSdkSource");
+
+ // Start new clients
+ for (SortTaskConfig taskConfig : configs) {
+
+ // If exists, skip.
+ final String sortId = taskConfig.getName();
+ SortClient client = this.clients.get(sortId);
+ if (client != null) {
+ continue;
+ }
+
+ // Otherwise, new one client.
+ client = this.newClient(sortId);
+ if (client != null) {
+ this.clients.put(sortId, client);
+ }
+ }
+ }
+
+ /**
+ * Create one {@link SortClient} with specific sort id.
+ *
+ * <p> In current version, the {@link FetchCallback} will hold the client to ACK.
+ * For more details see {@link FetchCallback#onFinished(MessageRecord)}</p>
+ *
+ * @param sortId Sort id of new client.
+ *
+ * @return New sort client.
+ */
+ private SortClient newClient(final String sortId) {
+ LOG.info("Start to new sort client for id: {}", sortId);
+ try {
+ final SortClientConfig clientConfig =
+ new SortClientConfig(sortId, this.sortClusterName, new DefaultTopicChangeListener(),
+ SortSdkSource.defaultStrategy, InetAddress.getLocalHost().getHostAddress());
+ final FetchCallback callback = FetchCallback.Factory.create(sortId, getChannelProcessor(), context);
+ clientConfig.setCallback(callback);
+ SortClient client = SortClientFactory.createSortClient(clientConfig);
+ client.init();
+ // temporary use to ACK fetched msg.
+ callback.setClient(client);
+ return client;
+ } catch (UnknownHostException ex) {
+ LOG.error("Got one UnknownHostException when init client of id: " + sortId, ex);
+ } catch (Throwable th) {
+ LOG.error("Got one throwable when init client of id: " + sortId, th);
+ }
+ return null;
+ }
+
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSourceContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSourceContext.java
new file mode 100644
index 0000000..1ec37bd
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSourceContext.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.source.sortsdk;
+
+import org.apache.flume.Context;
+import org.apache.inlong.commons.config.metrics.MetricRegister;
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
+import org.apache.inlong.sort.standalone.metrics.SortMetricItemSet;
+import org.apache.inlong.sort.standalone.source.SourceContext;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.NotNull;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Derived class of {@link SourceContext} which implements methods to report metrics.
+ */
+public final class SortSdkSourceContext extends SourceContext {
+
+ // Metric item set of Sort to create and maintain specific metric group.
+ private final SortMetricItemSet metricItemSet;
+
+ /**
+ * Type of metrics to report.
+ *
+ * <p> For {@link SortSdkSource}, there are only two types of fetch results, success or failure.</p>
+ */
+ public enum FetchResult {
+ SUCCESS,
+ FAILURE
+ }
+
+ /**
+ * Constructor of {@link SourceContext}.
+ *
+ * @param sourceName Name of source. Usually the class name of source.
+ * @param context The configured source context.
+ */
+ public SortSdkSourceContext(
+ @NotBlank(message = "sourceName should not be empty or null") final String sourceName,
+ @NotNull(message = "context should not be null") final Context context) {
+
+ super(sourceName, context);
+ this.metricItemSet = new SortMetricItemSet(sourceName);
+ MetricRegister.register(metricItemSet);
+ }
+
+ /**
+ * Entrance to report fetch metrics.
+ *
+ * @param event The fetched event. May be <b>null</b> when a fetch failure occurs.
+ * @param sortId Sort id of event.
+ * @param topic Topic that event fetched from. May be <b>null</b> when a fetch failure occurs.
+ * @param fetchResult Result of fetching, SUCCESS or FAILURE.
+ */
+ public void reportToMetric(
+ @Nullable final ProfileEvent event,
+ @Nullable final String sortId,
+ @Nullable final String topic,
+ @NotNull(message = "Must specify fetch result") final SortSdkSourceContext.FetchResult fetchResult) {
+
+ final Map<String, String> dimensions = this.createSortSdkSourceDimensionMap(event, sortId, topic);
+ final SortMetricItem metricItem = metricItemSet.findMetricItem(dimensions);
+ final int msgSize = event != null ? event.getBody().length : -1;
+ this.reportToMetric(metricItem, fetchResult, msgSize);
+ }
+
+ /**
+ * Selector of metric report flow.
+ *
+ * @param item MetricItem that report to.
+ * @param fetchResult Result of fetching, SUCCESS or FAILURE.
+ * @param size The fetch length. -1 means fetch failure.
+ */
+ private void reportToMetric(
+ @NotNull final SortMetricItem item,
+ final FetchResult fetchResult,
+ final int size) {
+
+ switch (fetchResult) {
+ case SUCCESS:
+ reportToMetric(item.readSuccessCount, item.readSuccessSize, size);
+ break;
+ case FAILURE:
+ reportToMetric(item.readFailCount, item.readFailSize, size);
+ break;
+ default:
+ break;
+ }
+ }
+
+ /**
+ * Report to a specific metric group.
+ *
+ * @param countMetric Metric of event count.
+ * @param sizeMetric Metric of event size.
+ * @param size Size of event.
+ */
+ private void reportToMetric(
+ @NotNull final AtomicLong countMetric,
+ @NotNull final AtomicLong sizeMetric,
+ final int size) {
+ countMetric.incrementAndGet();
+ sizeMetric.addAndGet(size);
+ }
+
+ /**
+ * Generator of report dimensions.
+ *
+ * <p> For the case of fetch {@link FetchResult#FAILURE}, the event may be null,
+ * the {@link org.apache.inlong.sort.standalone.utils.Constants#INLONG_GROUP_ID}
+ * and the {@link org.apache.inlong.sort.standalone.utils.Constants#INLONG_STREAM_ID} will not be specified. </p>
+ *
+ * @param event Event to be reported.
+ * @param sortId Sort id of fetched event.
+ * @param topic Topic of event.
+ *
+ * @return The dimensions of reported event.
+ */
+ private Map<String, String> createSortSdkSourceDimensionMap(
+ final ProfileEvent event,
+ final String sortId,
+ final String topic) {
+
+ final Map<String, String> dimensions = new HashMap<>();
+ dimensions.put(SortMetricItem.KEY_CLUSTER_ID, this.getClusterId());
+ dimensions.put(SortMetricItem.KEY_SOURCE_ID, this.getSourceName());
+ dimensions.put(SortMetricItem.KEY_TASK_NAME, sortId);
+ dimensions.put(SortMetricItem.KEY_SOURCE_DATA_ID, topic);
+ if (event != null) {
+ SortMetricItem.fillInlongId(event, dimensions);
+ }
+ return dimensions;
+ }
+
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SubscribeFetchResult.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SubscribeFetchResult.java
new file mode 100644
index 0000000..638fb07
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SubscribeFetchResult.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.source.sortsdk;
+
+import org.apache.inlong.sdk.sort.entity.MessageRecord;
+import org.apache.inlong.sort.standalone.utils.Constants;
+
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.NotNull;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * SubscribeFetchResult is the wrapper of {@link MessageRecord}.
+ * <p> SubscribeFetchResult integrates message key, offset and message time into the header map.</p>
+ */
+public class SubscribeFetchResult {
+
+ // The sortId of fetched message.
+ private final String sortId;
+
+ // Important metrics called headers of {@link MessageRecord}, including message key.
+ private final Map<String, String> headers = new ConcurrentHashMap<>();
+
+ // Row data in binary format.
+ private final byte[] body;
+
+ /**
+ * Private constructor of SubscribeFetchResult.
+ * <p> The construction of SubscribeFetchResult should be initiated by {@link SubscribeFetchResult.Factory}.</p>
+ *
+ * @param sortId The sortId of fetched message.
+ * @param message Message that fetched from upstream data storage.
+ */
+ private SubscribeFetchResult(
+ final String sortId,
+ final MessageRecord message) {
+ this.sortId = sortId;
+ this.headers.put(Constants.HEADER_KEY_MESSAGE_KEY, message.getMsgKey());
+ this.headers.put(Constants.HEADER_KEY_MSG_OFFSET, message.getOffset());
+ this.headers.put(Constants.HEADER_KEY_MSG_TIME, String.valueOf(message.getRecTime()));
+ this.headers.putAll(message.getMsgHeader());
+ this.body = message.getMessage();
+ }
+
+ /**
+ * Get row data that in binary format.
+ * @return Row data.
+ */
+ public byte[] getBody() {
+ return body;
+ }
+
+ /**
+ * Get important metrics in Map format called headers.
+ * @return headers.
+ */
+ public Map<String, String> getHeaders() {
+ return headers;
+ }
+
+ /**
+ * Get sortId of fetched message.
+ * @return SortId of message.
+ */
+ public String getSortId() {
+ return sortId;
+ }
+
+ /**
+ * The factory of {@link SubscribeFetchResult}.
+ */
+ public static class Factory {
+
+ /**
+ * Create one {@link SubscribeFetchResult}.
+ * <p> Validate sortId and message before the construction of SubscribeFetchResult.</p>
+ *
+ * @param sortId The sortId of fetched message.
+ * @param messageRecord Message that fetched from upstream data storage.
+ *
+ * @return One SubscribeFetchResult.
+ */
+ public static SubscribeFetchResult create(
+ @NotBlank(message = "SortId should not be null or empty.") final String sortId,
+ @NotNull(message = "MessageRecord should not be null.") final MessageRecord messageRecord) {
+ return new SubscribeFetchResult(sortId, messageRecord);
+ }
+ }
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/test/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSourceTest.java b/inlong-sort-standalone/sort-standalone-source/src/main/test/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSourceTest.java
new file mode 100644
index 0000000..63ea6c8
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/test/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSourceTest.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.source.sortsdk;
+
+import org.apache.flume.Context;
+import org.apache.inlong.commons.config.metrics.MetricRegister;
+import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
+import org.apache.inlong.sort.standalone.config.pojo.SortClusterConfig;
+import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({SortClusterConfigHolder.class, LoggerFactory.class, Logger.class, MetricRegister.class})
+public class SortSdkSourceTest {
+
+ private Context mockContext;
+
+ @Before
+ public void setUp() {
+ PowerMockito.mockStatic(LoggerFactory.class);
+ Logger LOG = PowerMockito.mock(Logger.class);
+ PowerMockito.when(LoggerFactory.getLogger(Mockito.any(Class.class))).thenReturn(LOG);
+ PowerMockito.mockStatic(MetricRegister.class);
+
+ PowerMockito.mockStatic(SortClusterConfigHolder.class);
+ SortClusterConfig config = prepareSortClusterConfig(2);
+ PowerMockito.when(SortClusterConfigHolder.getClusterConfig()).thenReturn(config);
+ mockContext = PowerMockito.spy(new Context());
+ }
+
+ @Test
+ public void testRun() {
+ SortSdkSource testSource = new SortSdkSource();
+ testSource.configure(mockContext);
+ testSource.run();
+ testSource.stop();
+ }
+
+ private SortClusterConfig prepareSortClusterConfig(final int size) {
+ final SortClusterConfig testConfig = new SortClusterConfig();
+ testConfig.setClusterName("testConfig");
+ testConfig.setSortTasks(prepareSortTaskConfig(size));
+ return testConfig;
+ }
+
+ private List<SortTaskConfig> prepareSortTaskConfig(final int size) {
+ List<SortTaskConfig> configs = new ArrayList<>();
+
+ for (int i = 0; i < size; i++) {
+ SortTaskConfig config = new SortTaskConfig();
+ config.setName("testConfig" + i);
+ configs.add(config);
+ }
+ Assert.assertEquals(size, configs.size());
+ return configs;
+ }
+
+}
\ No newline at end of file