You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by zh...@apache.org on 2022/04/19 02:29:21 UTC

[kylin] 01/03: Support soft affinity and local cache feature

This is an automated email from the ASF dual-hosted git repository.

zhangzc pushed a commit to branch kylin-soft-affinity-local-cache
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 7142f768c405a703b69abd15e3756b41d7d5d978
Author: Zhichao Zhang <zh...@apache.org>
AuthorDate: Thu Sep 16 09:03:53 2021 +0800

    Support soft affinity and local cache feature
    
        1. Implement LocalDataCacheManager
        2. Based on xiaoxiang's PR
        3. Implement CacheFileScanRDD
        4. Implement AbstractCacheFileSystem
        5. Optimize performance
        6. Support soft affinity for hdfs
        7. Support reading data through a ByteBuffer, and avoid reading data one byte at a time
        8. Support caching small files in memory: ByteBufferPageStore extends PageStore to cache data in memory
---
 .../test/resources/query/test_query/query01.sql    |  30 ++
 .../pom.xml                                        |  44 +--
 .../apache/kylin/cache/KylinCacheConstants.java    |  29 ++
 .../kylin/cache/fs/AbstractCacheFileSystem.java    | 330 +++++++++++++++++++++
 .../kylin/cache/fs/AlluxioHdfsFileInputStream.java | 223 ++++++++++++++
 .../kylin/cache/fs/CacheFileInputStream.java       | 312 +++++++++++++++++++
 .../kylin/cache/fs/CacheFileSystemConstants.java   |  54 ++++
 .../kylin/cache/fs/MemCacheFileInputStream.java    | 216 ++++++++++++++
 .../kylin/cache/fs/OnlyForTestCacheFileSystem.java |  30 ++
 .../kylin/cache/fs/kylin/CacheAllFileSystem.java   |  32 ++
 .../kylin/cache/fs/kylin/KylinCacheFileSystem.java |  47 +++
 .../apache/kylin/cache/utils/ConsistentHash.java   |  87 ++++++
 .../apache/kylin/cache/utils/ReflectionUtil.java   | 168 +++++++++++
 .../kylin/softaffinity/SoftAffinityConstants.java  |  41 +++
 .../kylin/softaffinity/SoftAffinityManager.scala   | 146 +++++++++
 .../scheduler/SoftAffinityListener.scala           |  38 +++
 .../strategy/SoftAffinityAllocationTrait.scala     |  38 +++
 .../strategy/SoftAffinityStrategy.scala            |  55 ++++
 .../execution/datasources/CacheFilePartition.scala |  72 +++++
 .../execution/datasources/CacheFileScanRDD.scala   |  77 +++++
 kylin-spark-project/kylin-spark-common/pom.xml     |   5 +
 .../common/logging/AbstractHdfsLogAppender.java    |   3 +-
 .../common/logging/SparkDriverHdfsLogAppender.java |   3 +-
 .../common/logging/SparkExecutorHdfsAppender.java  |   4 +-
 .../org/apache/spark/utils/SparkHadoopUtils.scala  |  30 ++
 .../sql/execution/KylinFileSourceScanExec.scala    |  19 +-
 kylin-spark-project/pom.xml                        |   1 +
 27 files changed, 2107 insertions(+), 27 deletions(-)

diff --git a/kylin-it/src/test/resources/query/test_query/query01.sql b/kylin-it/src/test/resources/query/test_query/query01.sql
new file mode 100644
index 0000000000..d4dec60406
--- /dev/null
+++ b/kylin-it/src/test/resources/query/test_query/query01.sql
@@ -0,0 +1,30 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+SELECT test_kylin_fact.cal_dt,cast(timestampdiff(DAY,date'2013-01-01',test_kylin_fact.cal_dt) as integer) as x,sum(price) as y
+ FROM TEST_KYLIN_FACT 
+ 
+inner JOIN edw.test_cal_dt as test_cal_dt
+ ON test_kylin_fact.cal_dt = test_cal_dt.cal_dt
+ inner JOIN test_category_groupings
+ ON test_kylin_fact.leaf_categ_id = test_category_groupings.leaf_categ_id AND test_kylin_fact.lstg_site_id = test_category_groupings.site_id
+ inner JOIN edw.test_sites as test_sites
+ ON test_kylin_fact.lstg_site_id = test_sites.site_id
+ GROUP BY test_kylin_fact.cal_dt
+ ORDER BY test_kylin_fact.cal_dt
+;{"scanRowCount":1462,"scanBytes":215217,"scanFiles":2,"cuboidId":262144}
\ No newline at end of file
diff --git a/kylin-spark-project/kylin-spark-common/pom.xml b/kylin-spark-project/kylin-soft-affinity-cache/pom.xml
similarity index 80%
copy from kylin-spark-project/kylin-spark-common/pom.xml
copy to kylin-spark-project/kylin-soft-affinity-cache/pom.xml
index 4d0cee3851..87370db8d4 100644
--- a/kylin-spark-project/kylin-spark-common/pom.xml
+++ b/kylin-spark-project/kylin-soft-affinity-cache/pom.xml
@@ -20,37 +20,39 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
 
-    <name>Apache Kylin 4.X - Common</name>
+    <artifactId>kylin-soft-affinity-cache</artifactId>
     <packaging>jar</packaging>
-    <artifactId>kylin-spark-common</artifactId>
-    <version>4.0.2-SNAPSHOT</version>
+    <name>Apache Kylin 4.X - Soft Affinity and Cache</name>
 
     <parent>
-        <artifactId>kylin-spark-project</artifactId>
         <groupId>org.apache.kylin</groupId>
-        <version>4.0.2-SNAPSHOT</version>
+        <artifactId>kylin-spark-project</artifactId>
+        <version>4.0.0-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
 
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <alluxio.version>2.6.1</alluxio.version>
+    </properties>
+
     <dependencies>
         <dependency>
-            <groupId>org.apache.kylin</groupId>
-            <artifactId>kylin-core-metadata</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.kylin</groupId>
-            <artifactId>kylin-core-common</artifactId>
-            <type>test-jar</type>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.kylin</groupId>
-            <artifactId>kylin-spark-metadata</artifactId>
-            <version>${project.version}</version>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-client</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-yarn-api</artifactId>
+            <groupId>org.alluxio</groupId>
+            <artifactId>alluxio-shaded-client</artifactId>
+            <version>${alluxio.version}</version>
             <scope>provided</scope>
         </dependency>
     </dependencies>
@@ -110,4 +112,4 @@
             </plugin>
         </plugins>
     </build>
