You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/12/27 06:16:28 UTC

[incubator-inlong] branch master updated: [INLONG-1892][Inlong-sort-standalone] Sort-standalone support consume events from Pulsar. (#2036)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 1517148  [INLONG-1892][Inlong-sort-standalone] Sort-standalone support consume events from Pulsar. (#2036)
1517148 is described below

commit 15171482ac5c701d097e93fbe2045a4b65653908
Author: imvan <45...@users.noreply.github.com>
AuthorDate: Mon Dec 27 14:16:21 2021 +0800

    [INLONG-1892][Inlong-sort-standalone] Sort-standalone support consume events from Pulsar. (#2036)
---
 .../inlong/sort/standalone/utils/Constants.java    |   3 +-
 .../sort-standalone-source/pom.xml                 |  34 ++++
 .../sink/pulsar/PulsarProducerCluster.java         |   2 +-
 .../sort/standalone/source/SourceContext.java      |  88 ++++++++++
 .../source/sortsdk/DefaultTopicChangeListener.java |  50 ++++++
 .../standalone/source/sortsdk/FetchCallback.java   | 138 +++++++++++++++
 .../standalone/source/sortsdk/SortSdkSource.java   | 193 +++++++++++++++++++++
 .../source/sortsdk/SortSdkSourceContext.java       | 155 +++++++++++++++++
 .../source/sortsdk/SubscribeFetchResult.java       | 105 +++++++++++
 .../source/sortsdk/SortSdkSourceTest.java          |  85 +++++++++
 10 files changed, 851 insertions(+), 2 deletions(-)

diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/Constants.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/Constants.java
index f67cb8a..05b5d42 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/Constants.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/Constants.java
@@ -29,7 +29,8 @@ public interface Constants {
     String HEADER_KEY_MSG_TIME = "msgTime";
     String HEADER_KEY_SOURCE_IP = "sourceIp";
     String HEADER_KEY_SOURCE_TIME = "sourceTime";
-    String MESSAGE_KEY = "messageKey";
+    String HEADER_KEY_MESSAGE_KEY = "messageKey";
+    String HEADER_KEY_MSG_OFFSET = "msgOffset";
     
     String RELOAD_INTERVAL = "reloadInterval";
 }
diff --git a/inlong-sort-standalone/sort-standalone-source/pom.xml b/inlong-sort-standalone/sort-standalone-source/pom.xml
index f128f97..73a25eb 100644
--- a/inlong-sort-standalone/sort-standalone-source/pom.xml
+++ b/inlong-sort-standalone/sort-standalone-source/pom.xml
@@ -42,5 +42,39 @@
             <artifactId>sort-standalone-common</artifactId>
             <version>${project.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-sdk</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>jakarta.validation</groupId>
+            <artifactId>jakarta.validation-api</artifactId>
+            <version>2.0.2</version>
+            <scope>compile</scope>
+        </dependency>
+
+        <dependency>
+            <artifactId>powermock-module-junit4</artifactId>
+            <groupId>org.powermock</groupId>
+            <scope>test</scope>
+            <version>2.0.2</version>
+        </dependency>
+
+        <dependency>
+            <artifactId>powermock-api-mockito2</artifactId>
+            <groupId>org.powermock</groupId>
+            <scope>test</scope>
+            <version>2.0.2</version>
+        </dependency>
+
+        <dependency>
+            <artifactId>mockito-core</artifactId>
+            <groupId>org.mockito</groupId>
+            <scope>test</scope>
+            <version>2.23.0</version>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java
index 355abf1..993e97f 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java
@@ -226,7 +226,7 @@ public class PulsarProducerCluster implements LifecycleAware {
             this.addMetric(event, topic, false, 0);
             return false;
         }
-        String messageKey = headers.get(Constants.MESSAGE_KEY);
+        String messageKey = headers.get(Constants.HEADER_KEY_MESSAGE_KEY);
         if (messageKey == null) {
             messageKey = headers.get(Constants.HEADER_KEY_SOURCE_IP);
         }
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/SourceContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/SourceContext.java
new file mode 100644
index 0000000..af93c8d
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/SourceContext.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.source;
+
+import org.apache.flume.Context;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.NotNull;
+
+/**
+ * Base source context <b>WITHOUT</b> metric reporter.
+ * The derived classes of SourceContext may implement {@link org.apache.inlong.commons.config.metrics.MetricItem} and
+ * realize methods to report customized metrics.
+ */
+public class SourceContext {
+
+    // The key of reload interval of source.
+    private static final String KEY_RELOAD_INTERVAL = "reloadInterval";
+
+    // the default reload interval value in ms.
+    private static final long DEFAULT_RELOAD_INTERVAL_MS = 6000L;
+
+    // The configured source context.
+    private final Context sourceContext;
+
+    // Name of source. Usually the class name of source.
+    private final String sourceName;
+
+    // Cluster Id of source.
+    @NotNull
+    private final String clusterId;
+
+    /**
+     * Constructor of {@link SourceContext}.
+     *
+     * @param sourceName Name of source. Usually the class name of source.
+     * @param context The configured source context.
+     */
+    public SourceContext(
+            @NotBlank(message = "sourceName should not be empty or null") final String sourceName,
+            @NotNull(message = "context should not be null") final Context context) {
+
+        this.sourceName = sourceName;
+        this.sourceContext = context;
+        this.clusterId = context.getString(CommonPropertiesHolder.KEY_CLUSTER_ID);
+    }
+
+    /**
+     * Obtain the reload interval of source.
+     * @return Reload interval of source.
+     */
+    public final long getReloadInterval() {
+        return sourceContext.getLong(SourceContext.KEY_RELOAD_INTERVAL, DEFAULT_RELOAD_INTERVAL_MS);
+    }
+
+    /**
+     * Obtain the cluster Id of source.
+     * @return Cluster Id of source.
+     */
+    public final String getClusterId() {
+        return clusterId;
+    }
+
+    /**
+     * Obtain the name of source.
+     * @return Name of source.
+     */
+    public final String getSourceName() {
+        return sourceName;
+    }
+
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/DefaultTopicChangeListener.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/DefaultTopicChangeListener.java
new file mode 100644
index 0000000..f74b94e
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/DefaultTopicChangeListener.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.source.sortsdk;
+
+import org.apache.inlong.sdk.sort.api.InLongTopicChangeListener;
+import org.apache.inlong.sdk.sort.entity.InLongTopic;
+
+import java.util.Set;
+
+/**
+ * Default implementation of {@link InLongTopicChangeListener}.
+ *
+ * Do nothing when the topic assignments change.
+ *
+ */
+public class DefaultTopicChangeListener implements InLongTopicChangeListener {
+
+    @Override
+    public boolean onAssignmentsChange(
+            Set<InLongTopic> allInLongTopics,
+            Set<InLongTopic> newInLongTopics,
+            Set<InLongTopic> removedInLongTopics) {
+        return true;
+    }
+
+    @Override
+    public boolean requestAck(String sortTaskId) {
+        return true;
+    }
+
+    @Override
+    public String offlineAndGetAckOffset(String msgKey) {
+        return "";
+    }
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/FetchCallback.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/FetchCallback.java
new file mode 100644
index 0000000..87ac897
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/FetchCallback.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.source.sortsdk;
+
+import com.google.common.base.Preconditions;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.inlong.sdk.sort.api.ReadCallback;
+import org.apache.inlong.sdk.sort.api.SortClient;
+import org.apache.inlong.sdk.sort.entity.MessageRecord;
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.NotNull;
+
+/**
+ * Implementation of {@link ReadCallback}.
+ *
+ * TODO: Sort sdk should deliver one object which is held by {@link ProfileEvent} and used to ack upstream data store
+ * The code should be like :
+ *
+ *        public void onFinished(final MessageRecord messageRecord, ACKer acker) {
+ *            doSomething();
+ *            final ProfileEvent profileEvent = new ProfileEvent(result.getBody(), result.getHeaders(), acker);
+ *             channelProcessor.processEvent(profileEvent);
+ *        }
+ *
+ * The ACKer will be used to <b>ACK</b> upstream after the downstream has <b>ACKed</b> sort-standalone.
+ * This process works like a <b>transaction</b> across the whole sort-standalone, which
+ * ensures <b>At Least Once</b> semantics.
+ */
+public class FetchCallback implements ReadCallback {
+
+    // Logger of {@link FetchCallback}.
+    private static final Logger LOG = LoggerFactory.getLogger(FetchCallback.class);
+
+    // SortId of fetch message.
+    private final String sortId;
+
+    // ChannelProcessor that put message in specific channel.
+    private final ChannelProcessor channelProcessor;
+
+    // Context of source, used to report fetch results.
+    private final SortSdkSourceContext context;
+
+    // Temporary usage for ACK. The {@link SortClient} and Callback should not circular reference each other.
+    private SortClient client;
+
+    /**
+     * Private constructor of {@link FetchCallback}.
+     * <p> The construction of FetchCallback should be initiated by {@link FetchCallback.Factory}.</p>
+     *
+     * @param sortId SortId of fetch message.
+     * @param channelProcessor ChannelProcessor that message put in.
+     * @param context The context to report fetch results.
+     */
+    private FetchCallback(
+            final String sortId,
+            final ChannelProcessor channelProcessor,
+            final SortSdkSourceContext context) {
+        this.sortId = sortId;
+        this.channelProcessor = channelProcessor;
+        this.context = context;
+    }
+
+    /**
+     * Set client for ack.
+     * @param client client for ack.
+     */
+    public void setClient(@NotNull SortClient client) {
+        this.client = client;
+    }
+
+    /**
+     * The callback function that SortSDK invokes when a message has been fetched.
+     *
+     * <p> The record is wrapped into a {@link ProfileEvent}, handed to the channel processor, reported as SUCCESS
+     * and then ACKed with its message key and offset through the {@link SortClient}, which this callback has to
+     * hold for now (<b>Cycle Reference</b>). A null record raises an NPE and is reported as FAILURE. </p>
+     *
+     * @param messageRecord The fetched message; null is tolerated and counted as a fetch failure.
+     */
+    @Override
+    public void onFinished(final MessageRecord messageRecord) {
+        try {
+            Preconditions.checkNotNull(messageRecord, "Fetched msg is null.");
+            final SubscribeFetchResult result = SubscribeFetchResult.Factory.create(sortId, messageRecord);
+            final ProfileEvent profileEvent = new ProfileEvent(result.getBody(), result.getHeaders());
+            channelProcessor.processEvent(profileEvent);
+            context.reportToMetric(profileEvent, sortId, "-", SortSdkSourceContext.FetchResult.SUCCESS);
+            client.ack(messageRecord.getMsgKey(), messageRecord.getOffset());
+        } catch (NullPointerException npe) {
+            LOG.error("Got a null pointer exception for sortId " + sortId, npe);
+            context.reportToMetric(null, sortId, "-", SortSdkSourceContext.FetchResult.FAILURE);
+        } catch (Exception e) {
+            LOG.error("Ack failed for sortId " + sortId, e);
+        }
+    }
+
+    /**
+     * Factory of {@link FetchCallback}
+     */
+    public static class Factory {
+
+        /**
+         * Create one {@link FetchCallback}.
+         * <p> Validate sortId and channelProcessor before the construction of FetchCallback.</p>
+         *
+         * @param sortId The sortId of fetched message.
+         * @param channelProcessor The channelProcessor that put message in specific channel.
+         * @param context The context to report fetch results.
+         *
+         * @return One FetchCallback.
+         */
+        public static FetchCallback create(
+                @NotBlank(message = "sortId should not be null or empty.") final String sortId,
+                @NotNull(message = "channelProcessor should not be null.") final ChannelProcessor channelProcessor,
+                @NotNull(message = "context should not be null") final SortSdkSourceContext context) {
+            return new FetchCallback(sortId, channelProcessor, context);
+        }
+    }
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
new file mode 100644
index 0000000..abad948
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.source.sortsdk;
+
+import com.google.common.base.Preconditions;
+import org.apache.flume.Context;
+import org.apache.flume.EventDrivenSource;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.source.AbstractSource;
+import org.apache.inlong.sdk.sort.api.SortClient;
+import org.apache.inlong.sdk.sort.api.SortClientConfig;
+import org.apache.inlong.sdk.sort.api.SortClientFactory;
+import org.apache.inlong.sdk.sort.entity.MessageRecord;
+import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
+import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Default Source implementation of InLong.
+ *
+ * <p> SortSdkSource acquires msg from different upstream data stores by registering a {@link SortClient} for each
+ * sort task. The only thing SortSdkSource should do is to get one client by the sort task id, or remove one client
+ * when the task is finished or scheduled to another source instance. </p>
+ *
+ * <p> The Default Manager of InLong will schedule the partition and topic automatically. </p>
+ *
+ * <p> Because all sources should implement {@link Configurable}, the SortSdkSource should have
+ * default constructor <b>WITHOUT</b> any arguments, and parameters will be configured by
+ * {@link Configurable#configure(Context)}. </p>
+ */
+public final class SortSdkSource extends AbstractSource implements Configurable, Runnable, EventDrivenSource {
+
+    // Log of {@link SortSdkSource}.
+    private static final Logger LOG = LoggerFactory.getLogger(SortSdkSource.class);
+
+    // Default pool of {@link ScheduledExecutorService}.
+    private static final int CORE_POOL_SIZE = 1;
+
+    // Default consume strategy of {@link SortClient}.
+    private static final  SortClientConfig.ConsumeStrategy defaultStrategy = SortClientConfig.ConsumeStrategy.lastest;
+
+    // Map of {@link SortClient}.
+    private Map<String, SortClient> clients;
+
+    // The cluster name of sort.
+    private String sortClusterName;
+
+    // Reload config interval.
+    private long reloadInterval;
+
+    // Context of SortSdkSource.
+    private SortSdkSourceContext context;
+
+    // Executor for config reloading.
+    private ScheduledExecutorService pool;
+
+    /**
+     * Start SortSdkSource.
+     */
+    @Override
+    public synchronized void start() {
+        this.reload();
+    }
+
+    /**
+     * Stop {@link #pool} and close all {@link SortClient}.
+     */
+    @Override
+    public void stop() {
+        pool.shutdownNow();
+        clients.forEach((sortId, client) -> client.close());
+    }
+
+    /**
+     * Entrance of {@link #pool} to reload clients at the fixed rate {@link #reloadInterval}.
+     */
+    @Override
+    public void run() {
+        this.reload();
+    }
+
+    /**
+     * Configure parameters.
+     *
+     * @param context Context of source.
+     */
+    @Override
+    public void configure(Context context) {
+        this.clients = new ConcurrentHashMap<>();
+        this.sortClusterName = SortClusterConfigHolder.getClusterConfig().getClusterName();
+        Preconditions.checkState(context != null, "No context, configure failed");
+        this.context = new SortSdkSourceContext(getName(), context);
+        this.reloadInterval = this.context.getReloadInterval();
+        this.initReloadExecutor();
+    }
+
+    /**
+     * Init ScheduledExecutorService that reloads every {@link #reloadInterval} milliseconds (see SourceContext).
+     */
+    private void initReloadExecutor() {
+        this.pool = Executors.newScheduledThreadPool(CORE_POOL_SIZE);
+        pool.scheduleAtFixedRate(this, reloadInterval, reloadInterval, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Reload clients by current {@link SortTaskConfig}.
+     *
+     * <p> Create new clients with new sort task id, and remove the finished or scheduled ones. </p>
+     *
+     * <p> Current version of SortSdk does <b>NOT</b> support obtaining the sort id of a {@link SortClient}.
+     *  Hence, the mapping of <SortId, SortClient> has to be maintained by the Source itself. As this is not elegant,
+     *  the <b>REMOVE</b> of expired clients is <b>NOT</b> supported right now. </p>
+     */
+    private void reload() {
+
+        final List<SortTaskConfig> configs = SortClusterConfigHolder.getClusterConfig().getSortTasks();
+        LOG.info("start to reload SortSdkSource");
+
+        // Start new clients
+        for (SortTaskConfig taskConfig : configs) {
+
+            // If exists, skip.
+            final String sortId = taskConfig.getName();
+            SortClient client = this.clients.get(sortId);
+            if (client != null) {
+                continue;
+            }
+
+            // Otherwise, new one client.
+            client = this.newClient(sortId);
+            if (client != null) {
+                this.clients.put(sortId, client);
+            }
+        }
+    }
+
+    /**
+     * Create one {@link SortClient} with specific sort id.
+     *
+     * <p> In current version, the {@link FetchCallback} will hold the client to ACK, so the client is set on the
+     * callback before it is initialized. For more details see {@link FetchCallback#onFinished(MessageRecord)}</p>
+     *
+     * @param sortId Sort id of new client.
+     *
+     * @return New sort client, or null if the client could not be created or initialized.
+     */
+    private SortClient newClient(final String sortId) {
+        LOG.info("Start to new sort client for id: {}", sortId);
+        try {
+            final SortClientConfig clientConfig =
+                    new SortClientConfig(sortId, this.sortClusterName, new DefaultTopicChangeListener(),
+                            SortSdkSource.defaultStrategy, InetAddress.getLocalHost().getHostAddress());
+            final FetchCallback callback = FetchCallback.Factory.create(sortId, getChannelProcessor(), context);
+            clientConfig.setCallback(callback);
+            SortClient client = SortClientFactory.createSortClient(clientConfig);
+            // temporary use to ACK fetched msg; set before init() in case init() already starts fetching.
+            callback.setClient(client);
+            client.init();
+            return client;
+        } catch (UnknownHostException ex) {
+            LOG.error("Got one UnknownHostException when init client of id: " + sortId, ex);
+        } catch (Throwable th) {
+            LOG.error("Got one throwable when init client of id: " + sortId, th);
+        }
+        return null;
+    }
+
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSourceContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSourceContext.java
new file mode 100644
index 0000000..1ec37bd
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSourceContext.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.source.sortsdk;
+
+import org.apache.flume.Context;
+import org.apache.inlong.commons.config.metrics.MetricRegister;
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
+import org.apache.inlong.sort.standalone.metrics.SortMetricItemSet;
+import org.apache.inlong.sort.standalone.source.SourceContext;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.NotNull;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Derived class of {@link SourceContext} which implements methods to report metrics.
+ */
+public final class SortSdkSourceContext extends SourceContext {
+
+    // Metric item set of Sort to create and maintain specific metric group.
+    private final SortMetricItemSet metricItemSet;
+
+    /**
+     * Type of metrics to report.
+     *
+     * <p> For {@link SortSdkSource}, there are only two types of fetch results, success or failure.</p>
+     */
+    public enum FetchResult {
+        SUCCESS,
+        FAILURE
+    }
+
+    /**
+     * Constructor of {@link SourceContext}.
+     *
+     * @param sourceName Name of source. Usually the class name of source.
+     * @param context The configured source context.
+     */
+    public SortSdkSourceContext(
+            @NotBlank(message = "sourceName should not be empty or null") final String sourceName,
+            @NotNull(message = "context should not be null") final Context context) {
+
+        super(sourceName, context);
+        this.metricItemSet = new SortMetricItemSet(sourceName);
+        MetricRegister.register(metricItemSet);
+    }
+
+    /**
+     * Entrance to report fetch metrics.
+     *
+     * @param event The fetched event. May be <b>null</b> when fetch failed occurs.
+     * @param sortId Sort id of event.
+     * @param topic Topic that event fetched from. May be <b>null</b> when fetch failed occurs.
+     * @param fetchResult Result of fetching, SUCCESS or FAILURE.
+     */
+    public void reportToMetric(
+            @Nullable final ProfileEvent event,
+            @Nullable final String sortId,
+            @Nullable final String topic,
+            @NotNull(message = "Must specify fetch result") final SortSdkSourceContext.FetchResult fetchResult) {
+
+        final Map<String, String> dimensions = this.createSortSdkSourceDimensionMap(event, sortId, topic);
+        final SortMetricItem metricItem = metricItemSet.findMetricItem(dimensions);
+        final int msgSize = event != null ? event.getBody().length : -1;
+        this.reportToMetric(metricItem, fetchResult, msgSize);
+    }
+
+    /**
+     * Selector of metric report flow.
+     *
+     * @param item MetricItem that report to.
+     * @param fetchResult Result of fetching, SUCCESS or FAILURE.
+     * @param size The fetch length. -1 means fetch failure.
+     */
+    private void reportToMetric(
+            @NotNull final SortMetricItem item,
+            final FetchResult fetchResult,
+            final int size) {
+
+        switch (fetchResult) {
+            case SUCCESS:
+                reportToMetric(item.readSuccessCount, item.readSuccessSize, size);
+                break;
+            case FAILURE:
+                reportToMetric(item.readFailCount, item.readFailSize, size);
+                break;
+            default:
+                break;
+        }
+    }
+
+    /**
+     * Report to a specific metric group.
+     *
+     * @param countMetric Metric of event count.
+     * @param sizeMetric Metric of event size.
+     * @param size Size of event; the -1 failure marker is counted as 0 bytes so the size metric never shrinks.
+     */
+    private void reportToMetric(
+            @NotNull final AtomicLong countMetric,
+            @NotNull final AtomicLong sizeMetric,
+            final int size) {
+        countMetric.incrementAndGet();
+        sizeMetric.addAndGet(Math.max(size, 0));
+    }
+
+    /**
+     * Generator of report dimensions.
+     *
+     * <p> For the case of fetch {@link FetchResult#FAILURE}, the event may be null,
+     * the {@link org.apache.inlong.sort.standalone.utils.Constants#INLONG_GROUP_ID}
+     * and the {@link org.apache.inlong.sort.standalone.utils.Constants#INLONG_STREAM_ID} will not be specified. </p>
+     *
+     * @param event Event to be reported.
+     * @param sortId Sort id of fetched event.
+     * @param topic Topic of event.
+     *
+     * @return The dimensions of reported event.
+     */
+    private Map<String, String> createSortSdkSourceDimensionMap(
+            final ProfileEvent event,
+            final String sortId,
+            final String topic) {
+
+        final Map<String, String> dimensions = new HashMap<>();
+        dimensions.put(SortMetricItem.KEY_CLUSTER_ID, this.getClusterId());
+        dimensions.put(SortMetricItem.KEY_SOURCE_ID, this.getSourceName());
+        dimensions.put(SortMetricItem.KEY_TASK_NAME, sortId);
+        dimensions.put(SortMetricItem.KEY_SOURCE_DATA_ID, topic);
+        if (event != null) {
+            SortMetricItem.fillInlongId(event, dimensions);
+        }
+        return dimensions;
+    }
+
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SubscribeFetchResult.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SubscribeFetchResult.java
new file mode 100644
index 0000000..638fb07
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SubscribeFetchResult.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.source.sortsdk;
+
+import org.apache.inlong.sdk.sort.entity.MessageRecord;
+import org.apache.inlong.sort.standalone.utils.Constants;
+
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.NotNull;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * SubscribeFetchResult is the wrapper of {@link MessageRecord}.
+ * <p> SubscribeFetchResult integrates the message key, offset and message time into the header map,
+ * together with the headers carried by the record itself.</p>
+ */
+public class SubscribeFetchResult {
+
+    // The sortId of fetched message.
+    private final String sortId;
+
+    // Headers of the wrapped {@link MessageRecord}: message key, offset, message time and the record's own headers.
+    // ConcurrentHashMap rejects null keys and null values, so entries are only added through putHeaderIfPresent.
+    private final Map<String, String> headers = new ConcurrentHashMap<>();
+
+    // Row data in binary format.
+    private final byte[] body;
+
+    /**
+     * Private constructor of SubscribeFetchResult.
+     * <p> The construction of SubscribeFetchResult should be initiated by {@link SubscribeFetchResult.Factory}.</p>
+     * <p> A missing message key, offset, header map or header entry is skipped instead of failing the
+     * construction, because the backing header map does not accept null keys or values.</p>
+     *
+     * @param sortId The sortId of fetched message.
+     * @param message Message that fetched from upstream data storage, must not be null.
+     */
+    private SubscribeFetchResult(
+            final String sortId,
+            final MessageRecord message) {
+        this.sortId = sortId;
+        putHeaderIfPresent(Constants.HEADER_KEY_MESSAGE_KEY, message.getMsgKey());
+        putHeaderIfPresent(Constants.HEADER_KEY_MSG_OFFSET, message.getOffset());
+        // String.valueOf never yields a null reference, so a plain put is safe here.
+        this.headers.put(Constants.HEADER_KEY_MSG_TIME, String.valueOf(message.getRecTime()));
+
+        // Headers of the record are applied last so that they keep overriding the entries above.
+        final Map<? extends String, ? extends String> recordHeaders = message.getMsgHeader();
+        if (recordHeaders != null) {
+            for (Map.Entry<? extends String, ? extends String> entry : recordHeaders.entrySet()) {
+                putHeaderIfPresent(entry.getKey(), entry.getValue());
+            }
+        }
+        this.body = message.getMessage();
+    }
+
+    /**
+     * Put one header entry, silently skipping it when the key or the value is null.
+     *
+     * @param key Header key.
+     * @param value Header value.
+     */
+    private void putHeaderIfPresent(final String key, final String value) {
+        if (key != null && value != null) {
+            this.headers.put(key, value);
+        }
+    }
+
+    /**
+     * Get row data that in binary format.
+     * @return Row data.
+     */
+    public byte[] getBody() {
+        return body;
+    }
+
+    /**
+     * Get important metrics in Map format called headers.
+     * <p> The returned map is the internal header map itself, not a copy.</p>
+     * @return headers.
+     */
+    public Map<String, String> getHeaders() {
+        return headers;
+    }
+
+    /**
+     * Get sortId of fetched message.
+     * @return SortId of message.
+     */
+    public String getSortId() {
+        return sortId;
+    }
+
+    /**
+     * The factory of {@link SubscribeFetchResult}.
+     */
+    public static class Factory {
+
+        /**
+         * Create one {@link SubscribeFetchResult}.
+         * <p> The constraint annotations declare the contract of the arguments; they are only enforced if a
+         * Bean Validation method validator intercepts this call. Independently of that, a null
+         * messageRecord is rejected here explicitly. The sortId is not checked by this method.</p>
+         *
+         * @param sortId The sortId of fetched message.
+         * @param messageRecord Message that fetched from upstream data storage.
+         *
+         * @return One SubscribeFetchResult.
+         * @throws NullPointerException If messageRecord is null.
+         */
+        public static SubscribeFetchResult create(
+                @NotBlank(message = "SortId should not be null or empty.") final String sortId,
+                @NotNull(message = "MessageRecord should not be null.") final MessageRecord messageRecord) {
+            // Same exception type as dereferencing a null record would raise, but with a clear message.
+            if (messageRecord == null) {
+                throw new NullPointerException("MessageRecord should not be null.");
+            }
+            return new SubscribeFetchResult(sortId, messageRecord);
+        }
+    }
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/test/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSourceTest.java b/inlong-sort-standalone/sort-standalone-source/src/main/test/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSourceTest.java
new file mode 100644
index 0000000..63ea6c8
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/test/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSourceTest.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.source.sortsdk;
+
+import org.apache.flume.Context;
+import org.apache.inlong.commons.config.metrics.MetricRegister;
+import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
+import org.apache.inlong.sort.standalone.config.pojo.SortClusterConfig;
+import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Smoke test of {@link SortSdkSource}.
+ * <p> Static access to {@link LoggerFactory}, {@link MetricRegister} and {@link SortClusterConfigHolder}
+ * is mocked with PowerMock, so that the cluster config seen by the source is the in-memory
+ * {@link SortClusterConfig} prepared by this test.</p>
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({SortClusterConfigHolder.class, LoggerFactory.class, Logger.class, MetricRegister.class})
+public class SortSdkSourceTest {
+
+    // Flume context handed to the source under test; a spy of an empty Context.
+    private Context mockContext;
+
+    /**
+     * Install the static mocks and prepare a cluster config containing two sort tasks.
+     */
+    @Before
+    public void setUp() {
+        PowerMockito.mockStatic(LoggerFactory.class);
+        final Logger mockLogger = PowerMockito.mock(Logger.class);
+        PowerMockito.when(LoggerFactory.getLogger(Mockito.any(Class.class))).thenReturn(mockLogger);
+        PowerMockito.mockStatic(MetricRegister.class);
+
+        PowerMockito.mockStatic(SortClusterConfigHolder.class);
+        final SortClusterConfig config = prepareSortClusterConfig(2);
+        PowerMockito.when(SortClusterConfigHolder.getClusterConfig()).thenReturn(config);
+        mockContext = PowerMockito.spy(new Context());
+    }
+
+    /**
+     * Configure the source, run it once and stop it; the test passes when no exception is thrown.
+     */
+    @Test
+    public void testRun() {
+        final SortSdkSource testSource = new SortSdkSource();
+        testSource.configure(mockContext);
+        try {
+            testSource.run();
+        } finally {
+            // Always stop the source, so that a failing run() does not leave it un-stopped.
+            testSource.stop();
+        }
+    }
+
+    /**
+     * Build a cluster config named "testConfig" holding the given number of sort tasks.
+     *
+     * @param size Number of sort tasks to create.
+     * @return The prepared cluster config.
+     */
+    private SortClusterConfig prepareSortClusterConfig(final int size) {
+        final SortClusterConfig testConfig = new SortClusterConfig();
+        testConfig.setClusterName("testConfig");
+        testConfig.setSortTasks(prepareSortTaskConfig(size));
+        return testConfig;
+    }
+
+    /**
+     * Build the given number of sort task configs, named "testConfig0", "testConfig1", and so on.
+     *
+     * @param size Number of sort task configs to create.
+     * @return The prepared sort task configs.
+     */
+    private List<SortTaskConfig> prepareSortTaskConfig(final int size) {
+        final List<SortTaskConfig> configs = new ArrayList<>();
+
+        for (int i = 0; i < size; i++) {
+            final SortTaskConfig config = new SortTaskConfig();
+            config.setName("testConfig" + i);
+            configs.add(config);
+        }
+        Assert.assertEquals(size, configs.size());
+        return configs;
+    }
+
+}
\ No newline at end of file