-</project>
\ No newline at end of file
+</project>
diff --git a/kylin-spark-project/kylin-soft-affinity-cache/src/main/java/org/apache/kylin/cache/KylinCacheConstants.java b/kylin-spark-project/kylin-soft-affinity-cache/src/main/java/org/apache/kylin/cache/KylinCacheConstants.java
new file mode 100644
index 0000000000..b1232aee0f
--- /dev/null
+++ b/kylin-spark-project/kylin-soft-affinity-cache/src/main/java/org/apache/kylin/cache/KylinCacheConstants.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache;
+
+/**
+ * Constants shared by the Kylin cache module.
+ */
+public class KylinCacheConstants {
+
+    // Not instantiable: this class only holds constants.
+    private KylinCacheConstants() {
+    }
+
+    // Todo: change the param key name
+    // Fully-qualified class name of the Kylin cache file system.
+    public static final String KYLIN_CACHE_FS =
+            "org.apache.kylin.cache.fs.kylin.KylinCacheFileSystem";
+}
diff --git a/kylin-spark-project/kylin-soft-affinity-cache/src/main/java/org/apache/kylin/cache/fs/AbstractCacheFileSystem.java b/kylin-spark-project/kylin-soft-affinity-cache/src/main/java/org/apache/kylin/cache/fs/AbstractCacheFileSystem.java
new file mode 100644
index 0000000000..7a713194d7
--- /dev/null
+++ b/kylin-spark-project/kylin-soft-affinity-cache/src/main/java/org/apache/kylin/cache/fs/AbstractCacheFileSystem.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.fs;
+
+import alluxio.client.file.CacheContext;
+import alluxio.client.file.URIStatus;
+import alluxio.client.file.cache.CacheManager;
+import alluxio.client.file.cache.LocalCacheFileInStream;
+import alluxio.conf.AlluxioConfiguration;
+import alluxio.hadoop.AlluxioHdfsInputStream;
+import alluxio.hadoop.HadoopFileOpener;
+import alluxio.hadoop.HadoopUtils;
+import alluxio.metrics.MetricsConfig;
+import alluxio.metrics.MetricsSystem;
+import alluxio.wire.FileInfo;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FilterFileSystem;
+import org.apache.hadoop.fs.Path;
+//import org.apache.hadoop.util.DirectBufferPool;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.kylin.cache.utils.ReflectionUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.hash.Hashing.md5;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+public abstract class AbstractCacheFileSystem extends FilterFileSystem {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractCacheFileSystem.class);
+
+    //private static final DirectBufferPool bufferPool = new DirectBufferPool();
+
+    protected URI uri;
+    protected String originalScheme;
+    protected int bufferSize = 4096;
+    protected boolean useLocalCache = false;
+    protected boolean useLegacyFileInputStream = false;
+    protected HadoopFileOpener mHadoopFileOpener;
+    protected LocalCacheFileInStream.FileInStreamOpener mAlluxioFileOpener;
+    protected CacheManager mCacheManager;
+    protected AlluxioConfiguration mAlluxioConf;
+
+    protected LoadingCache<Path, FileStatus> fileStatusCache;
+
+    // put("s3", "com.amazon.ws.emr.hadoop.fs.EmrFileSystem");
+    // put("s3n", "com.amazon.ws.emr.hadoop.fs.EmrFileSystem");
+    // put("s3bfs", "org.apache.hadoop.fs.s3.S3FileSystem");
+    protected static final Map<String, String> schemeClassMap = new HashMap<String, String>() {
+        {
+            put("file", "org.apache.hadoop.fs.LocalFileSystem");
+            put("viewfs", "org.apache.hadoop.fs.viewfs.ViewFileSystem");
+            put("s3a", "org.apache.hadoop.fs.s3a.S3AFileSystem");
+            put("s3", "org.apache.hadoop.fs.s3.S3FileSystem");
+            put("s3n", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
+            put("hdfs", "org.apache.hadoop.hdfs.DistributedFileSystem");
+            put("wasb", "org.apache.hadoop.fs.azure.NativeAzureFileSystem");
+            put("wasbs", "org.apache.hadoop.fs.azure.NativeAzureFileSystem$Secure");
+            put("jfs", "io.juicefs.JuiceFileSystem");
+            put("alluxio", "alluxio.hadoop.FileSystem");
+        }
+    };
+
+    /**
+     * Create and initialize the internal (wrapped) FileSystem for the scheme of {@code uri}.
+     *
+     * The implementation class name is looked up in {@link #schemeClassMap}, loaded through
+     * {@code conf}, instantiated reflectively and then initialized with {@code uri}.
+     *
+     * @param uri  the URI whose scheme selects the FileSystem implementation
+     * @param conf the Hadoop configuration used to load and initialize the FileSystem
+     * @return the initialized FileSystem
+     * @throws IOException if the scheme is not in {@link #schemeClassMap}, if the
+     *                     implementation class cannot be loaded, or if initialization fails
+     */
+    protected static FileSystem createInternalFS(URI uri, Configuration conf)
+            throws IOException {
+        final String scheme = uri.getScheme();
+        final String className = schemeClassMap.get(scheme);
+        if (className == null) {
+            throw new IOException("No FileSystem for scheme: " + scheme);
+        }
+        try {
+            // asSubclass verifies the loaded class really is a FileSystem, replacing the
+            // unchecked cast.
+            Class<? extends FileSystem> clazz =
+                    conf.getClassByName(className).asSubclass(FileSystem.class);
+            FileSystem fs = ReflectionUtils.newInstance(clazz, conf);
+            fs.initialize(uri, conf);
+            LOG.info("Create filesystem {} for scheme {} .", className, scheme);
+            return fs;
+        } catch (ClassNotFoundException e) {
+            // Keep the original exception as the cause so the missing class name is not lost.
+            throw new IOException("Can not found FileSystem Clazz for scheme: " + scheme, e);
+        }
+    }
+
+    /**
+     * Set up the Alluxio local cache objects used by this file system.
+     *
+     * Builds the file openers that read through the wrapped FileSystem, converts the Hadoop
+     * configuration to an Alluxio configuration, starts the Alluxio metrics sinks and obtains
+     * the CacheManager.
+     *
+     * @param name not used by this method
+     * @param conf the Hadoop configuration the Alluxio settings are derived from
+     * @throws IOException if no CacheManager could be obtained
+     */
+    protected void createLocalCacheManager(URI name, Configuration conf) throws IOException{
+        // Opens the file on the wrapped FileSystem, then adapts it to an Alluxio input stream.
+        mHadoopFileOpener = uriStatus -> this.fs.open(new Path(uriStatus.getPath()));
+        mAlluxioFileOpener = status -> new AlluxioHdfsInputStream(mHadoopFileOpener.open(status));
+
+        mAlluxioConf = HadoopUtils.toAlluxioConf(conf);
+        // Handle metrics
+        // Every entry of the Hadoop configuration is copied into the metrics properties.
+        Properties metricsProperties = new Properties();
+        for (Map.Entry<String, String> entry : conf) {
+            metricsProperties.setProperty(entry.getKey(), entry.getValue());
+        }
+        MetricsSystem.startSinksFromConfig(new MetricsConfig(metricsProperties));
+        mCacheManager = CacheManager.Factory.get(mAlluxioConf);
+        if (mCacheManager == null) {
+            throw new IOException("CacheManager is null !");
+        }
+    }
+
+    /**
+     * Initialize this file system: create the wrapped FileSystem for the scheme of
+     * {@code name}, read the settings from {@code conf}, build the FileStatus cache and,
+     * when the local cache is enabled, create the local cache manager.
+     *
+     * @param name the URI of the file system; its scheme selects the wrapped FileSystem
+     * @param conf the Hadoop configuration
+     * @throws IOException if the wrapped FileSystem or the cache manager cannot be created
+     */
+    @Override
+    public synchronized void initialize(URI name, Configuration conf) throws IOException {
+        this.originalScheme = name.getScheme();
+        // create internal FileSystem according to the scheme
+        this.fs = createInternalFS(name, conf);
+        // Read the wrapped FileSystem's "statistics" field reflectively.
+        this.statistics = (FileSystem.Statistics) ReflectionUtil.getFieldValue(this.fs,
+                "statistics");
+        if (null == this.statistics) {
+            LOG.info("======= original statistics is null.");
+        } else {
+            LOG.info("======= original statistics is {} {}.", this.statistics.getScheme(),
+                    this.statistics.toString());
+        }
+        super.initialize(name, conf);
+        this.setConf(conf);
+        // NOTE(review): this dereferences statistics without a null check; presumably
+        // super.initialize() always assigns it - confirm.
+        LOG.info("======= current statistics is {} {}.", this.statistics.getScheme(),
+                this.statistics.toString());
+
+        this.bufferSize = conf.getInt(CacheFileSystemConstants.PARAMS_KEY_IO_FILE_BUFFER_SIZE,
+                CacheFileSystemConstants.PARAMS_KEY_IO_FILE_BUFFER_SIZE_DEFAULT_VALUE);
+        // when scheme is jfs, use the cache by jfs itself
+        this.useLocalCache = conf.getBoolean(CacheFileSystemConstants.PARAMS_KEY_USE_CACHE,
+                CacheFileSystemConstants.PARAMS_KEY_USE_CACHE_DEFAULT_VALUE)
+                && !this.originalScheme.equals(CacheFileSystemConstants.JUICEFS_SCHEME);
+
+        this.useLegacyFileInputStream = conf.getBoolean(
+                CacheFileSystemConstants.PARAMS_KEY_USE_LEGACY_FILE_INPUTSTREAM,
+                CacheFileSystemConstants.PARAMS_KEY_USE_LEGACY_FILE_INPUTSTREAM_DEFAULT_VALUE);
+
+        // create FileStatus cache
+        // The TTL is applied in seconds (see expireAfterAccess below).
+        long fileStatusTTL =
+                conf.getLong(CacheFileSystemConstants.PARAMS_KEY_FILE_STATUS_CACHE_TTL,
+                        CacheFileSystemConstants.PARAMS_KEY_FILE_STATUS_CACHE_TTL_DEFAULT_VALUE);
+        long fileStatusMaxSize =
+                conf.getLong(CacheFileSystemConstants.PARAMS_KEY_FILE_STATUS_CACHE_MAX_SIZE,
+                     CacheFileSystemConstants.PARAMS_KEY_FILE_STATUS_CACHE_MAX_SIZE_DEFAULT_VALUE);
+        // Cache misses are loaded through getFileStatusForCache().
+        CacheLoader<Path, FileStatus> fileStatusCacheLoader = new CacheLoader<Path, FileStatus>() {
+            @Override
+            public FileStatus load(Path path) throws Exception {
+                return getFileStatusForCache(path);
+            }
+        };
+        // NOTE(review): expireAfterAccess restarts the TTL on every read, so a frequently
+        // read entry is never refreshed - confirm that access-based expiry is intended.
+        this.fileStatusCache =
+                CacheBuilder.newBuilder()
+                        .maximumSize(fileStatusMaxSize)
+                        .expireAfterAccess(fileStatusTTL, TimeUnit.SECONDS)
+                        .recordStats()
+                        .build(fileStatusCacheLoader);
+
+        // create LocalCacheFileSystem if needs
+        if (this.isUseLocalCache()) {
+            // Todo: Can set local cache dir here for the current executor
+            this.createLocalCacheManager(this.getUri(), conf);
+            LOG.info("Create LocalCacheFileSystem successfully .");
+        }
+    }
+
+    protected FileStatus getFileStatusForCache(Path path) throws IOException {
+        return this.fs.getFileStatus(path);
+    }
+
+    @Override
+    public String getScheme() {
+        return this.originalScheme;
+    }
+
+    @Override
+    public FSDataInputStream open(Path f) throws IOException {
+        return open(f, bufferSize);
+    }
+
+    /**
+     * Check whether needs to cache data on the current executor
+     */
+    public abstract boolean isUseLocalCacheForTargetExecs();
+
+    /**
+     * Convert a Hadoop FileStatus into an Alluxio FileInfo carrying the same length, path,
+     * directory flag, block size, modification/access times, owner and group.
+     *
+     * @param fileStatus the Hadoop file status to convert
+     * @return a new FileInfo populated from {@code fileStatus}
+     */
+    public FileInfo wrapFileInfo(FileStatus fileStatus) {
+        final FileInfo info = new FileInfo();
+        info.setLength(fileStatus.getLen());
+        info.setPath(fileStatus.getPath().toString());
+        info.setFolder(fileStatus.isDirectory());
+        info.setBlockSizeBytes(fileStatus.getBlockSize());
+        info.setLastModificationTimeMs(fileStatus.getModificationTime());
+        info.setLastAccessTimeMs(fileStatus.getAccessTime());
+        info.setOwner(fileStatus.getOwner());
+        info.setGroup(fileStatus.getGroup());
+        return info;
+    }
+
+    /**
+     * Returns the requested buffer size, raised to this file system's configured buffer
+     * size when the request is smaller.
+     */
+    private int checkBufferSize(int size) {
+        return Math.max(size, this.bufferSize);
+    }
+
+    @Override
+    public FSDataInputStream open(Path p, int bufferSize) throws IOException {
+        return this.open(p, bufferSize, this.isUseLocalCacheForTargetExecs());
+    }
+
+    /**
+     * Open a file, reading through the local cache when it is enabled, a cache manager
+     * exists and {@code useLocalCacheForExec} is true; otherwise delegate to the wrapped
+     * FileSystem.
+     *
+     * @param p                    the file to open
+     * @param bufferSize           the requested read buffer size
+     * @param useLocalCacheForExec whether the caller wants the local cache for this open
+     * @return the opened input stream
+     * @throws IOException if the file status cannot be read or the file cannot be opened
+     */
+    public FSDataInputStream open(Path p, int bufferSize, boolean useLocalCacheForExec) throws IOException {
+        final Path qualified = this.fs.makeQualified(p);
+
+        final boolean viaLocalCache =
+                this.isUseLocalCache() && this.mCacheManager != null && useLocalCacheForExec;
+        if (!viaLocalCache) {
+            LOG.info("Use original FileSystem to open file {} .", qualified);
+            return super.open(qualified, bufferSize);
+        }
+
+        final FileStatus fileStatus = this.getFileStatus(qualified);
+        final FileInfo fileInfo = wrapFileInfo(fileStatus);
+        // The file path uniquely identifies a file but can be long, so its md5 hash is used
+        // as the cache identifier instead.
+        final String cacheId =
+                md5().hashString(fileStatus.getPath().toString(), UTF_8).toString();
+        final URIStatus status =
+                new URIStatus(fileInfo, CacheContext.defaults().setCacheIdentifier(cacheId));
+        LOG.info("Use local cache FileSystem to open file {} .", qualified);
+
+        final LocalCacheFileInStream cachedIn =
+                new LocalCacheFileInStream(status, mAlluxioFileOpener, mCacheManager, mAlluxioConf);
+        if (this.useLegacyFileInputStream) {
+            return new FSDataInputStream(new AlluxioHdfsFileInputStream(cachedIn, statistics));
+        }
+        return new FSDataInputStream(new CacheFileInputStream(qualified, cachedIn,
+                null, statistics, checkBufferSize(bufferSize)));
+    }
+
+    /**
+     * Return the status of a file, served from the FileStatus cache and loaded from the
+     * wrapped FileSystem on a cache miss.
+     *
+     * @param f the path to look up
+     * @return the file status, never {@code null}
+     * @throws FileNotFoundException if the file does not exist
+     * @throws IOException           if the status cannot be loaded for any other reason
+     */
+    @Override
+    public FileStatus getFileStatus(Path f) throws IOException {
+        this.statistics.incrementReadOps(1);
+        long start = System.currentTimeMillis();
+        Path p = this.fs.makeQualified(f);
+        FileStatus fileStatus;
+        try {
+            fileStatus = this.fileStatusCache.get(p);
+        } catch (ExecutionException e) {
+            Throwable cause = e.getCause();
+            if (cause instanceof FileNotFoundException) {
+                throw new FileNotFoundException("File does not exist: " + p);
+            }
+            LOG.error("Get file status from cache error: " + p, e);
+            // Never return null here: callers such as open() dereference the result
+            // immediately, so a load failure must surface as an IOException instead.
+            if (cause instanceof IOException) {
+                throw (IOException) cause;
+            }
+            throw new IOException("Failed to get file status: " + p, cause);
+        }
+        LOG.info("Get file status {} from cache took: {}", f,
+                (System.currentTimeMillis() - start));
+        return fileStatus;
+    }
+
+    public CacheManager getmCacheManager() {
+        return mCacheManager;
+    }
+
+    public void setmCacheManager(CacheManager mCacheManager) {
+        this.mCacheManager = mCacheManager;
+    }
+
+    public AlluxioConfiguration getmAlluxioConf() {
+        return mAlluxioConf;
+    }
+
+    public void setmAlluxioConf(AlluxioConfiguration mAlluxioConf) {
+        this.mAlluxioConf = mAlluxioConf;
+    }
+
+    public boolean isUseLocalCache() {
+        return useLocalCache;
+    }
+
+    public void setUseLocalCache(boolean useLocalCache) {
+        this.useLocalCache = useLocalCache;
+    }
+
+    public int getBufferSize() {
+        return bufferSize;
+    }
+
+    public void setBufferSize(int bufferSize) {
+        this.bufferSize = bufferSize;
+    }
+
+    public boolean isUseLegacyFileInputStream() {
+        return useLegacyFileInputStream;
+    }
+
+    public void setUseLegacyFileInputStream(boolean useLegacyFileInputStream) {
+        this.useLegacyFileInputStream = useLegacyFileInputStream;
+    }
+
+    public LoadingCache<Path, FileStatus> getFileStatusCache() {
+        return fileStatusCache;
+    }
+}
diff --git a/kylin-spark-project/kylin-soft-affinity-cache/src/main/java/org/apache/kylin/cache/fs/AlluxioHdfsFileInputStream.java b/kylin-spark-project/kylin-soft-affinity-cache/src/main/java/org/apache/kylin/cache/fs/AlluxioHdfsFileInputStream.java
new file mode 100644
index 0000000000..f94a5dffe1
--- /dev/null
+++ b/kylin-spark-project/kylin-soft-affinity-cache/src/main/java/org/apache/kylin/cache/fs/AlluxioHdfsFileInputStream.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.fs;
+
+import alluxio.AlluxioURI;
+import alluxio.client.file.FileInStream;
+import alluxio.client.file.FileSystem;
+import alluxio.exception.AlluxioException;
+import alluxio.exception.ExceptionMessage;
+import alluxio.exception.FileDoesNotExistException;
+
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import javax.annotation.concurrent.NotThreadSafe;
+
+@NotThreadSafe
+public class AlluxioHdfsFileInputStream extends InputStream implements Seekable, PositionedReadable,
+        ByteBufferReadable {
+    private static final Logger LOG = LoggerFactory.getLogger(AlluxioHdfsFileInputStream.class);
+
+    private final Statistics mStatistics;
+    private final FileInStream mInputStream;
+
+    private boolean mClosed = false;
+
+    /**
+     * Constructs a new stream for reading a file from HDFS.
+     *
+     * @param fs the file system
+     * @param uri the Alluxio file URI
+     * @param stats filesystem statistics
+     */
+    public AlluxioHdfsFileInputStream(FileSystem fs, AlluxioURI uri, Statistics stats)
+            throws IOException {
+        LOG.debug("HdfsFileInputStream({}, {})", uri, stats);
+
+        mStatistics = stats;
+        try {
+            mInputStream = fs.openFile(uri);
+        } catch (FileDoesNotExistException e) {
+            // Transform the Alluxio exception to a Java exception to satisfy the HDFS API contract.
+            throw new FileNotFoundException(ExceptionMessage.PATH_DOES_NOT_EXIST.getMessage(uri));
+        } catch (AlluxioException e) {
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Constructs a new stream for reading a file from HDFS.
+     *
+     * @param inputStream the input stream
+     * @param stats filesystem statistics
+     */
+    public AlluxioHdfsFileInputStream(FileInStream inputStream, Statistics stats) {
+        mInputStream = inputStream;
+        mStatistics = stats;
+    }
+
+    /**
+     * Returns the number of bytes remaining in the stream, capped at
+     * {@link Integer#MAX_VALUE}.
+     *
+     * @return the remaining byte count, at most {@code Integer.MAX_VALUE}
+     * @throws IOException if the stream is closed
+     */
+    @Override
+    public int available() throws IOException {
+        if (mClosed) {
+            throw new IOException("Cannot query available bytes from a closed stream.");
+        }
+        // remaining() is a long; a plain (int) cast would wrap around (possibly to a negative
+        // value) when more than 2 GiB remain.
+        return (int) Math.min(mInputStream.remaining(), Integer.MAX_VALUE);
+    }
+
+    /**
+     * Close the underlying stream. Calling this on an already closed stream does nothing.
+     *
+     * @throws IOException if closing the underlying stream fails
+     */
+    @Override
+    public void close() throws IOException {
+        if (!mClosed) {
+            mInputStream.close();
+            mClosed = true;
+        }
+    }
+
+    @Override
+    public long getPos() throws IOException {
+        return mInputStream.getPos();
+    }
+
+    /**
+     * Read a single byte and record it in the statistics, when statistics are present.
+     *
+     * @return the byte read, or -1 at the end of the stream
+     * @throws IOException if the stream is closed or the underlying read fails
+     */
+    @Override
+    public int read() throws IOException {
+        if (mClosed) {
+            throw new IOException(ExceptionMessage.READ_CLOSED_STREAM.getMessage());
+        }
+
+        int data = mInputStream.read();
+        if (data != -1 && mStatistics != null) {
+            mStatistics.incrementBytesRead(1);
+            // Logged at debug: an INFO line per byte read would flood the log.
+            LOG.debug("Read one byte.");
+        }
+        return data;
+    }
+
+    @Override
+    public int read(byte[] buffer) throws IOException {
+        return read(buffer, 0, buffer.length);
+    }
+
+    /**
+     * Read up to {@code length} bytes into {@code buffer} starting at {@code offset}, and
+     * record the bytes read in the statistics, when statistics are present.
+     *
+     * @param buffer the destination array
+     * @param offset the start offset in {@code buffer}
+     * @param length the maximum number of bytes to read
+     * @return the number of bytes read, or -1 at the end of the stream
+     * @throws IOException if the stream is closed or the underlying read fails
+     */
+    @Override
+    public int read(byte[] buffer, int offset, int length) throws IOException {
+        if (mClosed) {
+            throw new IOException(ExceptionMessage.READ_CLOSED_STREAM.getMessage());
+        }
+
+        int bytesRead = mInputStream.read(buffer, offset, length);
+        if (bytesRead != -1 && mStatistics != null) {
+            mStatistics.incrementBytesRead(bytesRead);
+            // Logged at debug: this runs on every read call.
+            LOG.debug("Read {} bytes.", bytesRead);
+        }
+        return bytesRead;
+    }
+
+    /**
+     * Read bytes into {@code buf}, advancing its position by the number of bytes read, and
+     * record the bytes read in the statistics, when statistics are present.
+     *
+     * @param buf the destination buffer
+     * @return the number of bytes read, or -1 at the end of the stream
+     * @throws IOException if the stream is closed or the underlying read fails
+     */
+    @Override
+    public int read(ByteBuffer buf) throws IOException {
+        if (mClosed) {
+            throw new IOException(ExceptionMessage.READ_CLOSED_STREAM.getMessage());
+        }
+        int bytesRead;
+        if (buf.hasArray()) {
+            // Only buffers with an accessible backing array may use array(); for sliced or
+            // wrapped buffers the buffer position is relative to arrayOffset().
+            bytesRead = mInputStream.read(buf.array(), buf.arrayOffset() + buf.position(),
+                    buf.remaining());
+            if (bytesRead > 0) {
+                buf.position(buf.position() + bytesRead);
+            }
+        } else {
+            // Direct buffers, and heap buffers without an accessible array (e.g. read-only
+            // ones), go through the ByteBuffer read of the underlying stream.
+            bytesRead = mInputStream.read(buf);
+        }
+        if (bytesRead != -1 && mStatistics != null) {
+            mStatistics.incrementBytesRead(bytesRead);
+            // Logged at debug: this runs on every read call.
+            LOG.debug("Read {} byte buffer {}.", bytesRead, buf.hasArray());
+        }
+        return bytesRead;
+    }
+
+    /**
+     * Positioned read: read up to {@code length} bytes from file offset {@code position}
+     * into {@code buffer}, and record the bytes read in the statistics, when present.
+     *
+     * @param position the file offset to read from
+     * @param buffer   the destination array
+     * @param offset   the start offset in {@code buffer}
+     * @param length   the maximum number of bytes to read
+     * @return the number of bytes read, or -1 at the end of the stream
+     * @throws IOException if the stream is closed or the underlying read fails
+     */
+    @Override
+    public int read(long position, byte[] buffer, int offset, int length) throws IOException {
+        if (mClosed) {
+            throw new IOException(ExceptionMessage.READ_CLOSED_STREAM.getMessage());
+        }
+
+        int bytesRead = mInputStream.positionedRead(position, buffer, offset, length);
+        if (bytesRead != -1 && mStatistics != null) {
+            mStatistics.incrementBytesRead(bytesRead);
+            // Logged at debug: this runs on every read call.
+            LOG.debug("Read {} {} byte buffer.", position, bytesRead);
+        }
+        return bytesRead;
+    }
+
+    @Override
+    public void readFully(long position, byte[] buffer) throws IOException {
+        readFully(position, buffer, 0, buffer.length);
+    }
+
+    /**
+     * Read exactly {@code length} bytes from file offset {@code position} into
+     * {@code buffer}, repeating the positioned read until the request is satisfied.
+     *
+     * @param position the file offset to read from
+     * @param buffer   the destination array
+     * @param offset   the start offset in {@code buffer}
+     * @param length   the exact number of bytes to read
+     * @throws EOFException if the end of the stream is reached before {@code length} bytes
+     *                      have been read
+     * @throws IOException  if the stream is closed or the underlying read fails
+     */
+    @Override
+    public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
+        int totalBytesRead = 0;
+        while (totalBytesRead < length) {
+            int bytesRead =
+                    read(position + totalBytesRead, buffer, offset + totalBytesRead, length - totalBytesRead);
+            if (bytesRead == -1) {
+                throw new EOFException();
+            }
+            totalBytesRead += bytesRead;
+        }
+        // Logged at debug: this runs on every readFully call.
+        LOG.debug("Read fully {} {} byte buffer.", position, totalBytesRead);
+    }
+
+    /**
+     * Seek the underlying stream to {@code pos}.
+     *
+     * NOTE(review): unlike the read methods, this does not check {@code mClosed} first -
+     * confirm whether that is intentional.
+     *
+     * @param pos the position to seek to
+     * @throws IOException if the underlying stream fails, or rejects the position with an
+     *                     IllegalArgumentException
+     */
+    @Override
+    public void seek(long pos) throws IOException {
+        try {
+            mInputStream.seek(pos);
+        } catch (IllegalArgumentException e) { // convert back to IOException
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * This method is not supported in {@link AlluxioHdfsFileInputStream}.
+     *
+     * @param targetPos N/A
+     * @return N/A
+     * @throws IOException always
+     */
+    @Override
+    public boolean seekToNewSource(long targetPos) throws IOException {
+        throw new IOException(ExceptionMessage.NOT_SUPPORTED.getMessage());
+    }
+
+    @Override
+    public long skip(long n) throws IOException {
+        if (mClosed) {
+            throw new IOException("Cannot skip bytes in a closed stream.");
+        }
+        return mInputStream.skip(n);
+    }
+}
\ No newline at end of file
diff --git a/kylin-spark-project/kylin-soft-affinity-cache/src/main/java/org/apache/kylin/cache/fs/CacheFileInputStream.java b/kylin-spark-project/kylin-soft-affinity-cache/src/main/java/org/apache/kylin/cache/fs/CacheFileInputStream.java
new file mode 100644
index 0000000000..185e7364a9
--- /dev/null
+++ b/kylin-spark-project/kylin-soft-affinity-cache/src/main/java/org/apache/kylin/cache/fs/CacheFileInputStream.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.fs;
+
+import alluxio.client.file.FileInStream;
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.DirectBufferPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class CacheFileInputStream extends FSInputStream implements ByteBufferReadable {
+
+    public static final int EINVAL = -0x16;
+    /**
+     * Refer to the code of 'FileInputStream' from JuiceFS Java SDK
+     */
+
+    private static final Logger LOG = LoggerFactory.getLogger(CacheFileInputStream.class);
+
+    private final DirectBufferPool bufferPool;
+    private final Statistics statistics;
+    private final FileInStream mInputStream;
+
+    private ByteBuffer buf;
+    private Path file;
+    private boolean mClosed = false;
+
+    /**
+     * Wraps {@code inputStream} with an internal read buffer of {@code size} bytes.
+     * The buffer is taken from {@code directBufferPool} when one is supplied and
+     * allocated on the heap otherwise; it starts out empty (limit 0).
+     *
+     * @param file path of the file being read, used in error messages
+     * @param inputStream underlying stream that supplies the data
+     * @param directBufferPool optional pool to borrow the buffer from, may be null
+     * @param statistics counter updated with the number of bytes read
+     * @param size capacity of the internal buffer in bytes
+     */
+    public CacheFileInputStream(Path file, FileInStream inputStream,
+                                DirectBufferPool directBufferPool,
+                                Statistics statistics, int size) throws IOException {
+        this.file = file;
+        this.mInputStream = inputStream;
+        this.statistics = statistics;
+        this.bufferPool = directBufferPool;
+        this.buf = (directBufferPool != null)
+                ? directBufferPool.getBuffer(size)
+                : ByteBuffer.allocate(size);
+        // limit 0 marks the buffer as holding no readable bytes yet
+        this.buf.limit(0);
+    }
+
+    /**
+     * Reports the logical read position: the underlying stream's position minus
+     * the bytes that are buffered but not yet handed to the caller.
+     *
+     * @throws IOException if the stream was closed
+     */
+    @Override
+    public synchronized long getPos() throws IOException {
+        if (buf == null) {
+            throw new IOException(
+                    "Reading file " + this.file.toString() + " error, stream was closed");
+        }
+        final long underlyingPos = mInputStream.getPos();
+        return underlyingPos - buf.remaining();
+    }
+
+    /**
+     * Never switches to another source: {@code targetPos} is ignored and
+     * {@code false} is returned unconditionally.
+     */
+    @Override
+    public boolean seekToNewSource(long targetPos) throws IOException {
+        return false;
+    }
+
+    /**
+     * Returns the unread bytes in the internal buffer plus whatever the
+     * underlying stream reports as available.
+     *
+     * @throws IOException if the stream was closed
+     */
+    @Override
+    public synchronized int available() throws IOException {
+        if (buf == null) {
+            throw new IOException(
+                    "Reading file " + this.file.toString() + " error, stream was closed");
+        }
+        final int buffered = buf.remaining();
+        return buffered + mInputStream.available();
+    }
+
+    /** Mark/reset is not offered by this stream; always returns {@code false}. */
+    @Override
+    public boolean markSupported() {
+        return false;
+    }
+
+    /**
+     * Always fails, because mark/reset is not supported.
+     *
+     * @throws IOException on every call
+     */
+    @Override
+    public void reset() throws IOException {
+        throw new IOException("Mark/reset not supported");
+    }
+
+    /**
+     * Reads a single byte, refilling the internal buffer from the underlying
+     * stream when it is empty.
+     *
+     * @return the byte as a value in 0..255, or {@code -1} when the refill yields nothing
+     * @throws IOException if the stream was closed or the refill fails
+     */
+    @Override
+    public synchronized int read() throws IOException {
+        if (buf == null) {
+            throw new IOException(
+                    "Reading file " + this.file.toString() + " error, stream was closed");
+        }
+        if (!buf.hasRemaining()) {
+            if (!refill()) {
+                return -1; // EOF
+            }
+        }
+        assert buf.hasRemaining();
+        statistics.incrementBytesRead(1);
+        final byte next = buf.get();
+        return next & 0xFF;
+    }
+
+    /**
+     * Reads up to {@code len} bytes into {@code b} starting at {@code off}.
+     * Bytes still held in the internal buffer are handed out first; whatever is
+     * still missing is then read straight from the underlying stream.
+     *
+     * @return the number of bytes read, {@code 0} when {@code len} is zero, or
+     *         {@code -1} if nothing could be read
+     * @throws IndexOutOfBoundsException if {@code off}/{@code len} do not fit {@code b}
+     * @throws IOException if the stream was closed or the underlying read fails
+     */
+    @Override
+    public synchronized int read(byte[] b, int off, int len) throws IOException {
+        if (off < 0 || len < 0 || b.length - off < len)
+            throw new IndexOutOfBoundsException();
+        if (len == 0)
+            return 0;
+        if (buf == null)
+            throw new IOException(
+                    "Reading file " + this.file.toString() + " error, stream was closed");
+        // Refill only when the buffer is empty AND the request fits into it;
+        // larger requests skip the buffer and go to the stream directly below.
+        if (!buf.hasRemaining() && len <= buf.capacity() && !refill())
+            return -1; // No bytes were read before EOF.
+
+        // Serve as much as possible from the buffer first.
+        int read = Math.min(buf.remaining(), len);
+        if (read > 0) {
+            buf.get(b, off, read);
+            statistics.incrementBytesRead(read);
+            off += read;
+            len -= read;
+        }
+        if (len == 0)
+            return read;
+        // buf is empty, read data from mInputStream directly
+        int more = readInternal(b, off, len);
+        if (more <= 0) {
+            // nothing more from the stream: report the buffered part, or EOF if there was none
+            if (read > 0) {
+                return read;
+            } else {
+                return -1;
+            }
+        }
+        // read byte[], buf must be empty
+        // (the stream position moved past the buffered window, so mark the buffer as empty)
+        buf.position(0);
+        buf.limit(0);
+        return read + more;
+    }
+
+    /**
+     * Reads directly from the underlying stream into {@code b}, bypassing the
+     * internal buffer, and adds the byte count to the statistics.
+     *
+     * NOTE(review): a result of 0 from the underlying stream is mapped to -1 (EOF),
+     * while any negative result raises an IOException. Confirm this matches the
+     * return convention of the underlying stream's read, which may itself signal
+     * EOF with a negative value.
+     *
+     * @return the number of bytes read, {@code 0} when {@code len} is zero, or
+     *         {@code -1} when the underlying read returned 0
+     * @throws IllegalArgumentException if {@code b} is null or the range does not fit
+     * @throws IOException if the stream was closed or the underlying read returned a negative value
+     */
+    protected synchronized int readInternal(byte[] b, int off, int len) throws IOException {
+        if (len == 0)
+            return 0;
+        if (buf == null)
+            throw new IOException(
+                    "Reading file " + this.file.toString() + " error, stream was closed");
+        if (b == null || off < 0 || len < 0 || b.length - off < len) {
+            throw new IllegalArgumentException(
+                    "Reading file " + this.file.toString() + " error, invalid arguments: " +
+                            off + " " + len);
+        }
+        int got = mInputStream.read(b, off, len);
+        if (got == 0)
+            return -1;
+        // EINVAL and every other negative value end up with the same error message
+        if (got == EINVAL)
+            throw new IOException(
+                    "Reading file " + this.file.toString() + " error, stream was closed");
+        if (got < 0)
+            throw new IOException(
+                    "Reading file " + this.file.toString() + " error, stream was closed");
+        statistics.incrementBytesRead(got);
+        return got;
+    }
+
+    /**
+     * Replaces the buffer contents with fresh data from the underlying stream.
+     *
+     * @return {@code true} if at least one byte was buffered, {@code false}
+     *         otherwise (the buffer is then left empty)
+     */
+    private boolean refill() throws IOException {
+        buf.clear();
+        int read = readInternal(buf);
+        if (read <= 0) {
+            buf.limit(0);
+            return false; // EOF
+        }
+        // readInternal already counted these bytes; undo that here because the
+        // public read methods count them again when handing them to the caller
+        statistics.incrementBytesRead(-read);
+        buf.flip();
+        return true;
+    }
+
+    /**
+     * Positional read: seeks the underlying stream to {@code pos}, reads up to
+     * {@code len} bytes into {@code b}, and seeks back to the previous position
+     * so the sequential read state is unaffected. The internal buffer is not used.
+     *
+     * NOTE(review): unlike the sequential reads, this method does not check
+     * whether the stream was closed before touching the underlying stream.
+     *
+     * @return the number of bytes read, {@code 0} when {@code len} is zero, or
+     *         {@code -1} when the underlying read returned 0
+     * @throws EOFException if {@code pos} is negative
+     * @throws IllegalArgumentException if {@code b} is null or the range does not fit
+     * @throws IOException if the underlying read returned a negative value
+     */
+    @Override
+    public synchronized int read(long pos, byte[] b, int off, int len) throws IOException {
+        if (len == 0)
+            return 0;
+        if (pos < 0)
+            throw new EOFException(
+                    "Reading file " + this.file.toString() + " error, position is negative");
+        if (b == null || off < 0 || len < 0 || b.length - off < len) {
+            throw new IllegalArgumentException(
+                    "Reading file " + this.file.toString() + " error, invalid arguments: " +
+                            off + " " + len);
+        }
+        // remember where the sequential reader was so it can be restored afterwards
+        long oldPos = mInputStream.getPos();
+        mInputStream.seek(pos);
+        int got = -1;
+        try {
+            got = mInputStream.read(b, off, len);
+            if (got == 0)
+                return -1;
+            if (got == EINVAL)
+                throw new IOException(
+                        "Reading file " + this.file.toString() + " error, stream was closed");
+            if (got < 0)
+                throw new IOException(
+                        "Reading file " + this.file.toString() + " error, stream was closed");
+        } finally {
+            // runs on every exit path, including the early return and the exceptions above
+            mInputStream.seek(oldPos);
+        }
+        statistics.incrementBytesRead(got);
+        return got;
+    }
+
+    /**
+     * Reads into the remaining space of {@code b}. Buffered bytes are copied
+     * first; the rest is read from the underlying stream into {@code b} directly.
+     *
+     * @return the number of bytes transferred, {@code 0} if {@code b} has no
+     *         space left, or {@code -1} if nothing could be read
+     * @throws IOException if the stream was closed or the underlying read fails
+     */
+    @Override
+    public synchronized int read(ByteBuffer b) throws IOException {
+        if (!b.hasRemaining())
+            return 0;
+        if (buf == null)
+            throw new IOException(
+                    "Reading file " + this.file.toString() + " error, stream was closed");
+        // Refill only when the buffer is empty and the request fits into it.
+        if (!buf.hasRemaining() && b.remaining() <= buf.capacity() && !refill()) {
+            return -1;
+        }
+        int got = Math.min(b.remaining(), buf.remaining());
+        if (got > 0) {
+            // copy through a temporary array from the internal buffer to the caller's buffer
+            byte[] readedBytes = new byte[got];
+            buf.get(readedBytes, 0, got);
+            b.put(readedBytes, 0, got);
+            statistics.incrementBytesRead(got);
+        }
+        if (!b.hasRemaining())
+            return got;
+        // internal buffer is drained; fetch the remainder straight from the stream
+        int more = readInternal(b);
+        if (more <= 0)
+            return got > 0 ? got : -1;
+        // the stream moved past the buffered window, so mark the buffer as empty
+        buf.position(0);
+        buf.limit(0);
+        return got + more;
+    }
+
+    /**
+     * Reads from the underlying stream into the remaining space of {@code b},
+     * bypassing the internal buffer.
+     *
+     * For array-backed buffers the backing array is filled in place and the
+     * buffer position is advanced here. For other buffers the read is handed to
+     * the underlying stream's ByteBuffer overload.
+     * NOTE(review): the direct branch does not advance {@code b} itself; it
+     * presumably relies on the underlying stream to do so -- confirm.
+     *
+     * @return the number of bytes read, {@code 0} if {@code b} has no space
+     *         left, or {@code -1} when the underlying read returned 0
+     * @throws IOException if the underlying read returned a negative value
+     */
+    protected synchronized int readInternal(ByteBuffer b) throws IOException {
+        if (!b.hasRemaining())
+            return 0;
+        int got;
+        if (b.hasArray()) {
+            // for heap bytebuffer
+            // The buffer's position is relative to arrayOffset() within the backing
+            // array; without it, sliced or offset-wrapped buffers are written at
+            // the wrong index.
+            got = readInternal(b.array(), b.arrayOffset() + b.position(), b.remaining());
+            if (got <= 0)
+                return got;
+            b.position(b.position() + got);
+        } else {
+            assert b.isDirect();
+            got = mInputStream.read(b, b.position(), b.remaining());
+            if (got == EINVAL)
+                throw new IOException(
+                        "Reading file " + this.file.toString() + " error, stream was closed");
+            if (got < 0)
+                throw new IOException(
+                        "Reading file " + this.file.toString() + " error, stream was closed");
+            if (got == 0)
+                return -1;
+            statistics.incrementBytesRead(got);
+        }
+        return got;
+    }
+
+    /**
+     * Moves the logical read position to {@code p}. If {@code p} falls inside the
+     * window currently held in the internal buffer, only the buffer position is
+     * adjusted; otherwise the buffer is emptied and the underlying stream seeks.
+     *
+     * @throws EOFException if {@code p} is negative
+     * @throws IOException if the stream was closed or the underlying seek fails
+     */
+    @Override
+    public synchronized void seek(long p) throws IOException {
+        if (p < 0) {
+            throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
+        }
+        if (buf == null) {
+            throw new IOException(
+                    "Reading file " + this.file.toString() + " error, stream was closed");
+        }
+        // the buffer covers the file range [windowStart, streamPos)
+        final long streamPos = mInputStream.getPos();
+        final long windowStart = streamPos - buf.limit();
+        final boolean insideWindow = p < streamPos && p >= windowStart;
+        if (insideWindow) {
+            buf.position((int) (p - windowStart));
+            return;
+        }
+        buf.position(0);
+        buf.limit(0);
+        mInputStream.seek(p);
+    }
+
+    /**
+     * Skips up to {@code n} bytes. Bytes held in the internal buffer are skipped
+     * first; the remainder is skipped on the underlying stream.
+     *
+     * @param n number of bytes to skip
+     * @return the number of bytes actually skipped, which can be less than
+     *         {@code n} when the underlying stream skips fewer bytes than asked;
+     *         {@code -1} if {@code n} is negative
+     * @throws IOException if the stream was closed or the underlying skip fails
+     */
+    @Override
+    public synchronized long skip(long n) throws IOException {
+        if (n < 0)
+            return -1;
+        if (buf == null)
+            throw new IOException(
+                    "Reading file " + this.file.toString() + " error, stream was closed");
+        final int buffered = buf.remaining();
+        if (n < buffered) {
+            // the whole skip stays inside the buffered window
+            buf.position(buf.position() + (int) n);
+            return n;
+        }
+        final long skippedInStream = mInputStream.skip(n - buffered);
+        buf.position(0);
+        buf.limit(0);
+        // Report what was really skipped instead of assuming the stream honoured
+        // the full request (it may stop early, e.g. at end of file).
+        return buffered + Math.max(0L, skippedInStream);
+    }
+
+    /**
+     * Closes the underlying stream (once) and releases the internal buffer.
+     * The buffer is handed back to the pool, when there is one, even if closing
+     * the underlying stream throws. Further calls are harmless.
+     *
+     * @throws IOException if closing the underlying stream fails
+     */
+    @Override
+    public synchronized void close() throws IOException {
+        try {
+            if (!mClosed) {
+                mInputStream.close();
+                mClosed = true;
+            }
+        } finally {
+            // always release the buffer so a failing close cannot leak a pooled buffer
+            if (this.bufferPool != null && buf != null) {
+                this.bufferPool.returnBuffer(buf);
+            }
+            // a null buffer is what the other methods treat as "stream was closed"
+            buf = null;
+        }
+    }
+}
\ No newline at end of file
diff --git a/kylin-spark-project/kylin-soft-affinity-cache/src/main/java/org/apache/kylin/cache/fs/CacheFileSystemConstants.java b/kylin-spark-project/kylin-soft-affinity-cache/src/main/java/org/apache/kylin/cache/fs/CacheFileSystemConstants.java
new file mode 100644
index 0000000000..45985a3f7c
--- /dev/null
+++ b/kylin-spark-project/kylin-soft-affinity-cache/src/main/java/org/apache/kylin/cache/fs/CacheFileSystemConstants.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.fs;
+
+/**
+ * Configuration keys and default values used by the cache file system classes.
+ * Not instantiable.
+ */
+public class CacheFileSystemConstants {
+
+    // constants holder: no instances
+    private CacheFileSystemConstants() {
+    }
+
+    /** Key of the flag for using the local cache; see the default below. */
+    public static final String PARAMS_KEY_USE_CACHE =
+            "spark.kylin.local-cache.enabled";
+
+    /** Default for {@link #PARAMS_KEY_USE_CACHE}. */
+    public static final boolean PARAMS_KEY_USE_CACHE_DEFAULT_VALUE = false;
+
+    /** Key of the I/O file buffer size setting. */
+    public static final String PARAMS_KEY_IO_FILE_BUFFER_SIZE = "io.file.buffer.size";
+
+    /** Default for {@link #PARAMS_KEY_IO_FILE_BUFFER_SIZE} (65536). */
+    public static final int PARAMS_KEY_IO_FILE_BUFFER_SIZE_DEFAULT_VALUE = 65536;
+
+    /** Key of the file-status cache TTL setting. */
+    public static final String PARAMS_KEY_FILE_STATUS_CACHE_TTL =
+            "spark.kylin.local-cache.filestatus.cache.ttl";
+
+    /**
+     * Default for {@link #PARAMS_KEY_FILE_STATUS_CACHE_TTL}.
+     * NOTE(review): the time unit is not visible here (presumably seconds) -- confirm at the use site.
+     */
+    public static final long PARAMS_KEY_FILE_STATUS_CACHE_TTL_DEFAULT_VALUE = 3600L;
+
+    /** Key of the file-status cache maximum size setting. */
+    public static final String PARAMS_KEY_FILE_STATUS_CACHE_MAX_SIZE =
+            "spark.kylin.local-cache.filestatus.cache.max-size";
+
+    /** Default for {@link #PARAMS_KEY_FILE_STATUS_CACHE_MAX_SIZE}. */
+    public static final long PARAMS_KEY_FILE_STATUS_CACHE_MAX_SIZE_DEFAULT_VALUE = 10000L;
+
+    /** Key of the flag for using the legacy file input stream. */
+    public static final String PARAMS_KEY_USE_LEGACY_FILE_INPUTSTREAM =
+            "spark.kylin.local-cache.use.legacy.file.input-stream";
+
+    /** Default for {@link #PARAMS_KEY_USE_LEGACY_FILE_INPUTSTREAM}. */
+    public static final boolean PARAMS_KEY_USE_LEGACY_FILE_INPUTSTREAM_DEFAULT_VALUE = false;
+
+    /** Property name of the local-cache flag for the current files; no default is defined here. */
+    public static final String PARAMS_KEY_LOCAL_CACHE_FOR_CURRENT_FILES =
+            "spark.kylin.local-cache.for.current.files";
+
+    /** URI scheme string for JuiceFS. */
+    public static final String JUICEFS_SCHEME = "jfs";
+}
diff --git a/kylin-spark-project/kylin-soft-affinity-cache/src/main/java/org/apache/kylin/cache/fs/MemCacheFileInputStream.java b/kylin-spark-project/kylin-soft-affinity-cache/src/main/java/org/apache/kylin/cache/fs/MemCacheFileInputStream.java
new file mode 100644
index 0000000000..aba546d04e
--- /dev/null
+++ b/kylin-spark-project/kylin-soft-affinity-cache/src/main/java/org/apache/kylin/cache/fs/MemCacheFileInputStream.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.fs;
+
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class MemCacheFileInputStream extends FSInputStream implements ByteBufferReadable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MemCacheFileInputStream.class);
+
+    private final Statistics statistics;
+    private ByteBuffer buf;
+    private Path file;
+    private int fileLength;
+
+    /**
+     * Creates a stream that serves the file content out of {@code buf}.
+     * The buffer is rewound to position 0 and its limit is set to
+     * {@code fileLength}, so the first {@code fileLength} bytes become readable.
+     *
+     * @param file path of the file, used in error messages
+     * @param buf buffer holding the file content; this stream changes its position and limit
+     * @param fileLength number of readable bytes; asserted to equal the buffer capacity
+     * @param statistics counter updated with the number of bytes read
+     */
+    public MemCacheFileInputStream(Path file, ByteBuffer buf, int fileLength,
+                                   Statistics statistics) throws IOException {
+        this.file = file;
+        this.statistics = statistics;
+        this.fileLength = fileLength;
+        this.buf = buf;
+        assert fileLength == buf.capacity();
+
+        // flip() rewinds to position 0; the limit is then widened to the file length
+        this.buf.flip();
+        this.buf.limit(fileLength);
+    }
+
+    /**
+     * Returns the current read offset, which is the position of the backing buffer.
+     *
+     * @throws IOException if the stream was closed
+     */
+    @Override
+    public synchronized long getPos() throws IOException {
+        if (buf != null) {
+            return buf.position();
+        }
+        throw new IOException(
+                "Reading file " + this.file.toString() + " error, stream was closed");
+    }
+
+    /**
+     * Never switches to another source: {@code targetPos} is ignored and
+     * {@code false} is returned unconditionally.
+     */
+    @Override
+    public boolean seekToNewSource(long targetPos) throws IOException {
+        return false;
+    }
+
+    /**
+     * Returns the number of bytes left between the current position and the
+     * limit of the backing buffer.
+     *
+     * @throws IOException if the stream was closed
+     */
+    @Override
+    public synchronized int available() throws IOException {
+        if (buf != null) {
+            return buf.remaining();
+        }
+        throw new IOException(
+                "Reading file " + this.file.toString() + " error, stream was closed");
+    }
+
+    /** Mark/reset is not offered by this stream; always returns {@code false}. */
+    @Override
+    public boolean markSupported() {
+        return false;
+    }
+
+    /**
+     * Always fails, because mark/reset is not supported.
+     *
+     * @throws IOException on every call
+     */
+    @Override
+    public void reset() throws IOException {
+        throw new IOException("Mark/reset not supported");
+    }
+
+    /**
+     * Reads the next byte from the backing buffer.
+     *
+     * @return the byte as a value in 0..255, or {@code -1} when no bytes remain
+     * @throws IOException if the stream was closed
+     */
+    @Override
+    public synchronized int read() throws IOException {
+        if (buf == null)
+            throw new IOException(
+                    "Reading file " + this.file.toString() + " error, stream was closed");
+        if (!buf.hasRemaining())
+            return -1; // EOF
+        statistics.incrementBytesRead(1);
+        // mask so the signed byte is returned as an unsigned value
+        return buf.get() & 0xFF;
+    }
+
+    /**
+     * Copies up to {@code len} bytes from the backing buffer into {@code b}
+     * starting at {@code off}, advancing the read position.
+     *
+     * @return the number of bytes copied, {@code 0} when {@code len} is zero,
+     *         or {@code -1} when no bytes remain
+     * @throws IndexOutOfBoundsException if {@code off}/{@code len} do not fit {@code b}
+     * @throws IOException if the stream was closed
+     */
+    @Override
+    public synchronized int read(byte[] b, int off, int len) throws IOException {
+        if (off < 0 || len < 0 || b.length - off < len) {
+            throw new IndexOutOfBoundsException();
+        }
+        if (len == 0) {
+            return 0;
+        }
+        if (buf == null) {
+            throw new IOException(
+                    "Reading file " + this.file.toString() + " error, stream was closed");
+        }
+        final int left = buf.remaining();
+        if (left == 0) {
+            return -1; // No bytes were read before EOF.
+        }
+        // both operands are positive here, so at least one byte is copied
+        final int count = Math.min(left, len);
+        buf.get(b, off, count);
+        statistics.incrementBytesRead(count);
+        return count;
+    }
+
+    /**
+     * Positional read: copies up to {@code len} bytes starting at file offset
+     * {@code pos} into {@code b}, leaving the sequential read position untouched.
+     *
+     * @return the number of bytes copied, {@code 0} when {@code len} is zero,
+     *         or {@code -1} when {@code pos} is at the end of the data
+     * @throws EOFException if {@code pos} is negative or beyond the end of the data
+     * @throws IllegalArgumentException if {@code b} is null or the range does not fit
+     * @throws IOException if the stream was closed
+     */
+    @Override
+    public synchronized int read(long pos, byte[] b, int off, int len) throws IOException {
+        if (len == 0)
+            return 0;
+        // check for a closed stream before buf is dereferenced below
+        if (buf == null)
+            throw new IOException(
+                    "Reading file " + this.file.toString() + " error, stream was closed");
+        if (pos < 0)
+            throw new EOFException(
+                    "Reading file " + this.file.toString() + " error, position is negative");
+        if (pos > buf.limit())
+            throw new EOFException(
+                    "Reading file " + this.file.toString() + " error, position is past the end of file");
+        if (b == null || off < 0 || len < 0 || b.length - off < len) {
+            throw new IllegalArgumentException(
+                    "Reading file " + this.file.toString() + " error, invalid arguments: " +
+                            off + " " + len);
+        }
+        int oldPos = buf.position();
+        try {
+            buf.position((int) pos);
+            int got = Math.min(buf.remaining(), len);
+            if (got == 0) {
+                // At end of data: report EOF. Returning 0 here would make
+                // read-fully style loops spin forever.
+                return -1;
+            }
+            // copy only what is available; asking for len would underflow on short reads
+            buf.get(b, off, got);
+            statistics.incrementBytesRead(got);
+            return got;
+        } finally {
+            // restore the sequential read position on every exit path
+            buf.position(oldPos);
+        }
+    }
+
+    /**
+     * Transfers bytes from the backing buffer into the remaining space of
+     * {@code b}, advancing both buffers.
+     *
+     * @return the number of bytes transferred, {@code 0} if {@code b} has no
+     *         space left, or {@code -1} when no bytes remain
+     * @throws IOException if the stream was closed
+     */
+    @Override
+    public synchronized int read(ByteBuffer b) throws IOException {
+        if (!b.hasRemaining()) {
+            return 0;
+        }
+        if (buf == null) {
+            throw new IOException(
+                    "Reading file " + this.file.toString() + " error, stream was closed");
+        }
+        if (!buf.hasRemaining()) {
+            return -1;
+        }
+        // both sides have room here, so the count is at least one
+        final int count = Math.min(b.remaining(), buf.remaining());
+        final byte[] chunk = new byte[count];
+        buf.get(chunk, 0, count);
+        b.put(chunk, 0, count);
+        statistics.incrementBytesRead(count);
+        return count;
+    }
+
+    /**
+     * Moves the read position to offset {@code p} of the backing buffer.
+     *
+     * @throws EOFException if {@code p} is negative or beyond the end of the data
+     * @throws IOException if the stream was closed
+     */
+    @Override
+    public synchronized void seek(long p) throws IOException {
+        if (p < 0) {
+            throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
+        }
+        // The closed check has to come before buf.limit() is evaluated;
+        // otherwise a closed stream fails with a NullPointerException.
+        if (buf == null)
+            throw new IOException(
+                    "Reading file " + this.file.toString() + " error, stream was closed");
+        if (p > buf.limit()) {
+            throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
+        }
+        buf.position((int) p);
+    }
+
+    /**
+     * Advances the read position by {@code n} bytes.
+     *
+     * @return {@code n}, or {@code -1} if {@code n} is negative
+     * @throws EOFException if fewer than {@code n} bytes remain
+     * @throws IOException if the stream was closed
+     */
+    @Override
+    public synchronized long skip(long n) throws IOException {
+        if (n < 0) {
+            return -1;
+        }
+        if (buf == null) {
+            throw new IOException(
+                    "Reading file " + this.file.toString() + " error, stream was closed");
+        }
+        final int left = buf.remaining();
+        if (n > left) {
+            throw new EOFException("Attempted to skip past the end of the file");
+        }
+        final int target = buf.position() + (int) n;
+        buf.position(target);
+        return n;
+    }
+
+    /**
+     * Marks the stream as closed by dropping the reference to the backing
+     * buffer. Nothing else is released here; repeated calls have no effect.
+     */
+    @Override
+    public synchronized void close() throws IOException {
+        buf = null;
+    }
+
+    /** Returns the statistics object this stream reports bytes read to. */
+    public Statistics getStatistics() {
+        return statistics;
+    }
+
+    /** Returns the backing buffer itself (not a copy); {@code null} once the stream is closed. */
+    public ByteBuffer getBuf() {
+        return buf;
+    }
+
+    /**
+     * Replaces the backing buffer. Position and limit of the new buffer are used
+     * as they are, and {@code fileLength} is not updated.
+     */
+    public void setBuf(ByteBuffer buf) {
+        this.buf = buf;
+    }
+
+    /** Returns the path this stream reports in its error messages. */
+    public Path getFile() {
+        return file;
+    }
+
+    /** Replaces the path this stream reports in its error messages. */
+    public void setFile(Path file) {
+        this.file = file;
+    }
+}
\ No newline at end of file
diff --git a/kylin-spark-project/kylin-soft-affinity-cache/src/main/java/org/apache/kylin/cache/fs/OnlyForTestCacheFileSystem.java b/kylin-spark-project/kylin-soft-affinity-cache/src/main/java/org/apache/kylin/cache/fs/OnlyForTestCacheFileSystem.java
new file mode 100644
index 0000000000..8017266559
--- /dev/null
+++ b/kylin-spark-project/kylin-soft-affinity-cache/src/main/java/org/apache/kylin/cache/fs/OnlyForTestCacheFileSystem.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.fs;
+
+/**
+ * {@link AbstractCacheFileSystem} variant whose local-cache check always
+ * answers yes. As the name says, it is meant for tests only.
+ */
+public class OnlyForTestCacheFileSystem extends AbstractCacheFileSystem {
+
+    /**
+     * Tells whether data should be cached on the current executor.
+     *
+     * @return always {@code true}
+     */
+    @Override
+    public boolean isUseLocalCacheForTargetExecs() {
+        return true;
+    }
+}
\ No newline at end of file
diff --git a/kylin-spark-project/kylin-soft-affinity-cache/src/main/java/org/apache/kylin/cache/fs/kylin/CacheAllFileSystem.java b/kylin-spark-project/kylin-soft-affinity-cache/src/main/java/org/apache/kylin/cache/fs/kylin/CacheAllFileSystem.java
new file mode 100644
index 0000000000..a5af07ac32
--- /dev/null
+++ b/kylin-spark-project/kylin-soft-affinity-cache/src/main/java/org/apache/kylin/cache/fs/kylin/CacheAllFileSystem.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.fs.kylin;
+
+import org.apache.kylin.cache.fs.AbstractCacheFileSystem;
+
+/**
+ * {@link AbstractCacheFileSystem} variant whose local-cache check always answers yes.
+ */
+public class CacheAllFileSystem extends AbstractCacheFileSystem {
+
+    /**
+     * Check whether needs to cache data on the current executor
+     *
+     * @return always {@code true}
+     */
+    @Override
+    public boolean isUseLocalCacheForTargetExecs() {
+        return true;
+    }
+}
\ No newline at end of file
diff --git a/kylin-spark-project/kylin-soft-affinity-cache/src/main/java/org/apache/kylin/cache/fs/kylin/KylinCacheFileSystem.java b/kylin-spark-project/kylin-soft-affinity-cache/src/main/java/org/apache/kylin/cache/fs/kylin/KylinCacheFileSystem.java
new file mode 100644
index 0000000000..e47bff9b18
--- /dev/null
+++ b/kylin-spark-project/kylin-soft-affinity-cache/src/main/java/org/apache/kylin/cache/fs/kylin/KylinCacheFileSystem.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.fs.kylin;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.cache.fs.AbstractCacheFileSystem;
+import org.apache.kylin.cache.fs.CacheFileSystemConstants;
+import org.apache.spark.TaskContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link AbstractCacheFileSystem} variant that decides per Spark task whether
+ * the local cache is used, based on a local property of the task context.
+ */
+public class KylinCacheFileSystem extends AbstractCacheFileSystem {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KylinCacheFileSystem.class);
+
+    /**
+     * Tells whether data should be cached on the current executor.
+     *
+     * @return {@code true} only when a task context exists and its local property
+     *         {@code PARAMS_KEY_LOCAL_CACHE_FOR_CURRENT_FILES} is non-blank and
+     *         parses to boolean true; {@code false} otherwise
+     */
+    @Override
+    public boolean isUseLocalCacheForTargetExecs() {
+        final TaskContext taskContext = TaskContext.get();
+        if (taskContext == null) {
+            LOG.warn("Task Context is null.");
+            return false;
+        }
+        final String cacheFlag = taskContext.getLocalProperty(
+                CacheFileSystemConstants.PARAMS_KEY_LOCAL_CACHE_FOR_CURRENT_FILES);
+        LOG.info("Cache for current executor is {}", cacheFlag);
+        if (StringUtils.isBlank(cacheFlag)) {
+            return false;
+        }
+        return Boolean.parseBoolean(cacheFlag);
+    }
+}
\ No newline at end of file
diff --git a/kylin-spark-project/kylin-soft-affinity-cache/src/main/java/org/apache/kylin/cache/utils/ConsistentHash.java b/kylin-spark-project/kylin-soft-affinity-cache/src/main/java/org/apache/kylin/cache/utils/ConsistentHash.java
new file mode 100644
index 0000000000..7f3af8bae7
--- /dev/null
+++ b/kylin-spark-project/kylin-soft-affinity-cache/src/main/java/org/apache/kylin/cache/utils/ConsistentHash.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.utils;
+
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+
+import java.util.List;
+import java.util.SortedMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+/**
+ * Consistent-hash ring that maps arbitrary keys onto a set of nodes.
+ * Every node is placed on the ring {@code numberOfVirtualNodeReplicas} times;
+ * a key is served by the first ring entry at or after the key's hash, wrapping
+ * around to the first entry when there is none.
+ *
+ * @param <T> node type; its {@code toString()} is what gets hashed
+ */
+public class ConsistentHash<T> {
+
+    private final int numberOfVirtualNodeReplicas;
+    // ring position -> node, kept sorted by position
+    private final SortedMap<Integer, T> circle = new ConcurrentSkipListMap<>();
+    private final HashFunction nodeHash = Hashing.murmur3_32();
+    private final HashFunction keyHash = Hashing.murmur3_32();
+
+    /**
+     * Creates a ring and places all {@code nodes} on it.
+     *
+     * @param numberOfVirtualNodeReplicas ring entries created per node
+     * @param nodes initial nodes
+     */
+    public ConsistentHash(int numberOfVirtualNodeReplicas, List<T> nodes) {
+        this.numberOfVirtualNodeReplicas = numberOfVirtualNodeReplicas;
+        addNode(nodes);
+    }
+
+    /**
+     * Creates an empty ring.
+     *
+     * @param numberOfVirtualNodeReplicas ring entries created per node
+     */
+    public ConsistentHash(int numberOfVirtualNodeReplicas) {
+        this.numberOfVirtualNodeReplicas = numberOfVirtualNodeReplicas;
+    }
+
+    /** Places every node of the list on the ring. */
+    public void addNode(List<T> nodes) {
+        for (T node : nodes) {
+            addNode(node);
+        }
+    }
+
+    /** Places one node on the ring, once per virtual replica. */
+    public void addNode(T node) {
+        for (int i = 0; i < numberOfVirtualNodeReplicas; i++) {
+            // replica index + node string is the identity of the virtual node
+            circle.put(getKetamaHash(i + "" + node), node);
+        }
+    }
+
+    /** Removes every node of the list from the ring. */
+    public void remove(List<T> nodes) {
+        for (T node : nodes) {
+            remove(node);
+        }
+    }
+
+    /** Removes all virtual replicas of one node from the ring. */
+    public void remove(T node) {
+        for (int i = 0; i < numberOfVirtualNodeReplicas; i++) {
+            circle.remove(getKetamaHash(i + "" + node));
+        }
+    }
+
+    /**
+     * Finds the node responsible for {@code key}.
+     *
+     * @param key lookup key; its {@code toString()} is hashed
+     * @return the owning node, or {@code null} when the ring is empty
+     */
+    public T get(Object key) {
+        if (circle.isEmpty()) {
+            return null;
+        }
+        int hash = getKeyHash(key.toString());
+        // tailMap is inclusive, so an exact match at `hash` is its first key;
+        // an empty tail means the lookup wraps around to the start of the ring
+        SortedMap<Integer, T> tailMap = circle.tailMap(hash);
+        Integer slot = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();
+        return circle.get(slot);
+    }
+
+    // Strings are encoded as UTF-8 explicitly: the platform default charset can
+    // differ between JVMs, which would make the same key hash to different
+    // ring positions on different machines.
+    private int getKeyHash(final String k) {
+        return keyHash.hashBytes(k.getBytes(java.nio.charset.StandardCharsets.UTF_8)).asInt();
+    }
+
+    private int getKetamaHash(final String k) {
+        return nodeHash.hashBytes(k.getBytes(java.nio.charset.StandardCharsets.UTF_8)).asInt();
+    }
+}
\ No newline at end of file
diff --git a/kylin-spark-project/kylin-soft-affinity-cache/src/main/java/org/apache/kylin/cache/utils/ReflectionUtil.java b/kylin-spark-project/kylin-soft-affinity-cache/src/main/java/org/apache/kylin/cache/utils/ReflectionUtil.java
new file mode 100644
index 0000000000..d2f5fba234
--- /dev/null
+++ b/kylin-spark-project/kylin-soft-affinity-cache/src/main/java/org/apache/kylin/cache/utils/ReflectionUtil.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cache.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+
+public class ReflectionUtil {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ReflectionUtil.class);
+
+    private ReflectionUtil() {
+    }
+
+    /**
+     * Reads a field by name, looking through the object's class and its superclasses.
+     *
+     * @param object    instance whose field is read
+     * @param fieldName name of the field to read
+     * @return the field's value, or {@code null} when the access fails (the failure is logged)
+     * @throws IllegalArgumentException if no field with that name is declared in the hierarchy
+     */
+    public static Object getFieldValue(Object object, String fieldName) {
+        Field field = getDeclaredField(object, fieldName);
+        if (field == null) {
+            throw new IllegalArgumentException(
+                    "Could not find field [" + fieldName + "] on target [" + object + "]");
+        }
+        makeAccessible(field);
+
+        try {
+            return field.get(object);
+        } catch (IllegalAccessException e) {
+            LOG.error("Get field value error:", e);
+            return null;
+        }
+    }
+
+    /**
+     * Writes a field by name, looking through the object's class and its superclasses.
+     * An access failure is logged and otherwise ignored.
+     *
+     * @param object    instance whose field is written
+     * @param fieldName name of the field to write
+     * @param value     new value of the field
+     * @throws IllegalArgumentException if no field with that name is declared in the hierarchy
+     */
+    public static void setFieldValue(Object object, String fieldName, Object value) {
+        Field field = getDeclaredField(object, fieldName);
+        if (field == null) {
+            throw new IllegalArgumentException(
+                    "Could not find field [" + fieldName + "] on target [" + object + "]");
+        }
+        makeAccessible(field);
+
+        try {
+            field.set(object, value);
+        } catch (IllegalAccessException e) {
+            LOG.error("Set field value error:", e);
+        }
+    }
+
+    /**
+     * Returns the {@code index}-th actual type argument of {@code clazz}'s generic superclass.
+     *
+     * @return the type argument, or {@code Object.class} when the superclass is not
+     *         parameterized, {@code index} is out of range, or the argument is not a
+     *         {@link Class}
+     */
+    @SuppressWarnings("unchecked")
+    public static Class getSuperClassGenricType(Class clazz, int index) {
+        Type genType = clazz.getGenericSuperclass();
+        if (!(genType instanceof ParameterizedType)) {
+            return Object.class;
+        }
+
+        Type[] params = ((ParameterizedType) genType).getActualTypeArguments();
+        if (index >= params.length || index < 0) {
+            return Object.class;
+        }
+        if (!(params[index] instanceof Class)) {
+            return Object.class;
+        }
+
+        return (Class) params[index];
+    }
+
+    /**
+     * Shortcut for {@link #getSuperClassGenricType(Class, int)} with index 0.
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> Class<T> getSuperGenericType(Class clazz) {
+        return getSuperClassGenricType(clazz, 0);
+    }
+
+    /**
+     * Finds a method declared by the object's class or one of its superclasses
+     * ({@code Object} itself is not searched).
+     *
+     * @return the first matching method, or {@code null} (after logging) when none is found
+     */
+    public static Method getDeclaredMethod(Object object, String methodName, Class<?>[] parameterTypes) {
+        for (Class<?> superClass = object.getClass(); superClass != Object.class;
+             superClass = superClass.getSuperclass()) {
+            try {
+                return superClass.getDeclaredMethod(methodName, parameterTypes);
+            } catch (NoSuchMethodException e) {
+                // Not declared at this level; keep searching up the hierarchy.
+            }
+        }
+        LOG.error("Can not find method {}", methodName);
+        return null;
+    }
+
+    /**
+     * Lifts Java access checks for {@code field} whenever a plain reflective read or write
+     * could be rejected: the field is not public, its declaring class is not public, or the
+     * field is final.
+     */
+    public static void makeAccessible(Field field) {
+        int modifiers = field.getModifiers();
+        boolean needsOverride = !Modifier.isPublic(modifiers)
+                || !Modifier.isPublic(field.getDeclaringClass().getModifiers())
+                || Modifier.isFinal(modifiers);
+        if (needsOverride) {
+            field.setAccessible(true);
+        }
+    }
+
+    /**
+     * Finds a field declared by the object's class or one of its superclasses
+     * ({@code Object} itself is not searched).
+     *
+     * @return the first matching field, or {@code null} (after logging) when none is found
+     */
+    public static Field getDeclaredField(Object object, String filedName) {
+        for (Class<?> superClass = object.getClass(); superClass != Object.class;
+             superClass = superClass.getSuperclass()) {
+            try {
+                return superClass.getDeclaredField(filedName);
+            } catch (NoSuchFieldException e) {
+                // Not declared at this level; keep searching up the hierarchy.
+            }
+        }
+        LOG.error("Can not find field {}", filedName);
+        return null;
+    }
+
+    /**
+     * Invokes a method by name and parameter types, ignoring its access modifier.
+     *
+     * @return the method's result, or {@code null} when the access fails (the failure is logged)
+     * @throws IllegalArgumentException  if no matching method is declared in the hierarchy
+     * @throws InvocationTargetException if the invoked method itself throws
+     */
+    public static Object invokeMethod(Object object, String methodName, Class<?>[] parameterTypes,
+                                      Object[] parameters) throws InvocationTargetException {
+        Method method = getDeclaredMethod(object, methodName, parameterTypes);
+        if (method == null) {
+            throw new IllegalArgumentException(
+                    "Could not find method [" + methodName + "] on target [" + object + "]");
+        }
+        method.setAccessible(true);
+
+        try {
+            return method.invoke(object, parameters);
+        } catch (IllegalAccessException e) {
+            LOG.error("Invoke method {} error: ", methodName, e);
+        }
+
+        return null;
+    }
+}
diff --git a/kylin-spark-project/kylin-soft-affinity-cache/src/main/java/org/apache/kylin/softaffinity/SoftAffinityConstants.java b/kylin-spark-project/kylin-soft-affinity-cache/src/main/java/org/apache/kylin/softaffinity/SoftAffinityConstants.java
new file mode 100644
index 0000000000..2f9e968d15
--- /dev/null
+++ b/kylin-spark-project/kylin-soft-affinity-cache/src/main/java/org/apache/kylin/softaffinity/SoftAffinityConstants.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.softaffinity;
+
+/**
+ * Spark configuration keys of the soft-affinity feature, together with the values used when
+ * a key is not set.
+ */
+public class SoftAffinityConstants {
+
+    // Constants holder; not meant to be instantiated.
+    private SoftAffinityConstants() {
+    }
+
+    /** Key of the boolean switch that turns soft affinity on. */
+    public static final String PARAMS_KEY_SOFT_AFFINITY_ENABLED =
+            "spark.kylin.soft-affinity.enabled";
+
+    /** Default for {@link #PARAMS_KEY_SOFT_AFFINITY_ENABLED}. */
+    public static final boolean PARAMS_KEY_SOFT_AFFINITY_ENABLED_DEFAULT_VALUE = false;
+
+    /** Key of the integer replication number used when allocating executors for a file. */
+    public static final String PARAMS_KEY_SOFT_AFFINITY_REPLICATIONS_NUM =
+            "spark.kylin.soft-affinity.replications.num";
+
+    /** Default for {@link #PARAMS_KEY_SOFT_AFFINITY_REPLICATIONS_NUM}. */
+    public static final int PARAMS_KEY_SOFT_AFFINITY_REPLICATIONS_NUM_DEFAULT_VALUE = 2;
+
+    // For HDFS Replications
+    /** Key of the minimum number of a file's hosts that must also be running executors. */
+    public static final String PARAMS_KEY_SOFT_AFFINITY_MIN_TARGET_HOSTS =
+            "spark.kylin.soft-affinity.min.target-hosts";
+
+    /** Default for {@link #PARAMS_KEY_SOFT_AFFINITY_MIN_TARGET_HOSTS}. */
+    public static final int PARAMS_KEY_SOFT_AFFINITY_MIN_TARGET_HOSTS_DEFAULT_VALUE = 1;
+}
diff --git a/kylin-spark-project/kylin-soft-affinity-cache/src/main/scala/org/apache/kylin/softaffinity/SoftAffinityManager.scala b/kylin-spark-project/kylin-soft-affinity-cache/src/main/scala/org/apache/kylin/softaffinity/SoftAffinityManager.scala
new file mode 100644
index 0000000000..720f5576e4
--- /dev/null
+++ b/kylin-spark-project/kylin-soft-affinity-cache/src/main/scala/org/apache/kylin/softaffinity/SoftAffinityManager.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.softaffinity
+
+import org.apache.kylin.softaffinity.strategy.SoftAffinityStrategy
+
+import java.util.concurrent.locks.ReentrantReadWriteLock
+import scala.collection.mutable
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+
+import java.util.concurrent.atomic.AtomicInteger
+
+/**
+ * Tracks the registered executors and the hosts they run on, and answers which executors a
+ * file should be read on. All access to the two collections goes through `resourceRWLock`.
+ */
+object SoftAffinityManager extends Logging {
+
+  val resourceRWLock = new ReentrantReadWriteLock(true)
+
+  val softAffinityAllocation = new SoftAffinityStrategy
+
+  lazy val minOnTargetHosts: Int = SparkEnv.get.conf.getInt(
+    SoftAffinityConstants.PARAMS_KEY_SOFT_AFFINITY_MIN_TARGET_HOSTS,
+    SoftAffinityConstants.PARAMS_KEY_SOFT_AFFINITY_MIN_TARGET_HOSTS_DEFAULT_VALUE
+  )
+
+  // (execId, host) slots; a slot turns into None when its executor is removed
+  val fixedIdForExecutors = new mutable.ListBuffer[Option[(String, String)]]()
+  // host -> ids of the executors running on it
+  val nodesExecutorsMap = new mutable.HashMap[String, mutable.HashSet[String]]()
+
+  protected val totalRegisteredExecutors = new AtomicInteger(0)
+
+  lazy val usingSoftAffinity: Boolean = SparkEnv.get.conf.getBoolean(
+    SoftAffinityConstants.PARAMS_KEY_SOFT_AFFINITY_ENABLED,
+    SoftAffinityConstants.PARAMS_KEY_SOFT_AFFINITY_ENABLED_DEFAULT_VALUE
+  )
+
+  /** Number of executors currently registered. */
+  def totalExecutors(): Int = totalRegisteredExecutors.get()
+
+  /**
+   * Registers an executor given as (execId, host). An id that is already registered is left
+   * untouched; otherwise the executor takes the first vacated slot, or a new slot at the end.
+   */
+  def handleExecutorAdded(execHostId: (String, String)): Unit = {
+    val writeLock = resourceRWLock.writeLock()
+    writeLock.lock()
+    try {
+      // first, check whether the execId exists
+      val alreadyKnown = fixedIdForExecutors.exists(_.exists(_._1.equals(execHostId._1)))
+      if (!alreadyKnown) {
+        nodesExecutorsMap
+          .getOrElseUpdate(execHostId._2, new mutable.HashSet[String]())
+          .add(execHostId._1)
+        fixedIdForExecutors.indexWhere(_.isEmpty) match {
+          case -1 =>
+            fixedIdForExecutors += Option(execHostId)
+          case vacantIdx =>
+            // replace the executor which was removed
+            fixedIdForExecutors(vacantIdx) = Option(execHostId)
+        }
+        totalRegisteredExecutors.incrementAndGet()
+      }
+      logInfo(s"After adding executor ${execHostId._1} on host ${execHostId._2}, " +
+        s"fixedIdForExecutors is ${fixedIdForExecutors.mkString(",")}, " +
+        s"nodesExecutorsMap is ${nodesExecutorsMap.keySet.mkString(",")}, " +
+        s"actual executors count is ${totalRegisteredExecutors.intValue()}."
+      )
+    } finally {
+      writeLock.unlock()
+    }
+  }
+
+  /**
+   * Unregisters an executor: its slot becomes None, and its host is dropped from
+   * `nodesExecutorsMap` once no executor is left on it. Unknown ids are ignored.
+   */
+  def handleExecutorRemoved(execId: String): Unit = {
+    val writeLock = resourceRWLock.writeLock()
+    writeLock.lock()
+    try {
+      val slotIdx = fixedIdForExecutors.indexWhere(_.exists(_._1.equals(execId)))
+      if (slotIdx != -1) {
+        fixedIdForExecutors(slotIdx).foreach { case (removedId, host) =>
+          fixedIdForExecutors(slotIdx) = None
+          val executorsOnHost = nodesExecutorsMap(host)
+          executorsOnHost -= removedId
+          if (executorsOnHost.isEmpty) {
+            // there is no executor on this host, remove
+            nodesExecutorsMap.remove(host)
+          }
+          totalRegisteredExecutors.decrementAndGet()
+        }
+      }
+      logInfo(s"After removing executor ${execId}, " +
+        s"fixedIdForExecutors is ${fixedIdForExecutors.mkString(",")}, " +
+        s"nodesExecutorsMap is ${nodesExecutorsMap.keySet.mkString(",")}, " +
+        s"actual executors count is ${totalRegisteredExecutors.intValue()}."
+      )
+    } finally {
+      writeLock.unlock()
+    }
+  }
+
+  /**
+   * Checks whether enough of `hosts` are hosts with registered executors.
+   *
+   * @return false when `hosts` is empty; true when no host is registered; otherwise whether
+   *         the number of common hosts reaches min(`minOnTargetHosts`, `hosts.length`)
+   */
+  def checkTargetHosts(hosts: Array[String]): Boolean = {
+    val readLock = resourceRWLock.readLock()
+    readLock.lock()
+    try {
+      if (hosts.isEmpty) {
+        // there is no host locality
+        false
+      } else if (nodesExecutorsMap.isEmpty) {
+        true
+      } else {
+        // when the replication num of hdfs is less than 'minOnTargetHosts'
+        val requiredHosts = math.min(minOnTargetHosts, hosts.length)
+        // there are how many the same hosts
+        nodesExecutorsMap.keys.toArray.intersect(hosts).length >= requiredHosts
+      }
+    } finally {
+      readLock.unlock()
+    }
+  }
+
+  /**
+   * Asks the allocation strategy for the (execId, host) targets of `file`; the result is
+   * empty when no host is registered.
+   */
+  def askExecutors(file: String): Array[(String, String)] = {
+    val readLock = resourceRWLock.readLock()
+    readLock.lock()
+    try {
+      if (nodesExecutorsMap.isEmpty) {
+        Array.empty
+      } else {
+        softAffinityAllocation.allocateExecs(file, fixedIdForExecutors)
+      }
+    } finally {
+      readLock.unlock()
+    }
+  }
+}
\ No newline at end of file
diff --git a/kylin-spark-project/kylin-soft-affinity-cache/src/main/scala/org/apache/kylin/softaffinity/scheduler/SoftAffinityListener.scala b/kylin-spark-project/kylin-soft-affinity-cache/src/main/scala/org/apache/kylin/softaffinity/scheduler/SoftAffinityListener.scala
new file mode 100644
index 0000000000..6d1b5ac1a1
--- /dev/null
+++ b/kylin-spark-project/kylin-soft-affinity-cache/src/main/scala/org/apache/kylin/softaffinity/scheduler/SoftAffinityListener.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.softaffinity.scheduler
+
+import org.apache.kylin.softaffinity.SoftAffinityManager
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded, SparkListenerExecutorRemoved}
+
+/** Spark listener that forwards executor add/remove events to [[SoftAffinityManager]]. */
+class SoftAffinityListener extends SparkListener with Logging {
+
+  /** Registers the new executor as an (executor id, host) pair. */
+  override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit =
+    SoftAffinityManager.handleExecutorAdded(
+      (executorAdded.executorId, executorAdded.executorInfo.executorHost))
+
+  /** Unregisters the removed executor by its id. */
+  override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit =
+    SoftAffinityManager.handleExecutorRemoved(executorRemoved.executorId)
+}
\ No newline at end of file
diff --git a/kylin-spark-project/kylin-soft-affinity-cache/src/main/scala/org/apache/kylin/softaffinity/strategy/SoftAffinityAllocationTrait.scala b/kylin-spark-project/kylin-soft-affinity-cache/src/main/scala/org/apache/kylin/softaffinity/strategy/SoftAffinityAllocationTrait.scala
new file mode 100644
index 0000000000..06f9d284fb
--- /dev/null
+++ b/kylin-spark-project/kylin-soft-affinity-cache/src/main/scala/org/apache/kylin/softaffinity/strategy/SoftAffinityAllocationTrait.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.softaffinity.strategy
+
+import org.apache.kylin.softaffinity.SoftAffinityConstants
+
+import scala.collection.mutable.ListBuffer
+import org.apache.spark.SparkEnv
+
+/** Contract of the strategies that pick the target executors of a file. */
+trait SoftAffinityAllocationTrait {
+
+  // Replication number read from the Spark conf on first use, with the default from
+  // SoftAffinityConstants when the key is not set.
+  lazy val softAffinityReplicationNum = SparkEnv.get.conf.getInt(
+    SoftAffinityConstants.PARAMS_KEY_SOFT_AFFINITY_REPLICATIONS_NUM,
+    SoftAffinityConstants.PARAMS_KEY_SOFT_AFFINITY_REPLICATIONS_NUM_DEFAULT_VALUE
+  )
+
+  /**
+   * Allocate target executors for a file.
+   *
+   * @param file       file to allocate executors for
+   * @param candidates executor slots to choose from; an entry may be None
+   * @return the chosen executors, as tuples of the same shape as the candidate entries
+   */
+  def allocateExecs(file: String,
+                    candidates: ListBuffer[Option[(String, String)]]): Array[(String, String)]
+}
\ No newline at end of file
diff --git a/kylin-spark-project/kylin-soft-affinity-cache/src/main/scala/org/apache/kylin/softaffinity/strategy/SoftAffinityStrategy.scala b/kylin-spark-project/kylin-soft-affinity-cache/src/main/scala/org/apache/kylin/softaffinity/strategy/SoftAffinityStrategy.scala
new file mode 100644
index 0000000000..a01641245d
--- /dev/null
+++ b/kylin-spark-project/kylin-soft-affinity-cache/src/main/scala/org/apache/kylin/softaffinity/strategy/SoftAffinityStrategy.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.softaffinity.strategy
+
+import scala.collection.mutable.LinkedHashSet
+import scala.collection.mutable.ListBuffer
+
+import org.apache.spark.internal.Logging
+
+class SoftAffinityStrategy extends SoftAffinityAllocationTrait with Logging {
+
+  /**
+   * Allocate target executors for a file.
+   *
+   * The primary slot is the file's hash code modulo the number of candidates; each extra
+   * replica `i` uses the slot `(primary + candidates / replicationNum + i) % candidates`.
+   * Slots holding None are skipped and duplicates are dropped, so the result may contain
+   * fewer entries than the replication number, or none at all.
+   *
+   * @param file       file to allocate executors for
+   * @param candidates executor slots to choose from; an entry may be None
+   * @return the distinct chosen executors, primary first
+   */
+  override def allocateExecs(file: String,
+      candidates: ListBuffer[Option[(String, String)]]): Array[(String, String)] = {
+    if (candidates.isEmpty) {
+      Array.empty
+    } else {
+      val candidatesSize = candidates.size
+      // A configured value below 1 means "primary only"; clamping also keeps a value of 0
+      // from triggering a division by zero below.
+      val replicationNum = math.max(softAffinityReplicationNum, 1)
+      val stride = candidatesSize / replicationNum
+      // floorMod keeps the index non-negative for negative hash codes
+      val primaryIdx = Math.floorMod(file.hashCode, candidatesSize)
+      val replicaIdxs = (1 until replicationNum).map(i => (primaryIdx + stride + i) % candidatesSize)
+      // check whether the executor at each index is down (None), keeping first-seen order
+      val targets = (primaryIdx +: replicaIdxs).flatMap(idx => candidates(idx)).distinct
+      targets.toArray
+    }
+  }
+}
\ No newline at end of file
diff --git a/kylin-spark-project/kylin-soft-affinity-cache/src/main/spark31/org/apache/spark/sql/execution/datasources/CacheFilePartition.scala b/kylin-spark-project/kylin-soft-affinity-cache/src/main/spark31/org/apache/spark/sql/execution/datasources/CacheFilePartition.scala
new file mode 100644
index 0000000000..38a0dfa76c
--- /dev/null
+++ b/kylin-spark-project/kylin-soft-affinity-cache/src/main/spark31/org/apache/spark/sql/execution/datasources/CacheFilePartition.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.kylin.softaffinity.SoftAffinityManager
+
+import org.apache.spark.Partition
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.sql.connector.read.InputPartition
+
+/**
+ * A collection of file blocks that should be read as a single task
+ * (possibly from multiple partitioned directories).
+ */
+case class CacheFilePartition(index: Int, files: Array[CachePartitionedFile])
+  extends Partition with InputPartition {
+  /**
+   * Preferred locations of this partition, taken from its first file only: a location with
+   * an empty executor id yields the bare host, any other one yields the string form of an
+   * `ExecutorCacheTaskLocation`. A partition without files has no preferred location.
+   */
+  override def preferredLocations(): Array[String] = {
+    // Todo: Only get the first one
+    files.headOption.fold(Array.empty[String]) { firstFile =>
+      firstFile.locations.map { case (execId, host) =>
+        if (execId.equals("")) host
+        else ExecutorCacheTaskLocation(host, execId).toString
+      }
+    }
+  }
+}
+
+object CacheFilePartition extends Logging {
+
+  /**
+   * Wraps a `FilePartition` into a `CacheFilePartition`, attaching the same
+   * (executor id, host) locations to every file.
+   *
+   * Soft-affinity executors are requested, keyed by the first file's path, only when the
+   * partition has files, soft affinity is enabled and `checkTargetHosts` rejects the original
+   * preferred hosts. In every other case, or when no executor comes back, the original hosts
+   * are kept with an empty executor id.
+   */
+  def convertFilePartitionToCache(filePartition: FilePartition): CacheFilePartition = {
+    // Get the original preferred locations
+    val expectedTargets = filePartition.preferredLocations()
+    val files = filePartition.files
+    // Host-only fallback; evaluated only in the branches that need it
+    def hostOnlyLocations: Array[(String, String)] = expectedTargets.map(host => ("", host))
+
+    val needsSoftAffinity = files.nonEmpty && SoftAffinityManager.usingSoftAffinity &&
+      !SoftAffinityManager.checkTargetHosts(expectedTargets)
+    val locations =
+      if (needsSoftAffinity) {
+        // if there is no host in the node list which are executors running on,
+        // using SoftAffinityManager to generate target executors.
+        // Only using the first file to calculate the target executors
+        val allocated = SoftAffinityManager.askExecutors(files.head.filePath)
+        if (allocated.nonEmpty) {
+          logInfo(s"SAMetrics=File ${files.head.filePath} - " +
+            s"the expected executors are ${allocated.mkString("_")} ")
+          allocated
+        } else {
+          hostOnlyLocations
+        }
+      } else {
+        hostOnlyLocations
+      }
+    val cacheFiles = files.map { f =>
+      CachePartitionedFile(f.partitionValues, f.filePath, f.start, f.length, locations)
+    }
+    CacheFilePartition(filePartition.index, cacheFiles)
+  }
+}
+
+
diff --git a/kylin-spark-project/kylin-soft-affinity-cache/src/main/spark31/org/apache/spark/sql/execution/datasources/CacheFileScanRDD.scala b/kylin-spark-project/kylin-soft-affinity-cache/src/main/spark31/org/apache/spark/sql/execution/datasources/CacheFileScanRDD.scala
new file mode 100644
index 0000000000..75f7ed4abc
--- /dev/null
+++ b/kylin-spark-project/kylin-soft-affinity-cache/src/main/spark31/org/apache/spark/sql/execution/datasources/CacheFileScanRDD.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.kylin.cache.fs.CacheFileSystemConstants
+
+import org.apache.spark.{SparkEnv, TaskContext, Partition => RDDPartition}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+
+/**
+ * A file range to read, together with the (executor id, host) locations chosen for it.
+ */
+case class CachePartitionedFile(
+                            partitionValues: InternalRow,
+                            filePath: String,
+                            start: Long,
+                            length: Long,
+                            locations: Array[(String, String)] = Array.empty) {
+  /** Renders the path, the byte range, the partition values and the executor locations. */
+  override def toString: String = {
+    val rangeEnd = start + length
+    val fileDesc = s"path: $filePath, range: $start-$rangeEnd, partition values: $partitionValues,"
+    fileDesc + s" on executor locations ${locations.mkString}"
+  }
+}
+
+/**
+ * A `FileScanRDD` whose partitions are `CacheFilePartition`s: the partitions and their
+ * preferred locations come from `cacheFilePartitions`, and each task records in its local
+ * properties whether the running executor is one of the first file's locations.
+ */
+class CacheFileScanRDD(
+    @transient private val sparkSession: SparkSession,
+    readFunction: (PartitionedFile) => Iterator[InternalRow],
+    @transient val cacheFilePartitions: Seq[CacheFilePartition])
+  extends FileScanRDD(sparkSession, readFunction, Nil) {
+
+  /** True when the current executor id is the executor id of one of `cacheLocations`. */
+  def checkCached(cacheLocations: Array[(String, String)]): Boolean = {
+    val currentExecutorId = SparkEnv.get.executorId
+    cacheLocations.exists(_._1 == currentExecutorId)
+  }
+
+  override def compute(split: RDDPartition, context: TaskContext): Iterator[InternalRow] = {
+    val startMillis = System.currentTimeMillis()
+    // Convert 'CacheFilePartition' to 'FilePartition'
+    val cachePartition = split.asInstanceOf[CacheFilePartition]
+    val plainFiles = cachePartition.files.map { f =>
+      PartitionedFile(f.partitionValues, f.filePath, f.start, f.length, f.locations.map(_._2))
+    }
+    val plainPartition = FilePartition(cachePartition.index, plainFiles)
+    // Only the first file decides the reported path and the cache flag
+    val (currFilePath, isCache) = cachePartition.files.headOption match {
+      case Some(firstFile) => (firstFile.filePath, checkCached(firstFile.locations))
+      case None => ("empty", false)
+    }
+    logInfo(s"SAMetrics=File ${currFilePath} running in task ${context.taskAttemptId()} " +
+      s"on executor ${SparkEnv.get.executorId} with cached ${isCache} , " +
+      s"took ${(System.currentTimeMillis() - startMillis)}")
+    // Set whether needs to cache data on this executor
+    context.getLocalProperties.setProperty(
+      CacheFileSystemConstants.PARAMS_KEY_LOCAL_CACHE_FOR_CURRENT_FILES, isCache.toString)
+    super.compute(plainPartition, context)
+  }
+
+  override protected def getPartitions: Array[RDDPartition] = cacheFilePartitions.toArray
+
+  override protected def getPreferredLocations(split: RDDPartition): Seq[String] =
+    split.asInstanceOf[CacheFilePartition].preferredLocations()
+}
diff --git a/kylin-spark-project/kylin-spark-common/pom.xml b/kylin-spark-project/kylin-spark-common/pom.xml
index 4d0cee3851..0e62aa27e6 100644
--- a/kylin-spark-project/kylin-spark-common/pom.xml
+++ b/kylin-spark-project/kylin-spark-common/pom.xml
@@ -48,6 +48,11 @@
             <artifactId>kylin-spark-metadata</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-soft-affinity-cache</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-yarn-api</artifactId>
diff --git a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/AbstractHdfsLogAppender.java b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/AbstractHdfsLogAppender.java
index 17422a841d..b0b2301596 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/AbstractHdfsLogAppender.java
+++ b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/AbstractHdfsLogAppender.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.log4j.AppenderSkeleton;
 import org.apache.log4j.helpers.LogLog;
 import org.apache.log4j.spi.LoggingEvent;
+import org.apache.spark.utils.SparkHadoopUtils;
 
 import java.io.BufferedWriter;
 import java.io.IOException;
@@ -91,7 +92,7 @@ public abstract class AbstractHdfsLogAppender extends AppenderSkeleton {
 
     public FileSystem getFileSystem() {
         if (null == fileSystem) {
-            return getFileSystem(new Configuration());
+            return getFileSystem(SparkHadoopUtils.newConfigurationWithSparkConf());
         }
         return fileSystem;
     }
diff --git a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkDriverHdfsLogAppender.java b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkDriverHdfsLogAppender.java
index 0e5ad3c318..94a44092d0 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkDriverHdfsLogAppender.java
+++ b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkDriverHdfsLogAppender.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.log4j.helpers.LogLog;
 import org.apache.log4j.spi.LoggingEvent;
+import org.apache.spark.utils.SparkHadoopUtils;
 
 import java.io.IOException;
 import java.util.List;
@@ -92,7 +93,7 @@ public class SparkDriverHdfsLogAppender extends AbstractHdfsLogAppender {
     public void doWriteLog(int eventSize, List<LoggingEvent> transaction)
             throws IOException, InterruptedException {
         if (!isWriterInited()) {
-            Configuration conf = new Configuration();
+            Configuration conf = SparkHadoopUtils.newConfigurationWithSparkConf();
             if (isKerberosEnable()) {
                 UserGroupInformation.setConfiguration(conf);
                 UserGroupInformation.loginUserFromKeytab(getKerberosPrincipal(), getKerberosKeytab());
diff --git a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java
index f341340e61..d28ae16b21 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java
+++ b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java
@@ -20,7 +20,6 @@ package org.apache.kylin.engine.spark.common.logging;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -30,6 +29,7 @@ import org.apache.log4j.helpers.LogLog;
 import org.apache.log4j.spi.LoggingEvent;
 import org.apache.spark.SparkEnv;
 import org.apache.spark.deploy.SparkHadoopUtil;
+import org.apache.spark.utils.SparkHadoopUtils;
 import scala.runtime.BoxedUnit;
 
 import java.io.File;
@@ -160,7 +160,7 @@ public class SparkExecutorHdfsAppender extends AbstractHdfsLogAppender {
                 SparkHadoopUtil.get().runAsSparkUser(new scala.runtime.AbstractFunction0<scala.runtime.BoxedUnit>() {
                     @Override
                     public BoxedUnit apply() {
-                        if (!initHdfsWriter(file, new Configuration())) {
+                        if (!initHdfsWriter(file, SparkHadoopUtils.newConfigurationWithSparkConf())) {
                             LogLog.error("Failed to init the hdfs writer!");
                         }
                         try {
diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/utils/SparkHadoopUtils.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/utils/SparkHadoopUtils.scala
new file mode 100644
index 0000000000..0594b1c05a
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/utils/SparkHadoopUtils.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.utils
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.SparkEnv
+import org.apache.spark.deploy.SparkHadoopUtil
+
+object SparkHadoopUtils {
+  /** Hadoop conf derived from the running SparkEnv's SparkConf; falls back to a plain
+   *  `new Configuration()` when no SparkEnv exists yet (`SparkEnv.get` is null then). */
+  def newConfigurationWithSparkConf(): Configuration =
+    Option(SparkEnv.get).fold(new Configuration())(e => SparkHadoopUtil.newConfiguration(e.conf))
+}
\ No newline at end of file
diff --git a/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala b/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
index b924f3afd8..61fef995fe 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
+++ b/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
@@ -21,6 +21,7 @@ package org.apache.spark.sql.execution
 import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}
 import org.apache.kylin.common.KylinConfig
 import org.apache.spark.rdd.RDD
+import org.apache.kylin.softaffinity.SoftAffinityManager
 import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Expression, ExpressionUtils, SortOrder}
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
@@ -165,7 +166,14 @@ class KylinFileSourceScanExec(
       FilePartition(shardId, filesToPartitionId.getOrElse(shardId, Nil).toArray)
     }
 
-    new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)
+    if (SoftAffinityManager.usingSoftAffinity) {
+      val start = System.currentTimeMillis()
+      val cachePartitions = filePartitions.map(CacheFilePartition.convertFilePartitionToCache(_))
+      logInfo(s"Convert bucketed file partition took: ${(System.currentTimeMillis() - start)}")
+      new CacheFileScanRDD(fsRelation.sparkSession, readFile, cachePartitions)
+    } else {
+      new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)
+    }
   }
 
   /**
@@ -237,7 +245,14 @@ class KylinFileSourceScanExec(
     }
     closePartition()
 
-    new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
+    if (SoftAffinityManager.usingSoftAffinity) {
+      val start = System.currentTimeMillis()
+      val cachePartitions = partitions.map(CacheFilePartition.convertFilePartitionToCache(_))
+      logInfo(s"Convert file partition took: ${(System.currentTimeMillis() - start)}")
+      new CacheFileScanRDD(fsRelation.sparkSession, readFile, cachePartitions)
+    } else {
+      new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
+    }
   }
 
   private def getBlockLocations(file: FileStatus): Array[BlockLocation] = file match {
diff --git a/kylin-spark-project/pom.xml b/kylin-spark-project/pom.xml
index 2031d2c5ee..5d2c241872 100644
--- a/kylin-spark-project/pom.xml
+++ b/kylin-spark-project/pom.xml
@@ -37,6 +37,7 @@
         <module>kylin-spark-query</module>
         <module>kylin-spark-test</module>
         <module>kylin-spark-classloader</module>
+        <module>kylin-soft-affinity-cache</module>
     </modules>
 
     <dependencies>