Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/08/19 12:59:56 UTC

[incubator-uniffle] branch master updated: Support storing shuffle data to secured dfs cluster (#53)

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new abb92159 Support storing shuffle data to secured dfs cluster (#53)
abb92159 is described below

commit abb92159ee750ee3a6a96ccf5a670c6fee682163
Author: Junfan Zhang <ju...@outlook.com>
AuthorDate: Fri Aug 19 20:59:51 2022 +0800

    Support storing shuffle data to secured dfs cluster (#53)
    
    ### What changes were proposed in this pull request?
    Support storing shuffle data to a secured HDFS cluster, with the shuffle server writing under the Spark job user's own permissions.
    
    ### Why are the changes needed?
    
    When using the MEMORY_LOCALFILE_HDFS storage type, we run into problems flushing shuffle data to secured HDFS clusters because the shuffle servers lack credentials. To work around this, keytabs would have to be distributed to all shuffle servers and logged in/refreshed by crontab or similar means.
    
    But this approach is not secure; it is better to flush data as the corresponding HDFS user so that data stays isolated.
    
    We hope that:
    1. User A launches a Spark application, which sends shuffle data to the shuffle server.
    2. The shuffle server writes the shuffle data into HDFS as "user A" (via the proxy-user mechanism sketched below).
    3. Reduce tasks (launched by user A) can then read the shuffle data from HDFS with user A's own permission.
    4. Otherwise, user A may not have permission to read shuffle data written by the shuffle server (running as another user), since the data is not owned by A.
    
    More detail and motivation can be found in the design doc: https://docs.google.com/document/d/1pIDwCwv8iwnXmFQeTZKA5tK55SRc_f0SPKYFONA5K70
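    
    At its core the change relies on Hadoop's standard proxy-user mechanism: the shuffle server logs in once with its own keytab and then impersonates the job's user when touching HDFS. The following is only an editor's sketch of that idea, not the patch's exact code; the keytab path, principal, and "userA" below are made-up placeholders.
    
        // Log in as the shuffle server's own Kerberos identity (placeholder keytab/principal).
        Configuration conf = new Configuration();   // assumes the secured cluster's core-site/hdfs-site are on the classpath
        conf.set("hadoop.security.authentication", "kerberos");
        UserGroupInformation.setConfiguration(conf);
        UserGroupInformation loginUgi =
            UserGroupInformation.loginUserFromKeytabAndReturnUGI("rss/host@EXAMPLE.COM", "/etc/rss.keytab");
    
        // Impersonate the Spark job's user so that files written to HDFS are owned by that user.
        UserGroupInformation proxyUgi = UserGroupInformation.createProxyUser("userA", loginUgi);
        proxyUgi.doAs((PrivilegedExceptionAction<Void>) () -> {
            FileSystem fs = FileSystem.get(conf);
            fs.mkdirs(new Path("/rss/userA/appId/0"));   // hypothetical shuffle data path
            return null;
        });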
    
    ### Does this PR introduce _any_ user-facing change?
    Yes
    
    ### How was this patch tested?
    Manual tests and unit tests.
---
 .gitignore                                         |   4 +
 .../task/reduce/RssRemoteMergeManagerImpl.java     |   9 +-
 .../apache/spark/shuffle/RssShuffleManager.java    |   7 +-
 .../client/impl/ShuffleWriteClientImpl.java        |  11 +-
 common/pom.xml                                     |  30 ++
 .../apache/uniffle/common/config/RssBaseConf.java  |  28 +-
 .../filesystem/HadoopFilesystemProvider.java       |  72 +++++
 .../common/security/HadoopSecurityContext.java     | 122 ++++++++
 .../common/security/NoOpSecurityContext.java       |  40 +++
 .../uniffle/common/security/SecurityConfig.java    |  81 +++++
 .../uniffle/common/security/SecurityContext.java   |  29 ++
 .../common/security/SecurityContextFactory.java    |  57 ++++
 .../org/apache/uniffle/common/KerberizedHdfs.java  | 292 ++++++++++++++++++
 .../apache/uniffle/common/KerberizedHdfsBase.java  |  63 ++++
 .../filesystem/HadoopFilesystemProviderTest.java   | 124 ++++++++
 .../common/security/HadoopSecurityContextTest.java | 123 ++++++++
 .../security/SecurityContextFactoryTest.java       |  82 +++++
 .../coordinator/AccessCandidatesChecker.java       |   3 +-
 .../apache/uniffle/coordinator/AccessManager.java  |   3 +-
 .../uniffle/coordinator/ClientConfManager.java     |   3 +-
 .../uniffle/coordinator/CoordinatorServer.java     |  19 ++
 .../uniffle/coordinator/CoordinatorUtils.java      |  22 --
 .../uniffle/coordinator/SimpleClusterManager.java  |   5 +-
 .../coordinator/SimpleClusterManagerTest.java      |   6 +-
 integration-test/common/pom.xml                    |  16 +
 .../test/AccessCandidatesCheckerHdfsTest.java      |  13 +-
 .../AccessCandidatesCheckerKerberizedHdfsTest.java |  58 ++++
 .../uniffle/test/ClientConfManagerHdfsTest.java    |  25 +-
 .../test/ClientConfManagerKerberlizedHdfsTest.java |  43 +++
 .../apache/uniffle/test/ShuffleReadWriteBase.java  |  20 +-
 .../apache/uniffle/test/ShuffleServerGrpcTest.java |   1 +
 .../test/ShuffleServerWithKerberizedHdfsTest.java  | 341 +++++++++++++++++++++
 .../client/impl/grpc/ShuffleServerGrpcClient.java  |   8 +-
 .../client/request/RssRegisterShuffleRequest.java  |  13 +-
 pom.xml                                            |   6 +
 proto/src/main/proto/Rss.proto                     |   1 +
 .../apache/uniffle/server/ShuffleFlushManager.java |  10 +-
 .../org/apache/uniffle/server/ShuffleServer.java   |  19 +-
 .../uniffle/server/ShuffleServerGrpcService.java   |  11 +-
 .../apache/uniffle/server/ShuffleTaskManager.java  |  11 +-
 .../server/buffer/ShuffleBufferManager.java        |   2 +-
 .../uniffle/server/ShuffleFlushManagerTest.java    |  29 +-
 .../uniffle/server/ShuffleServerMetricsTest.java   |   7 +-
 .../uniffle/server/ShuffleTaskManagerTest.java     |  60 +++-
 .../server/buffer/ShuffleBufferManagerTest.java    |   4 +
 .../server/storage/HdfsStorageManagerTest.java     |   9 +-
 .../server/storage/MultiStorageManagerTest.java    |   1 +
 storage/pom.xml                                    |  18 ++
 .../apache/uniffle/storage/common/HdfsStorage.java |   4 +-
 .../handler/impl/HdfsClientReadHandler.java        |   6 +-
 .../storage/handler/impl/HdfsFileReader.java       |   8 +-
 .../storage/handler/impl/HdfsFileWriter.java       |  16 +-
 .../handler/impl/HdfsShuffleDeleteHandler.java     |   3 +-
 .../handler/impl/HdfsShuffleReadHandler.java       |   4 +-
 .../handler/impl/HdfsShuffleWriteHandler.java      |  31 +-
 .../request/CreateShuffleWriteHandlerRequest.java  |  13 +-
 .../uniffle/storage/util/ShuffleStorageUtils.java  |  18 --
 .../storage/HdfsShuffleHandlerTestBase.java        |  16 +-
 .../org/apache/uniffle/storage/HdfsTestBase.java   |   5 +-
 .../handler/impl/HdfsClientReadHandlerTest.java    |  24 +-
 .../storage/handler/impl/HdfsFileReaderTest.java   |   6 +-
 .../storage/handler/impl/HdfsFileWriterTest.java   |  18 +-
 .../storage/handler/impl/HdfsHandlerTest.java      |   4 +-
 .../handler/impl/HdfsShuffleReadHandlerTest.java   |  24 +-
 .../impl/KerberizedHdfsClientReadHandlerTest.java  |  49 +++
 .../impl/KerberizedHdfsShuffleReadHandlerTest.java |  49 +++
 .../storage/util/ShuffleHdfsStorageUtilsTest.java  |  16 +-
 .../ShuffleKerberizedHdfsStorageUtilsTest.java     |  46 +++
 68 files changed, 2146 insertions(+), 175 deletions(-)

diff --git a/.gitignore b/.gitignore
index ce609e1c..f1d2a175 100644
--- a/.gitignore
+++ b/.gitignore
@@ -23,3 +23,7 @@ dependency-reduced-pom.xml
 rss-*.tgz
 hadoop-*.tar.gz
 deploy/kubernetes/docker/hadoopconfig/*
+common/build/
+integration-test/common/build/
+storage/build/
+build/
diff --git a/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssRemoteMergeManagerImpl.java b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssRemoteMergeManagerImpl.java
index 4fe38b2b..54e1e104 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssRemoteMergeManagerImpl.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssRemoteMergeManagerImpl.java
@@ -42,7 +42,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.util.Progress;
 
 import org.apache.uniffle.common.exception.RssException;
-import org.apache.uniffle.storage.util.ShuffleStorageUtils;
+import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
 
 public class RssRemoteMergeManagerImpl<K, V> extends MergeManagerImpl<K, V> {
 
@@ -144,8 +144,11 @@ public class RssRemoteMergeManagerImpl<K, V> extends MergeManagerImpl<K, V> {
     try {
       remoteConf.setInt("dfs.replication", replication);
       remoteConf.setInt("dfs.client.block.write.retries", retries); // origin=3
-      this.remoteFS = ShuffleStorageUtils.getFileSystemForPath(new Path(basePath), remoteConf);
-    } catch (IOException e) {
+      this.remoteFS = HadoopFilesystemProvider.getFilesystem(
+          new Path(basePath),
+          remoteConf
+      );
+    } catch (Exception e) {
       throw new RuntimeException("Cannot init remoteFS on path:" + basePath);
     }
 
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 6f2df5cb..96ed7370 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -564,8 +564,11 @@ public class RssShuffleManager implements ShuffleManager {
   }
 
   @VisibleForTesting
-  protected void registerShuffleServers(String appId, int shuffleId,
-                                        Map<ShuffleServerInfo, List<PartitionRange>> serverToPartitionRanges) {
+  protected void registerShuffleServers(
+      String appId,
+      int shuffleId,
+      Map<ShuffleServerInfo,
+      List<PartitionRange>> serverToPartitionRanges) {
     if (serverToPartitionRanges == null || serverToPartitionRanges.isEmpty()) {
       return;
     }
diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index c6c13e0e..bd194ddf 100644
--- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -33,6 +33,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -324,8 +325,16 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
       int shuffleId,
       List<PartitionRange> partitionRanges,
       RemoteStorageInfo remoteStorage) {
+    String user = null;
+    try {
+      user = UserGroupInformation.getCurrentUser().getShortUserName();
+    } catch (Exception e) {
+      LOG.error("Error on getting user from ugi.", e);
+    }
+    LOG.info("User: {}", user);
+
     RssRegisterShuffleRequest request =
-        new RssRegisterShuffleRequest(appId, shuffleId, partitionRanges, remoteStorage);
+        new RssRegisterShuffleRequest(appId, shuffleId, partitionRanges, remoteStorage, user);
     RssRegisterShuffleResponse response = getShuffleServerClient(shuffleServerInfo).registerShuffle(request);
 
     String msg = "Error happened when registerShuffle with appId[" + appId + "], shuffleId[" + shuffleId
diff --git a/common/pom.xml b/common/pom.xml
index 9d3f385d..f82bb16f 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -76,5 +76,35 @@
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-lang3</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minikdc</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>junit</groupId>
+          <artifactId>junit</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minicluster</artifactId>
+    </dependency>
   </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.felix</groupId>
+        <artifactId>maven-bundle-plugin</artifactId>
+        <version>2.4.0</version>
+        <extensions>true</extensions>
+      </plugin>
+    </plugins>
+  </build>
+
 </project>
diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
index 496e6efe..7901c5d3 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
@@ -20,7 +20,6 @@ package org.apache.uniffle.common.config;
 import java.util.List;
 import java.util.Map;
 
-
 public class RssBaseConf extends RssConf {
 
   public static final ConfigOption<String> RSS_COORDINATOR_QUORUM = ConfigOptions
@@ -158,6 +157,33 @@ public class RssBaseConf extends RssConf {
       .defaultValue(true)
       .withDescription("The switch for jvm metrics verbose");
 
+  public static final ConfigOption<Boolean> RSS_SECURITY_HADOOP_KERBEROS_ENABLE = ConfigOptions
+      .key("rss.security.hadoop.kerberos.enable")
+      .booleanType()
+      .defaultValue(false)
+      .withDescription("Whether enable visiting secured hadoop cluster.");
+
+  public static final ConfigOption<String> RSS_SECURITY_HADOOP_KERBEROS_KEYTAB_FILE = ConfigOptions
+      .key("rss.security.hadoop.kerberos.keytab.file")
+      .stringType()
+      .noDefaultValue()
+      .withDescription("The kerberos keytab file path. And only when "
+          + RSS_SECURITY_HADOOP_KERBEROS_ENABLE.key() + " enabled, the option will be valid.");
+
+  public static final ConfigOption<String> RSS_SECURITY_HADOOP_KERBEROS_PRINCIPAL = ConfigOptions
+      .key("rss.security.hadoop.kerberos.principal")
+      .stringType()
+      .noDefaultValue()
+      .withDescription("The kerberos keytab principal. And only when "
+          + RSS_SECURITY_HADOOP_KERBEROS_ENABLE.key() + " enabled, the option will be valid.");
+
+  public static final ConfigOption<Long> RSS_SECURITY_HADOOP_KERBEROS_RELOGIN_INTERVAL_SEC = ConfigOptions
+      .key("rss.security.hadoop.kerberos.relogin.interval.sec")
+      .longType()
+      .checkValue(ConfigUtils.POSITIVE_INTEGER_VALIDATOR, "The value must be positive integer")
+      .defaultValue(60L)
+      .withDescription("The kerberos authentication relogin interval. unit: sec");
+
   public boolean loadCommonConf(Map<String, String> properties) {
     if (properties == null) {
       return false;
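
Editor's note: the four new options above are what a shuffle server would set to enable Kerberos access. As a rough illustration only (not from the patch; the keytab path and principal are placeholders, and RssBaseConf is instantiated directly here purely for demonstration), they could be loaded through the existing loadCommonConf method:

    // Hypothetical server-side properties enabling the new Kerberos options.
    Map<String, String> properties = new HashMap<>();
    properties.put("rss.security.hadoop.kerberos.enable", "true");
    properties.put("rss.security.hadoop.kerberos.keytab.file", "/etc/security/keytabs/rss.service.keytab");
    properties.put("rss.security.hadoop.kerberos.principal", "rss/_HOST@EXAMPLE.COM");
    properties.put("rss.security.hadoop.kerberos.relogin.interval.sec", "60");

    RssBaseConf conf = new RssBaseConf();
    conf.loadCommonConf(properties);   // picks up the rss.* keys defined above
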
diff --git a/common/src/main/java/org/apache/uniffle/common/filesystem/HadoopFilesystemProvider.java b/common/src/main/java/org/apache/uniffle/common/filesystem/HadoopFilesystemProvider.java
new file mode 100644
index 00000000..ada4d06d
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/filesystem/HadoopFilesystemProvider.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.filesystem;
+
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.security.SecurityContextFactory;
+
+/**
+ * This HadoopFilesystemProvider will provide the only entrypoint to get the hadoop filesystem whether
+ * the dfs cluster is kerberized or not.
+ */
+public class HadoopFilesystemProvider {
+  private static final Logger LOGGER = LoggerFactory.getLogger(HadoopFilesystemProvider.class);
+
+  public static FileSystem getFilesystem(Path path, Configuration configuration) throws Exception {
+    return getFilesystem(
+        SecurityContextFactory.get().getSecurityContext().getContextLoginUser(),
+        path,
+        configuration
+    );
+  }
+
+  public static FileSystem getFilesystem(String user, Path path, Configuration configuration) throws Exception {
+    UserGroupInformation.AuthenticationMethod authenticationMethod =
+        SecurityUtil.getAuthenticationMethod(configuration);
+    boolean needSecurity = authenticationMethod != UserGroupInformation.AuthenticationMethod.SIMPLE;
+
+    Callable<FileSystem> callable = () -> FileSystem.get(path.toUri(), configuration);
+
+    FileSystem fileSystem;
+    if (needSecurity) {
+      fileSystem = SecurityContextFactory
+          .get()
+          .getSecurityContext()
+          .runSecured(user, callable);
+    } else {
+      fileSystem = callable.call();
+    }
+
+    if (fileSystem instanceof LocalFileSystem) {
+      LOGGER.debug("{} is local file system", path);
+      return ((LocalFileSystem) fileSystem).getRawFileSystem();
+    }
+
+    return fileSystem;
+  }
+}
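
Editor's note: downstream code (the MR merge manager and coordinator changes in this patch) switches to this provider as the single entry point for obtaining a Hadoop FileSystem. A minimal usage sketch, assuming the security context has already been initialized; the path and user name are placeholders:

    // If the target cluster is kerberized, the lookup runs inside the security
    // context on behalf of the given user; otherwise it is a plain FileSystem.get.
    Configuration conf = new Configuration();
    Path path = new Path("hdfs://ns1/rss/data");          // hypothetical remote storage path
    FileSystem fs = HadoopFilesystemProvider.getFilesystem("alex", path, conf);
    fs.mkdirs(new Path(path, "appId_1/0"));
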
diff --git a/common/src/main/java/org/apache/uniffle/common/security/HadoopSecurityContext.java b/common/src/main/java/org/apache/uniffle/common/security/HadoopSecurityContext.java
new file mode 100644
index 00000000..41f5d413
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/security/HadoopSecurityContext.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.security;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.util.ThreadUtils;
+
+public class HadoopSecurityContext implements SecurityContext {
+  private static final Logger LOGGER = LoggerFactory.getLogger(HadoopSecurityContext.class);
+  private static final String KRB5_CONF_KEY = "java.security.krb5.conf";
+
+  private UserGroupInformation loginUgi;
+  private ScheduledExecutorService refreshScheduledExecutor;
+
+  public HadoopSecurityContext(
+      String krb5ConfPath,
+      String keytabFile,
+      String principal,
+      long refreshIntervalSec) throws Exception {
+    if (StringUtils.isEmpty(keytabFile)) {
+      throw new IllegalArgumentException("KeytabFilePath must be not null or empty");
+    }
+    if (StringUtils.isEmpty(principal)) {
+      throw new IllegalArgumentException("principal must be not null or empty");
+    }
+    if (refreshIntervalSec <= 0) {
+      throw new IllegalArgumentException("refreshIntervalSec must be not negative");
+    }
+
+    if (StringUtils.isNotEmpty(krb5ConfPath)) {
+      System.setProperty(KRB5_CONF_KEY, krb5ConfPath);
+    }
+
+    Configuration conf = new Configuration(false);
+    conf.set("hadoop.security.authentication", "kerberos");
+    UserGroupInformation.setConfiguration(conf);
+
+    this.loginUgi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytabFile);
+
+    LOGGER.info("Got Kerberos ticket, keytab [{}], principal [{}], user [{}]",
+        keytabFile, principal, loginUgi.getShortUserName());
+
+    refreshScheduledExecutor = Executors.newSingleThreadScheduledExecutor(
+        ThreadUtils.getThreadFactory("Kerberos-refresh-%d")
+    );
+    refreshScheduledExecutor.scheduleAtFixedRate(
+        this::authRefresh,
+        refreshIntervalSec,
+        refreshIntervalSec,
+        TimeUnit.SECONDS);
+  }
+
+  private void authRefresh() {
+    try {
+      LOGGER.info("Renewing kerberos token.");
+      loginUgi.checkTGTAndReloginFromKeytab();
+    } catch (Throwable t) {
+      LOGGER.error("Error in token renewal task: ", t);
+    }
+  }
+
+  @Override
+  public <T> T runSecured(String user, Callable<T> securedCallable) throws Exception {
+    if (StringUtils.isEmpty(user)) {
+      throw new Exception("User must be not null or empty");
+    }
+
+    // Run with the proxy user.
+    if (!user.equals(loginUgi.getShortUserName())) {
+      return executeWithUgiWrapper(
+          UserGroupInformation.createProxyUser(user, loginUgi),
+          securedCallable
+      );
+    }
+
+    // Run with the current login user.
+    return executeWithUgiWrapper(loginUgi, securedCallable);
+  }
+
+  @Override
+  public String getContextLoginUser() {
+    return loginUgi.getShortUserName();
+  }
+
+  private <T> T executeWithUgiWrapper(UserGroupInformation ugi, Callable<T> callable) throws Exception {
+    return ugi.doAs((PrivilegedExceptionAction<T>) () -> callable.call());
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (refreshScheduledExecutor != null) {
+      refreshScheduledExecutor.shutdown();
+    }
+  }
+}
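
Editor's note: in short, this context logs in from the keytab once, keeps the TGT fresh on a scheduled background thread, and runs callables either directly (when the requested user equals the login user) or inside a proxy UGI. A minimal usage sketch; the keytab path, principal, and namenode URI are placeholders:

    HadoopSecurityContext context = new HadoopSecurityContext(
        null,                                  // krb5.conf path; null keeps the JVM default
        "/etc/security/keytabs/rss.keytab",    // hypothetical keytab
        "rss/host@EXAMPLE.COM",                // hypothetical principal
        60);                                   // relogin check every 60 seconds

    // Executed inside a proxy UGI for "alex", so files created here are owned by alex.
    FileSystem fs = context.runSecured("alex",
        () -> FileSystem.get(URI.create("hdfs://ns1"), new Configuration()));

    context.close();
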
diff --git a/common/src/main/java/org/apache/uniffle/common/security/NoOpSecurityContext.java b/common/src/main/java/org/apache/uniffle/common/security/NoOpSecurityContext.java
new file mode 100644
index 00000000..52dc939a
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/security/NoOpSecurityContext.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.security;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+
+/** A security context that simply runs a Callable without performing a login action. */
+public class NoOpSecurityContext implements SecurityContext {
+
+  @Override
+  public <T> T runSecured(String user, Callable<T> securedCallable) throws Exception {
+    return securedCallable.call();
+  }
+
+  @Override
+  public String getContextLoginUser() {
+    return null;
+  }
+
+  @Override
+  public void close() throws IOException {
+    // ignore
+  }
+}
diff --git a/common/src/main/java/org/apache/uniffle/common/security/SecurityConfig.java b/common/src/main/java/org/apache/uniffle/common/security/SecurityConfig.java
new file mode 100644
index 00000000..5b1048ca
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/security/SecurityConfig.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.security;
+
+public class SecurityConfig {
+  private String krb5ConfPath;
+  private String keytabFilePath;
+  private String principal;
+  private long reloginIntervalSec;
+
+  private SecurityConfig() {
+    // ignore.
+  }
+
+  public String getKrb5ConfPath() {
+    return krb5ConfPath;
+  }
+
+  public String getKeytabFilePath() {
+    return keytabFilePath;
+  }
+
+  public String getPrincipal() {
+    return principal;
+  }
+
+  public long getReloginIntervalSec() {
+    return reloginIntervalSec;
+  }
+
+  public static class Builder {
+    private SecurityConfig info;
+
+    public Builder() {
+      this.info = new SecurityConfig();
+    }
+
+    public SecurityConfig.Builder keytabFilePath(String keytabFilePath) {
+      info.keytabFilePath = keytabFilePath;
+      return this;
+    }
+
+    public SecurityConfig.Builder principal(String principal) {
+      info.principal = principal;
+      return this;
+    }
+
+    public SecurityConfig.Builder reloginIntervalSec(long reloginIntervalSec) {
+      info.reloginIntervalSec = reloginIntervalSec;
+      return this;
+    }
+
+    public SecurityConfig.Builder krb5ConfPath(String krb5ConfPath) {
+      info.krb5ConfPath = krb5ConfPath;
+      return this;
+    }
+
+    public SecurityConfig build() {
+      return info;
+    }
+  }
+
+  public static SecurityConfig.Builder newBuilder() {
+    return new SecurityConfig.Builder();
+  }
+}
diff --git a/common/src/main/java/org/apache/uniffle/common/security/SecurityContext.java b/common/src/main/java/org/apache/uniffle/common/security/SecurityContext.java
new file mode 100644
index 00000000..cb95ecce
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/security/SecurityContext.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.security;
+
+import java.io.Closeable;
+import java.util.concurrent.Callable;
+
+/** A security context with may be required to run a Callable. */
+public interface SecurityContext extends Closeable {
+
+  <T> T runSecured(String user, Callable<T> securedCallable) throws Exception;
+
+  String getContextLoginUser();
+}
diff --git a/common/src/main/java/org/apache/uniffle/common/security/SecurityContextFactory.java b/common/src/main/java/org/apache/uniffle/common/security/SecurityContextFactory.java
new file mode 100644
index 00000000..47c743c9
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/security/SecurityContextFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.security;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SecurityContextFactory {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SecurityContextFactory.class);
+
+  private SecurityContext securityContext = new NoOpSecurityContext();
+
+  static class LazyHolder {
+    static final SecurityContextFactory SECURITY_CONTEXT_FACTORY = new SecurityContextFactory();
+  }
+
+  public static SecurityContextFactory get() {
+    return LazyHolder.SECURITY_CONTEXT_FACTORY;
+  }
+
+  public void init(SecurityConfig securityConfig) throws Exception {
+    if (securityConfig == null) {
+      this.securityContext = new NoOpSecurityContext();
+      return;
+    }
+
+    this.securityContext = new HadoopSecurityContext(
+        securityConfig.getKrb5ConfPath(),
+        securityConfig.getKeytabFilePath(),
+        securityConfig.getPrincipal(),
+        securityConfig.getReloginIntervalSec()
+    );
+    LOGGER.info("Initialized security context: {}", securityContext.getClass().getSimpleName());
+  }
+
+  public SecurityContext getSecurityContext() {
+    if (securityContext == null) {
+      throw new RuntimeException("No initialized security context.");
+    }
+    return securityContext;
+  }
+}
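
Editor's note: a server-side component would build a SecurityConfig and hand it to this singleton at startup; passing null (or never calling init) leaves the NoOpSecurityContext in place. A condensed sketch mirroring what the tests later in this patch do, with placeholder keytab and principal values:

    SecurityConfig securityConfig = SecurityConfig.newBuilder()
        .krb5ConfPath("/etc/krb5.conf")                        // optional; JVM default is used if omitted
        .keytabFilePath("/etc/security/keytabs/rss.keytab")    // hypothetical keytab
        .principal("rss/host@EXAMPLE.COM")                     // hypothetical principal
        .reloginIntervalSec(60)
        .build();

    SecurityContextFactory.get().init(securityConfig);
    SecurityContext context = SecurityContextFactory.get().getSecurityContext();   // HadoopSecurityContext
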
diff --git a/common/src/test/java/org/apache/uniffle/common/KerberizedHdfs.java b/common/src/test/java/org/apache/uniffle/common/KerberizedHdfs.java
new file mode 100644
index 00000000..8482490c
--- /dev/null
+++ b/common/src/test/java/org/apache/uniffle/common/KerberizedHdfs.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.security.PrivilegedExceptionAction;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.security.authorize.ImpersonationProvider;
+import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class KerberizedHdfs implements Serializable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(KerberizedHdfs.class);
+
+  private MiniKdc kdc;
+  private File workDir;
+  private Path tempDir;
+  private Path kerberizedDfsBaseDir;
+
+  private MiniDFSCluster kerberizedDfsCluster;
+
+  private Class testRunnerCls = KerberizedHdfs.class;
+
+  // The super user for accessing HDFS
+  private String hdfsKeytab;
+  private String hdfsPrincipal;
+  // The normal user of alex for accessing HDFS
+  private String alexKeytab;
+  private String alexPrincipal;
+  // krb5.conf file path
+  private String krb5ConfFile;
+
+  protected void setup() throws Exception {
+    tempDir = Files.createTempDirectory("tempDir").toFile().toPath();
+    kerberizedDfsBaseDir = Files.createTempDirectory("kerberizedDfsBaseDir").toFile().toPath();
+
+    startKDC();
+    startKerberizedDFS();
+    setupDFSData();
+  }
+
+  private void setupDFSData() throws Exception {
+    String principal = "alex/localhost";
+    File keytab = new File(workDir, "alex.keytab");
+    kdc.createPrincipal(keytab, principal);
+    alexKeytab = keytab.getAbsolutePath();
+    alexPrincipal = principal;
+
+    FileSystem writeFs = kerberizedDfsCluster.getFileSystem();
+    assertTrue(writeFs.mkdirs(new org.apache.hadoop.fs.Path("/hdfs")));
+
+    boolean ok = writeFs.exists(new org.apache.hadoop.fs.Path("/alex"));
+    assertFalse(ok);
+    ok = writeFs.mkdirs(new org.apache.hadoop.fs.Path("/alex"));
+    assertTrue(ok);
+
+    writeFs.setOwner(new org.apache.hadoop.fs.Path("/alex"), "alex", "alex");
+    FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL, false);
+    writeFs.setPermission(new org.apache.hadoop.fs.Path("/alex"), permission);
+
+    writeFs.setPermission(new org.apache.hadoop.fs.Path("/"), permission);
+
+    String oneFileContent = "test content";
+    FSDataOutputStream fsDataOutputStream =
+        writeFs.create(new org.apache.hadoop.fs.Path("/alex/basic.txt"));
+    BufferedWriter br = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, "UTF-8"));
+    br.write(oneFileContent);
+    br.close();
+
+    writeFs.setOwner(new org.apache.hadoop.fs.Path("/alex/basic.txt"), "alex", "alex");
+    writeFs.setPermission(new org.apache.hadoop.fs.Path("/alex/basic.txt"), permission);
+  }
+
+  private Configuration createSecureDFSConfig() throws Exception {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    SecurityUtil.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS, conf);
+
+    conf.set(DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, hdfsPrincipal);
+    conf.set(DFS_NAMENODE_KEYTAB_FILE_KEY, hdfsKeytab);
+    conf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, hdfsPrincipal);
+    conf.set(DFS_DATANODE_KEYTAB_FILE_KEY, hdfsKeytab);
+    conf.set(DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, hdfsPrincipal);
+    conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+    conf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication");
+    conf.set(DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
+    conf.set(DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
+    conf.set(DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");
+    conf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY, 10);
+
+    // https://issues.apache.org/jira/browse/HDFS-7431
+    conf.set(DFS_ENCRYPT_DATA_TRANSFER_KEY, "true");
+
+    conf.set(
+        CommonConfigurationKeysPublic.HADOOP_SECURITY_IMPERSONATION_PROVIDER_CLASS,
+        "org.apache.uniffle.common.KerberizedHdfs$TestDummyImpersonationProvider");
+
+    String keystoresDir = kerberizedDfsBaseDir.toFile().getAbsolutePath();
+    String sslConfDir = KeyStoreTestUtil.getClasspathDir(testRunnerCls);
+    KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfDir, conf, false);
+
+    return conf;
+  }
+
+  private void startKerberizedDFS() throws Exception {
+    String krb5Conf = kdc.getKrb5conf().getAbsolutePath();
+    System.setProperty("java.security.krb5.conf", krb5Conf);
+
+    String principal = "hdfs" + "/localhost";
+    File keytab = new File(workDir, "hdfs.keytab");
+    kdc.createPrincipal(keytab, principal);
+    hdfsKeytab = keytab.getPath();
+    hdfsPrincipal = principal + "@" + kdc.getRealm();
+
+    Configuration conf = new Configuration();
+    conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+    
+    UserGroupInformation.setConfiguration(conf);
+    UserGroupInformation.setShouldRenewImmediatelyForTests(true);
+    UserGroupInformation ugi =
+        UserGroupInformation.loginUserFromKeytabAndReturnUGI(hdfsPrincipal, hdfsKeytab);
+
+    Configuration hdfsConf = createSecureDFSConfig();
+    hdfsConf.set("hadoop.proxyuser.hdfs.hosts", "*");
+    hdfsConf.set("hadoop.proxyuser.hdfs.groups", "*");
+    hdfsConf.set("hadoop.proxyuser.hdfs.users", "*");
+
+    kerberizedDfsCluster = ugi.doAs(new PrivilegedExceptionAction<MiniDFSCluster>() {
+      @Override
+      public MiniDFSCluster run() throws Exception {
+        return new MiniDFSCluster
+            .Builder(hdfsConf)
+            .numDataNodes(1)
+            .clusterId("kerberized-cluster-1")
+            .checkDataNodeAddrConfig(true)
+            .build();
+      }
+    });
+  }
+
+  private void startKDC() throws Exception {
+    Properties kdcConf = MiniKdc.createConf();
+    String hostName = "localhost";
+    kdcConf.setProperty(MiniKdc.INSTANCE, "DefaultKrbServer");
+    kdcConf.setProperty(MiniKdc.ORG_NAME, "EXAMPLE");
+    kdcConf.setProperty(MiniKdc.ORG_DOMAIN, "COM");
+    kdcConf.setProperty(MiniKdc.KDC_BIND_ADDRESS, hostName);
+    kdcConf.setProperty(MiniKdc.KDC_PORT, "0");
+    workDir = tempDir.toFile();
+    kdc = new MiniKdc(kdcConf, workDir);
+    kdc.start();
+
+    krb5ConfFile = kdc.getKrb5conf().getAbsolutePath();
+    System.setProperty("java.security.krb5.conf", krb5ConfFile);
+  }
+
+  public void tearDown() throws IOException {
+    if (kerberizedDfsCluster != null) {
+      kerberizedDfsCluster.shutdown(true);
+    }
+    if (kdc != null) {
+      kdc.stop();
+    }
+    setTestRunner(KerberizedHdfs.class);
+    UserGroupInformation.reset();
+  }
+
+  public String getSchemeAndAuthorityPrefix() {
+    return String.format("hdfs://localhost:%s/", kerberizedDfsCluster.getNameNodePort());
+  }
+
+  public Configuration getConf() throws IOException {
+    Configuration configuration = kerberizedDfsCluster.getFileSystem().getConf();
+    configuration.setBoolean("fs.hdfs.impl.disable.cache", true);
+    configuration.set("hadoop.security.authentication", UserGroupInformation.AuthenticationMethod.KERBEROS.name());
+    return configuration;
+  }
+
+  public FileSystem getFileSystem() throws Exception {
+    return kerberizedDfsCluster.getFileSystem();
+  }
+
+  public String getHdfsKeytab() {
+    return hdfsKeytab;
+  }
+
+  public String getHdfsPrincipal() {
+    return hdfsPrincipal;
+  }
+
+  public String getAlexKeytab() {
+    return alexKeytab;
+  }
+
+  public String getAlexPrincipal() {
+    return alexPrincipal;
+  }
+
+  public String getKrb5ConfFile() {
+    return krb5ConfFile;
+  }
+
+  public MiniKdc getKdc() {
+    return kdc;
+  }
+
+  /**
+   * Should be invoked by extending class to solve the NPE.
+   * refer to: https://github.com/apache/hbase/pull/1207
+   */
+  public void setTestRunner(Class cls) {
+    this.testRunnerCls = cls;
+  }
+
+  static class TestDummyImpersonationProvider implements ImpersonationProvider {
+
+    @Override
+    public void init(String configurationPrefix) {
+      // ignore
+    }
+
+    /**
+     * Allow the user of HDFS can be delegated to alex.
+     */
+    @Override
+    public void authorize(UserGroupInformation userGroupInformation, String s) throws AuthorizationException {
+      UserGroupInformation superUser = userGroupInformation.getRealUser();
+      LOGGER.info("Proxy: {}", superUser.getShortUserName());
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+      // ignore
+    }
+
+    @Override
+    public Configuration getConf() {
+      return null;
+    }
+  }
+}
diff --git a/common/src/test/java/org/apache/uniffle/common/KerberizedHdfsBase.java b/common/src/test/java/org/apache/uniffle/common/KerberizedHdfsBase.java
new file mode 100644
index 00000000..aec36275
--- /dev/null
+++ b/common/src/test/java/org/apache/uniffle/common/KerberizedHdfsBase.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common;
+
+import org.junit.jupiter.api.AfterAll;
+
+import org.apache.uniffle.common.security.HadoopSecurityContext;
+import org.apache.uniffle.common.security.NoOpSecurityContext;
+import org.apache.uniffle.common.security.SecurityConfig;
+import org.apache.uniffle.common.security.SecurityContextFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class KerberizedHdfsBase {
+  protected static KerberizedHdfs kerberizedHdfs;
+  protected static Class<?> testRunner = KerberizedHdfsBase.class;
+
+  public static void init() throws Exception {
+    kerberizedHdfs = new KerberizedHdfs();
+    kerberizedHdfs.setTestRunner(testRunner);
+    kerberizedHdfs.setup();
+  }
+
+  @AfterAll
+  public static void clear() throws Exception {
+    kerberizedHdfs.tearDown();
+    kerberizedHdfs = null;
+  }
+
+  public static void initHadoopSecurityContext() throws Exception {
+    // init the security context
+    SecurityConfig securityConfig = SecurityConfig
+        .newBuilder()
+        .keytabFilePath(kerberizedHdfs.getHdfsKeytab())
+        .principal(kerberizedHdfs.getHdfsPrincipal())
+        .reloginIntervalSec(1000)
+        .build();
+    SecurityContextFactory.get().init(securityConfig);
+
+    assertEquals(HadoopSecurityContext.class, SecurityContextFactory.get().getSecurityContext().getClass());
+  }
+
+  public static void removeHadoopSecurityContext() throws Exception {
+    SecurityContextFactory.get().init(null);
+    assertEquals(NoOpSecurityContext.class, SecurityContextFactory.get().getSecurityContext().getClass());
+  }
+}
+
diff --git a/common/src/test/java/org/apache/uniffle/common/filesystem/HadoopFilesystemProviderTest.java b/common/src/test/java/org/apache/uniffle/common/filesystem/HadoopFilesystemProviderTest.java
new file mode 100644
index 00000000..a787a099
--- /dev/null
+++ b/common/src/test/java/org/apache/uniffle/common/filesystem/HadoopFilesystemProviderTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.filesystem;
+
+import java.io.BufferedWriter;
+import java.io.OutputStreamWriter;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.common.KerberizedHdfsBase;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class HadoopFilesystemProviderTest extends KerberizedHdfsBase {
+
+  @BeforeAll
+  public static void beforeAll() throws Exception {
+    testRunner = HadoopFilesystemProvider.class;
+    KerberizedHdfsBase.init();
+    UserGroupInformation.reset();
+  }
+
+  /**
+   * When visiting secured HDFS but not initialize hadoop security context, it will throw exception
+   */
+  @Test
+  public void testGetSecuredFilesystemButNotInitializeHadoopSecurityContext() throws Exception {
+
+    removeHadoopSecurityContext();
+
+    try {
+      FileSystem fileSystem = HadoopFilesystemProvider.getFilesystem(new Path("/hdfs"), kerberizedHdfs.getConf());
+      fileSystem.mkdirs(new Path("/hdfs/HadoopFilesystemProviderTest"));
+    } catch (AccessControlException e) {
+
+    }
+  }
+
+  @Test
+  public void testGetSecuredFilesystem() throws Exception {
+    initHadoopSecurityContext();
+
+    // case1: it should throw exception when user is empty or null.
+    try {
+      FileSystem fileSystem = HadoopFilesystemProvider.getFilesystem(null, new Path("/hdfs"), kerberizedHdfs.getConf());
+    } catch (Exception e) {
+      assertTrue(e.getMessage().contains("User must be not null or empty"));
+    }
+
+    // case2: it should return the proxy user's filesystem
+    FileSystem fileSystem = HadoopFilesystemProvider.getFilesystem("alex", new Path("/alex"), kerberizedHdfs.getConf());
+    Path alexPath = new Path("/alex/HadoopFilesystemProviderTest-testGetSecuredFilesystem");
+    assertTrue(fileSystem.mkdirs(alexPath));
+
+    assertEquals("alex", fileSystem.getFileStatus(alexPath).getOwner());
+
+    // case3: it should return the login user's filesystem
+    fileSystem = HadoopFilesystemProvider.getFilesystem(new Path("/hdfs"), kerberizedHdfs.getConf());
+    Path hdfsPath = new Path("/hdfs/HadoopFilesystemProviderTest-testGetSecuredFilesystem");
+    assertTrue(fileSystem.mkdirs(hdfsPath));
+
+    assertEquals("hdfs", fileSystem.getFileStatus(hdfsPath).getOwner());
+  }
+
+  @Test
+  public void testWriteAndReadBySecuredFilesystem() throws Exception {
+    initHadoopSecurityContext();
+
+    // write file by proxy user.
+    String fileContent = "hello world";
+    Path filePath = new Path("/alex/HadoopFilesystemProviderTest-testWriteAndReadBySecuredFilesystem.file");
+    FileSystem writeFs = HadoopFilesystemProvider.getFilesystem("alex", filePath, kerberizedHdfs.getConf());
+
+    boolean ok = writeFs.exists(new org.apache.hadoop.fs.Path("/alex"));
+    assertTrue(ok);
+    assertEquals("alex", writeFs.getFileStatus(new org.apache.hadoop.fs.Path("/alex")).getOwner());
+
+    FSDataOutputStream fsDataOutputStream = writeFs.create(filePath);
+    BufferedWriter br = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, "UTF-8"));
+    br.write(fileContent);
+    br.close();
+
+    assertTrue(writeFs.exists(filePath));
+    assertEquals("alex", writeFs.getFileStatus(filePath).getOwner());
+
+    // Read content from HDFS by alex user directly
+    UserGroupInformation readerUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
+        kerberizedHdfs.getAlexPrincipal() + "@" + kerberizedHdfs.getKdc().getRealm(),
+        kerberizedHdfs.getAlexKeytab()
+    );
+    readerUGI.doAs((PrivilegedExceptionAction<Object>) () -> {
+      FileSystem fs = FileSystem.get(kerberizedHdfs.getConf());
+      FSDataInputStream inputStream = fs.open(filePath);
+      String fetchedResult = IOUtils.toString(inputStream);
+      assertEquals(fileContent, fetchedResult);
+      return null;
+    });
+  }
+}
diff --git a/common/src/test/java/org/apache/uniffle/common/security/HadoopSecurityContextTest.java b/common/src/test/java/org/apache/uniffle/common/security/HadoopSecurityContextTest.java
new file mode 100644
index 00000000..dde0340a
--- /dev/null
+++ b/common/src/test/java/org/apache/uniffle/common/security/HadoopSecurityContextTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.security;
+
+import java.util.concurrent.Callable;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.common.KerberizedHdfsBase;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class HadoopSecurityContextTest extends KerberizedHdfsBase {
+
+  @BeforeAll
+  public static void beforeAll() throws Exception {
+    testRunner = HadoopSecurityContextTest.class;
+    KerberizedHdfsBase.init();
+  }
+
+  @Test
+  public void testSecuredCallable() throws Exception {
+    String val = System.getProperty("java.security.krb5.conf");
+
+    HadoopSecurityContext context = new HadoopSecurityContext(
+        null,
+        kerberizedHdfs.getHdfsKeytab(),
+        kerberizedHdfs.getHdfsPrincipal(),
+        1000
+    );
+
+    // case1: when user is empty or null, it should throw exception
+    try {
+      context.runSecured(StringUtils.EMPTY, (Callable<Void>) () -> null);
+      fail();
+    } catch (Exception e) {
+
+    }
+
+    // case2: run by the login user, there is no need to wrap proxy action
+    Path pathWithHdfsUser = new Path("/hdfs/HadoopSecurityContextTest");
+    context.runSecured("hdfs", (Callable<Void>) () -> {
+      kerberizedHdfs.getFileSystem().mkdirs(pathWithHdfsUser);
+      return null;
+    });
+    FileStatus fileStatus = kerberizedHdfs.getFileSystem().getFileStatus(pathWithHdfsUser);
+    assertEquals("hdfs", fileStatus.getOwner());
+
+    // case3: run by the proxy user
+    Path pathWithAlexUser = new Path("/alex/HadoopSecurityContextTest");
+    context.runSecured("alex", (Callable<Void>) () -> {
+      kerberizedHdfs.getFileSystem().mkdirs(pathWithAlexUser);
+      return null;
+    });
+    fileStatus = kerberizedHdfs.getFileSystem().getFileStatus(pathWithAlexUser);
+    assertEquals("alex", fileStatus.getOwner());
+
+    context.close();
+  }
+
+  @Test
+  public void testCreateIllegalContext() throws Exception {
+    // case1: lack principal, should throw exception
+    try {
+      HadoopSecurityContext context = new HadoopSecurityContext(
+          null,
+          kerberizedHdfs.getHdfsKeytab(),
+          null,
+          1000
+      );
+      fail();
+    } catch (Exception e) {
+      assertTrue(e.getMessage().contains("principal must be not null or empty"));
+    }
+
+    // case2: lack keytab, should throw exception
+    try {
+      HadoopSecurityContext context = new HadoopSecurityContext(
+          null,
+          null,
+          kerberizedHdfs.getHdfsPrincipal(),
+          1000
+      );
+      fail();
+    } catch (Exception e) {
+      assertTrue(e.getMessage().contains("KeytabFilePath must be not null or empty"));
+    }
+
+    // case3: illegal relogin interval sec
+    try {
+      HadoopSecurityContext context = new HadoopSecurityContext(
+          null,
+          kerberizedHdfs.getHdfsKeytab(),
+          kerberizedHdfs.getHdfsPrincipal(),
+          0
+      );
+      fail();
+    } catch (Exception e) {
+      assertTrue(e.getMessage().contains("refreshIntervalSec must be not negative"));
+    }
+  }
+}
diff --git a/common/src/test/java/org/apache/uniffle/common/security/SecurityContextFactoryTest.java b/common/src/test/java/org/apache/uniffle/common/security/SecurityContextFactoryTest.java
new file mode 100644
index 00000000..301e7588
--- /dev/null
+++ b/common/src/test/java/org/apache/uniffle/common/security/SecurityContextFactoryTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.security;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.common.KerberizedHdfsBase;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class SecurityContextFactoryTest extends KerberizedHdfsBase {
+
+  @BeforeAll
+  public static void beforeAll() throws Exception {
+    testRunner = SecurityContextFactoryTest.class;
+    KerberizedHdfsBase.init();
+  }
+
+  @AfterEach
+  public void afterEach() throws Exception {
+    SecurityContextFactory.get().init(null);
+  }
+
+  /**
+   * It should return the {@link NoOpSecurityContext} when not initializing securityContext
+   */
+  @Test
+  public void testDefaultSecurityContext() throws Exception {
+    SecurityContext securityContext = SecurityContextFactory.get().getSecurityContext();
+    assertEquals(NoOpSecurityContext.class, securityContext.getClass());
+
+    final SecurityConfig securityConfig = null;
+    SecurityContextFactory.get().init(securityConfig);
+    securityContext = SecurityContextFactory.get().getSecurityContext();
+    assertEquals(NoOpSecurityContext.class, securityContext.getClass());
+  }
+
+  @Test
+  public void testCreateHadoopSecurityContext() throws Exception {
+    // case1: incomplete config, should throw exception
+    final SecurityConfig securityConfig = SecurityConfig
+        .newBuilder()
+        .keytabFilePath("")
+        .build();
+    try {
+      SecurityContextFactory.get().init(securityConfig);
+      fail();
+    } catch (Exception e) {
+      // ignore
+    }
+
+    // case2: create the correct Hadoop security context
+    final SecurityConfig correctConfig = SecurityConfig
+        .newBuilder()
+        .keytabFilePath(kerberizedHdfs.getHdfsKeytab())
+        .principal(kerberizedHdfs.getHdfsPrincipal())
+        .reloginIntervalSec(60)
+        .build();
+    SecurityContextFactory.get().init(correctConfig);
+    SecurityContext securityContext = SecurityContextFactory.get().getSecurityContext();
+    assertEquals(HadoopSecurityContext.class, securityContext.getClass());
+    securityContext.close();
+  }
+}
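
For orientation, the following is a minimal, hypothetical sketch (not part of this patch) of how a server-side component is expected to wire these new pieces together: initialize the security context from a SecurityConfig, obtain HDFS filesystems through HadoopFilesystemProvider, and close the context on shutdown. The keytab path, principal, HDFS URI, and the class name SecuredFilesystemSketch are placeholders; only the SecurityConfig, SecurityContextFactory, and HadoopFilesystemProvider APIs come from the patch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
    import org.apache.uniffle.common.security.SecurityConfig;
    import org.apache.uniffle.common.security.SecurityContextFactory;

    public class SecuredFilesystemSketch {
      public static void main(String[] args) throws Exception {
        // Initialize the process-wide security context once at startup.
        // Passing null instead keeps the NoOpSecurityContext (non-kerberized deployments).
        SecurityConfig securityConfig = SecurityConfig.newBuilder()
            .keytabFilePath("/etc/security/keytabs/rss.keytab")  // placeholder
            .principal("rss/_HOST@EXAMPLE.COM")                  // placeholder
            .reloginIntervalSec(60)
            .build();
        SecurityContextFactory.get().init(securityConfig);

        // HDFS access then goes through the provider introduced by this patch,
        // which replaces CoordinatorUtils.getFileSystemForPath below.
        Path path = new Path("hdfs://namenode:8020/rss/exclude_nodes");  // placeholder
        FileSystem fs = HadoopFilesystemProvider.getFilesystem(path, new Configuration());
        System.out.println(fs.getUri());

        // Release Kerberos login resources on shutdown.
        SecurityContextFactory.get().getSecurityContext().close();
      }
    }
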
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessCandidatesChecker.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessCandidatesChecker.java
index aa1d8903..f6a4219a 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessCandidatesChecker.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessCandidatesChecker.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.common.util.ThreadUtils;
 
@@ -59,7 +60,7 @@ public class AccessCandidatesChecker extends AbstractAccessChecker {
     String pathStr = conf.get(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_PATH);
     this.path = new Path(pathStr);
     Configuration hadoopConf = accessManager.getHadoopConf();
-    this.fileSystem = CoordinatorUtils.getFileSystemForPath(path, hadoopConf);
+    this.fileSystem = HadoopFilesystemProvider.getFilesystem(path, hadoopConf);
 
     if (!fileSystem.isFile(path)) {
       String msg = String.format("Fail to init AccessCandidatesChecker, %s is not a file.", path.toUri());
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessManager.java
index 1144ad41..a41cd62e 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessManager.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessManager.java
@@ -39,7 +39,8 @@ public class AccessManager {
   private List<AccessChecker> accessCheckers = Lists.newArrayList();
 
   public AccessManager(
-      CoordinatorConf conf, ClusterManager clusterManager, Configuration hadoopConf) throws RuntimeException {
+      CoordinatorConf conf, ClusterManager clusterManager,
+      Configuration hadoopConf) throws Exception {
     this.coordinatorConf = conf;
     this.clusterManager = clusterManager;
     this.hadoopConf = hadoopConf;
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/ClientConfManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/ClientConfManager.java
index c3c1fb5e..6b57730c 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/ClientConfManager.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/ClientConfManager.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
 import org.apache.uniffle.common.util.ThreadUtils;
 
 public class ClientConfManager implements Closeable {
@@ -62,7 +63,7 @@ public class ClientConfManager implements Closeable {
     String pathStr = conf.get(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_PATH);
     this.path = new Path(pathStr);
 
-    this.fileSystem = CoordinatorUtils.getFileSystemForPath(path, hadoopConf);
+    this.fileSystem = HadoopFilesystemProvider.getFilesystem(path, hadoopConf);
 
     if (!fileSystem.isFile(path)) {
       String msg = String.format("Fail to init ClientConfManager, %s is not a file.", path.toUri());
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
index 4a37353d..8ce10eb1 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
@@ -27,9 +27,16 @@ import org.apache.uniffle.common.Arguments;
 import org.apache.uniffle.common.metrics.GRPCMetrics;
 import org.apache.uniffle.common.metrics.JvmMetrics;
 import org.apache.uniffle.common.rpc.ServerInterface;
+import org.apache.uniffle.common.security.SecurityConfig;
+import org.apache.uniffle.common.security.SecurityContextFactory;
 import org.apache.uniffle.common.web.CommonMetricsServlet;
 import org.apache.uniffle.common.web.JettyServer;
 
+import static org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KERBEROS_ENABLE;
+import static org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KERBEROS_KEYTAB_FILE;
+import static org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KERBEROS_PRINCIPAL;
+import static org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KERBEROS_RELOGIN_INTERVAL_SEC;
+
 /**
  * The main entrance of coordinator service
  */
@@ -105,6 +112,7 @@ public class CoordinatorServer {
     if (clientConfManager != null) {
       clientConfManager.close();
     }
+    SecurityContextFactory.get().getSecurityContext().close();
     server.stop();
   }
 
@@ -114,9 +122,20 @@ public class CoordinatorServer {
     registerMetrics();
     this.applicationManager = new ApplicationManager(coordinatorConf);
 
+    SecurityConfig securityConfig = null;
+    if (coordinatorConf.getBoolean(RSS_SECURITY_HADOOP_KERBEROS_ENABLE)) {
+      securityConfig = SecurityConfig.newBuilder()
+          .keytabFilePath(coordinatorConf.getString(RSS_SECURITY_HADOOP_KERBEROS_KEYTAB_FILE))
+          .principal(coordinatorConf.getString(RSS_SECURITY_HADOOP_KERBEROS_PRINCIPAL))
+          .reloginIntervalSec(coordinatorConf.getLong(RSS_SECURITY_HADOOP_KERBEROS_RELOGIN_INTERVAL_SEC))
+          .build();
+    }
+    SecurityContextFactory.get().init(securityConfig);
+
     // load default hadoop configuration
     Configuration hadoopConf = new Configuration();
     ClusterManagerFactory clusterManagerFactory = new ClusterManagerFactory(coordinatorConf, hadoopConf);
+
     this.clusterManager = clusterManagerFactory.getClusterManager();
     this.clientConfManager = new ClientConfManager(coordinatorConf, hadoopConf, applicationManager);
     AssignmentStrategyFactory assignmentStrategyFactory =
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorUtils.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorUtils.java
index d3531821..e3984ed7 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorUtils.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorUtils.java
@@ -17,7 +17,6 @@
 
 package org.apache.uniffle.coordinator;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -25,10 +24,6 @@ import java.util.Map;
 import com.google.common.collect.Maps;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,23 +68,6 @@ public class CoordinatorUtils {
     return ranges;
   }
 
-  // TODO: the pure hdfs related classes and methods should be placed in a common module
-  public static FileSystem getFileSystemForPath(Path path, Configuration conf) throws IOException {
-    // For local file systems, return the raw local file system, such calls to flush()
-    // actually flushes the stream.
-    try {
-      FileSystem fs = path.getFileSystem(conf);
-      if (fs instanceof LocalFileSystem) {
-        LOG.debug("{} is local file system", path);
-        return ((LocalFileSystem) fs).getRawFileSystem();
-      }
-      return fs;
-    } catch (IOException e) {
-      LOG.error("Fail to get filesystem of {}", path);
-      throw e;
-    }
-  }
-
   public static Map<String, Map<String, String>> extractRemoteStorageConf(String confString) {
     Map<String, Map<String, String>> res = Maps.newHashMap();
     if (StringUtils.isEmpty(confString)) {
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
index 2c54a4d5..c60a0d8d 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
 import org.apache.uniffle.common.util.ThreadUtils;
 
 public class SimpleClusterManager implements ClusterManager {
@@ -64,7 +65,7 @@ public class SimpleClusterManager implements ClusterManager {
   private long outputAliveServerCount = 0;
   private final long periodicOutputIntervalTimes;
 
-  public SimpleClusterManager(CoordinatorConf conf, Configuration hadoopConf) throws IOException {
+  public SimpleClusterManager(CoordinatorConf conf, Configuration hadoopConf) throws Exception {
     this.shuffleNodesMax = conf.getInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX);
     this.heartbeatTimeout = conf.getLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT);
     // the thread for checking if shuffle server report heartbeat in time
@@ -78,7 +79,7 @@ public class SimpleClusterManager implements ClusterManager {
 
     String excludeNodesPath = conf.getString(CoordinatorConf.COORDINATOR_EXCLUDE_NODES_FILE_PATH, "");
     if (!StringUtils.isEmpty(excludeNodesPath)) {
-      this.hadoopFileSystem = CoordinatorUtils.getFileSystemForPath(new Path(excludeNodesPath), hadoopConf);
+      this.hadoopFileSystem = HadoopFilesystemProvider.getFilesystem(new Path(excludeNodesPath), hadoopConf);
       long updateNodesInterval = conf.getLong(CoordinatorConf.COORDINATOR_EXCLUDE_NODES_CHECK_INTERVAL);
       checkNodesExecutorService = Executors.newSingleThreadScheduledExecutor(
           ThreadUtils.getThreadFactory("UpdateExcludeNodes-%d"));
diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
index e7894f1d..706bcde6 100644
--- a/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
+++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
@@ -51,7 +51,7 @@ public class SimpleClusterManagerTest {
   }
 
   @Test
-  public void getServerListTest() throws IOException {
+  public void getServerListTest() throws Exception {
     CoordinatorConf ssc = new CoordinatorConf();
     ssc.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT, 30 * 1000L);
     SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, new Configuration());
@@ -106,7 +106,7 @@ public class SimpleClusterManagerTest {
   }
 
   @Test
-  public void testGetCorrectServerNodesWhenOneNodeRemovedAndUnhealthyNodeFound() throws IOException {
+  public void testGetCorrectServerNodesWhenOneNodeRemovedAndUnhealthyNodeFound() throws Exception {
     CoordinatorConf ssc = new CoordinatorConf();
     ssc.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT, 30 * 1000L);
     SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, new Configuration());
@@ -183,7 +183,7 @@ public class SimpleClusterManagerTest {
   }
 
   @Test
-  public void testGetCorrectServerNodesWhenOneNodeRemoved() throws IOException {
+  public void testGetCorrectServerNodesWhenOneNodeRemoved() throws Exception {
     CoordinatorConf ssc = new CoordinatorConf();
     ssc.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT, 30 * 1000L);
     SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, new Configuration());
diff --git a/integration-test/common/pom.xml b/integration-test/common/pom.xml
index c4dc048f..9755e698 100644
--- a/integration-test/common/pom.xml
+++ b/integration-test/common/pom.xml
@@ -39,6 +39,12 @@
             <artifactId>rss-internal-client</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.uniffle</groupId>
+            <artifactId>rss-common</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.uniffle</groupId>
             <artifactId>shuffle-server</artifactId>
@@ -87,6 +93,16 @@
             <artifactId>lz4</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-minikdc</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>junit</groupId>
+                    <artifactId>junit</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessCandidatesCheckerHdfsTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessCandidatesCheckerHdfsTest.java
index 436953b2..26a860e9 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessCandidatesCheckerHdfsTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessCandidatesCheckerHdfsTest.java
@@ -23,6 +23,7 @@ import java.io.PrintWriter;
 import com.google.common.collect.Sets;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -55,12 +56,20 @@ public class AccessCandidatesCheckerHdfsTest extends HdfsTestBase {
   @Test
   public void test() throws Exception {
     String candidatesFile = HDFS_URI + "/test/access_checker_candidates";
+    createAndRunCases(HDFS_URI, candidatesFile, fs, HdfsTestBase.conf);
+  }
+
+  public static void createAndRunCases(
+      String clusterPrefix,
+      String candidatesFile,
+      FileSystem fs,
+      Configuration hadoopConf) throws Exception {
     Path path = new Path(candidatesFile);
     FSDataOutputStream out = fs.create(path);
 
     CoordinatorConf conf = new CoordinatorConf();
     conf.set(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_UPDATE_INTERVAL_SEC, 1);
-    conf.set(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_PATH, HDFS_URI);
+    conf.set(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_PATH, clusterPrefix);
     conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
         "org.apache.uniffle.coordinator.AccessCandidatesChecker");
 
@@ -91,7 +100,7 @@ public class AccessCandidatesCheckerHdfsTest extends HdfsTestBase {
     printWriter.println("2 ");
     printWriter.flush();
     printWriter.close();
-    AccessManager accessManager = new AccessManager(conf, null, HdfsTestBase.conf);
+    AccessManager accessManager = new AccessManager(conf, null, hadoopConf);
     AccessCandidatesChecker checker = (AccessCandidatesChecker) accessManager.getAccessCheckers().get(0);
     // load the config at the beginning
     sleep(1200);
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessCandidatesCheckerKerberizedHdfsTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessCandidatesCheckerKerberizedHdfsTest.java
new file mode 100644
index 00000000..5b5c75f6
--- /dev/null
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessCandidatesCheckerKerberizedHdfsTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.test;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.common.KerberizedHdfsBase;
+import org.apache.uniffle.coordinator.CoordinatorMetrics;
+
+public class AccessCandidatesCheckerKerberizedHdfsTest extends KerberizedHdfsBase {
+
+  @BeforeAll
+  public static void beforeAll() throws Exception {
+    testRunner = AccessCandidatesCheckerKerberizedHdfsTest.class;
+    KerberizedHdfsBase.init();
+  }
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    CoordinatorMetrics.register();
+    initHadoopSecurityContext();
+  }
+
+  @AfterEach
+  public void afterEach() throws Exception {
+    CoordinatorMetrics.clear();
+    removeHadoopSecurityContext();
+  }
+
+  @Test
+  public void test() throws Exception {
+    String candidatesFile = kerberizedHdfs.getSchemeAndAuthorityPrefix() + "/test/access_checker_candidates";
+    AccessCandidatesCheckerHdfsTest.createAndRunCases(
+        kerberizedHdfs.getSchemeAndAuthorityPrefix(),
+        candidatesFile,
+        kerberizedHdfs.getFileSystem(),
+        kerberizedHdfs.getConf()
+    );
+  }
+}
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ClientConfManagerHdfsTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ClientConfManagerHdfsTest.java
index c2ea4a56..3d2bda14 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ClientConfManagerHdfsTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ClientConfManagerHdfsTest.java
@@ -23,6 +23,7 @@ import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.Test;
 
@@ -42,11 +43,19 @@ public class ClientConfManagerHdfsTest extends HdfsTestBase {
   @Test
   public void test() throws Exception {
     String cfgFile = HDFS_URI + "/test/client_conf";
+    createAndRunClientConfManagerCases(HDFS_URI, cfgFile, fs, HdfsTestBase.conf);
+  }
+
+  public static void createAndRunClientConfManagerCases(
+      String clusterPathPrefix,
+      String cfgFile,
+      FileSystem fileSystem,
+      Configuration hadoopConf) throws Exception {
     Path path = new Path(cfgFile);
-    FSDataOutputStream out = fs.create(path);
+    FSDataOutputStream out = fileSystem.create(path);
 
     CoordinatorConf conf = new CoordinatorConf();
-    conf.set(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_PATH, HDFS_URI);
+    conf.set(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_PATH, clusterPathPrefix);
     conf.set(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_UPDATE_INTERVAL_SEC, 1);
     conf.set(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_ENABLED, true);
 
@@ -71,7 +80,7 @@ public class ClientConfManagerHdfsTest extends HdfsTestBase {
     printWriter.println("spark.mock.3 true  ");
     printWriter.flush();
     printWriter.close();
-    clientConfManager = new ClientConfManager(conf, HdfsTestBase.conf, new ApplicationManager(conf));
+    clientConfManager = new ClientConfManager(conf, hadoopConf, new ApplicationManager(conf));
     sleep(1200);
     Map<String, String> clientConf = clientConfManager.getClientConf();
     assertEquals("abc", clientConf.get("spark.mock.1"));
@@ -84,7 +93,7 @@ public class ClientConfManagerHdfsTest extends HdfsTestBase {
     printWriter.flush();
     printWriter.close();
     sleep(1300);
-    assertTrue(fs.exists(path));
+    assertTrue(fileSystem.exists(path));
     clientConf = clientConfManager.getClientConf();
     assertEquals("abc", clientConf.get("spark.mock.1"));
     assertEquals("123", clientConf.get("spark.mock.2"));
@@ -92,8 +101,8 @@ public class ClientConfManagerHdfsTest extends HdfsTestBase {
     assertEquals(3, clientConf.size());
 
     // the config will not be changed when the conf file is deleted
-    fs.delete(path, true);
-    assertFalse(fs.exists(path));
+    fileSystem.delete(path, true);
+    assertFalse(fileSystem.exists(path));
     sleep(1200);
     clientConf = clientConfManager.getClientConf();
     assertEquals("abc", clientConf.get("spark.mock.1"));
@@ -103,14 +112,14 @@ public class ClientConfManagerHdfsTest extends HdfsTestBase {
 
     // the normal update config process, move the new conf file to the old one
     Path tmpPath = new Path(cfgFile + ".tmp");
-    out = fs.create(tmpPath);
+    out = fileSystem.create(tmpPath);
     printWriter = new PrintWriter(new OutputStreamWriter(out));
     printWriter.println("spark.mock.4 deadbeaf");
     printWriter.println("spark.mock.5 9527");
     printWriter.println("spark.mock.6 9527 3423");
     printWriter.println("spark.mock.7");
     printWriter.close();
-    fs.rename(tmpPath, path);
+    fileSystem.rename(tmpPath, path);
     sleep(1200);
     clientConf = clientConfManager.getClientConf();
     assertEquals("deadbeaf", clientConf.get("spark.mock.4"));
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ClientConfManagerKerberlizedHdfsTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ClientConfManagerKerberlizedHdfsTest.java
new file mode 100644
index 00000000..73a3940a
--- /dev/null
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ClientConfManagerKerberlizedHdfsTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.test;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.common.KerberizedHdfsBase;
+
+public class ClientConfManagerKerberlizedHdfsTest extends KerberizedHdfsBase {
+
+  @BeforeAll
+  public static void beforeAll() throws Exception {
+    testRunner = ClientConfManagerKerberlizedHdfsTest.class;
+    KerberizedHdfsBase.init();
+  }
+
+  @Test
+  public void testConfInHDFS() throws Exception {
+    String cfgFile = kerberizedHdfs.getSchemeAndAuthorityPrefix() + "/test/client_conf";
+    ClientConfManagerHdfsTest.createAndRunClientConfManagerCases(
+        kerberizedHdfs.getSchemeAndAuthorityPrefix(),
+        cfgFile,
+        kerberizedHdfs.getFileSystem(),
+        kerberizedHdfs.getConf()
+    );
+  }
+}
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java
index 9edb3427..99992c9f 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleReadWriteBase.java
@@ -45,10 +45,10 @@ import java.util.concurrent.atomic.AtomicLong;
 public abstract class ShuffleReadWriteBase extends IntegrationTestBase {
 
   private static AtomicLong ATOMIC_LONG = new AtomicLong(0L);
-  protected List<ShuffleServerInfo> mockSSI =
+  public static List<ShuffleServerInfo> mockSSI =
       Lists.newArrayList(new ShuffleServerInfo("id", "host", 0));
 
-  protected List<ShuffleBlockInfo> createShuffleBlockList(int shuffleId, int partitionId, long taskAttemptId,
+  public static List<ShuffleBlockInfo> createShuffleBlockList(int shuffleId, int partitionId, long taskAttemptId,
       int blockNum, int length, Roaring64NavigableMap blockIdBitmap, Map<Long, byte[]> dataMap,
       List<ShuffleServerInfo> shuffleServerInfoList) {
     List<ShuffleBlockInfo> shuffleBlockInfoList = Lists.newArrayList();
@@ -67,7 +67,7 @@ public abstract class ShuffleReadWriteBase extends IntegrationTestBase {
     return shuffleBlockInfoList;
   }
 
-  protected Map<Integer, List<ShuffleBlockInfo>> createTestData(
+  public static Map<Integer, List<ShuffleBlockInfo>> createTestData(
       Roaring64NavigableMap[] bitmaps,
       Map<Long, byte[]> expectedData) {
     for (int i = 0; i < 4; i++) {
@@ -89,7 +89,7 @@ public abstract class ShuffleReadWriteBase extends IntegrationTestBase {
     return partitionToBlocks;
   }
 
-  protected List<ShuffleBlockInfo> createShuffleBlockList(int shuffleId, int partitionId, long taskAttemptId,
+  public static List<ShuffleBlockInfo> createShuffleBlockList(int shuffleId, int partitionId, long taskAttemptId,
       int blockNum, int length, Roaring64NavigableMap blockIdBitmap, Map<Long, byte[]> dataMap) {
     List<ShuffleServerInfo> shuffleServerInfoList =
         Lists.newArrayList(new ShuffleServerInfo("id", "host", 0));
@@ -97,15 +97,15 @@ public abstract class ShuffleReadWriteBase extends IntegrationTestBase {
         shuffleId, partitionId, taskAttemptId, blockNum, length, blockIdBitmap, dataMap, shuffleServerInfoList);
   }
 
-  protected boolean compareByte(byte[] expected, ByteBuffer buffer) {
+  public static boolean compareByte(byte[] expected, ByteBuffer buffer) {
     return TestUtils.compareByte(expected, buffer);
   }
 
-  protected void validateResult(ShuffleReadClient readClient, Map<Long, byte[]> expectedData) {
+  public static void validateResult(ShuffleReadClient readClient, Map<Long, byte[]> expectedData) {
     TestUtils.validateResult(readClient, expectedData);
   }
 
-  protected static String generateBasePath() {
+  public static String generateBasePath() {
     File tmpDir = Files.createTempDir();
     File dataDir1 = new File(tmpDir, "data1");
     File dataDir2 = new File(tmpDir, "data2");
@@ -114,7 +114,7 @@ public abstract class ShuffleReadWriteBase extends IntegrationTestBase {
     return basePath;
   }
 
-  protected List<ShuffleDataSegment> readShuffleIndexSegments(
+  public static List<ShuffleDataSegment> readShuffleIndexSegments(
       ShuffleServerGrpcClient shuffleServerClient,
       String appId,
       int shuffleId,
@@ -130,7 +130,7 @@ public abstract class ShuffleReadWriteBase extends IntegrationTestBase {
 
   }
 
-  protected ShuffleDataResult readShuffleData(
+  public static ShuffleDataResult readShuffleData(
       ShuffleServerGrpcClient shuffleServerClient,
       String appId,
       int shuffleId,
@@ -154,7 +154,7 @@ public abstract class ShuffleReadWriteBase extends IntegrationTestBase {
         segment.getBufferSegments());
   }
 
-  protected ShuffleDataResult readShuffleData(
+  public static ShuffleDataResult readShuffleData(
       ShuffleServerGrpcClient shuffleServerClient,
       String appId,
       int shuffleId,
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
index 34d26134..c35e8ac9 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
@@ -29,6 +29,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.io.Files;
 import com.google.protobuf.ByteString;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Disabled;
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHdfsTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHdfsTest.java
new file mode 100644
index 00000000..88c5438f
--- /dev/null
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHdfsTest.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.test;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
+
+import org.apache.uniffle.client.TestUtils;
+import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
+import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
+import org.apache.uniffle.client.request.RssFinishShuffleRequest;
+import org.apache.uniffle.client.request.RssRegisterShuffleRequest;
+import org.apache.uniffle.client.request.RssSendCommitRequest;
+import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
+import org.apache.uniffle.client.response.CompressedShuffleBlock;
+import org.apache.uniffle.client.util.DefaultIdHelper;
+import org.apache.uniffle.common.KerberizedHdfsBase;
+import org.apache.uniffle.common.PartitionRange;
+import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.ShuffleBlockInfo;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.coordinator.CoordinatorServer;
+import org.apache.uniffle.server.ShuffleServer;
+import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.storage.util.StorageType;
+
+import static org.apache.uniffle.test.ShuffleReadWriteBase.mockSSI;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ShuffleServerWithKerberizedHdfsTest extends KerberizedHdfsBase {
+
+  private static final int COORDINATOR_RPC_PORT = 19999;
+  private static final int SHUFFLE_SERVER_PORT = 29999;
+  private static final String COORDINATOR_QUORUM = "localhost:" + COORDINATOR_RPC_PORT;
+
+  private ShuffleServerGrpcClient shuffleServerClient;
+  private static CoordinatorServer coordinatorServer;
+  private static ShuffleServer shuffleServer;
+
+  private static ShuffleServerConf getShuffleServerConf() throws Exception {
+    File dataFolder = Files.createTempDirectory("rssdata").toFile();
+    ShuffleServerConf serverConf = new ShuffleServerConf();
+    dataFolder.deleteOnExit();
+    serverConf.setInteger("rss.rpc.server.port", SHUFFLE_SERVER_PORT);
+    serverConf.setString("rss.storage.type", StorageType.MEMORY_LOCALFILE_HDFS.name());
+    serverConf.setString("rss.storage.basePath", dataFolder.getAbsolutePath());
+    serverConf.setString("rss.server.buffer.capacity", "671088640");
+    serverConf.setString("rss.server.memory.shuffle.highWaterMark", "50.0");
+    serverConf.setString("rss.server.memory.shuffle.lowWaterMark", "0.0");
+    serverConf.setString("rss.server.read.buffer.capacity", "335544320");
+    serverConf.setString("rss.coordinator.quorum", COORDINATOR_QUORUM);
+    serverConf.setString("rss.server.heartbeat.delay", "1000");
+    serverConf.setString("rss.server.heartbeat.interval", "1000");
+    serverConf.setInteger("rss.jetty.http.port", 18080);
+    serverConf.setInteger("rss.jetty.corePool.size", 64);
+    serverConf.setInteger("rss.rpc.executor.size", 10);
+    serverConf.setString("rss.server.hadoop.dfs.replication", "2");
+    serverConf.setLong("rss.server.disk.capacity", 10L * 1024L * 1024L * 1024L);
+    serverConf.setBoolean("rss.server.health.check.enable", false);
+    serverConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.HDFS.name());
+    return serverConf;
+  }
+
+  @BeforeAll
+  public static void setup() throws Exception {
+    testRunner = ShuffleServerWithKerberizedHdfsTest.class;
+    KerberizedHdfsBase.init();
+
+    CoordinatorConf coordinatorConf = new CoordinatorConf();
+    coordinatorConf.setInteger(CoordinatorConf.RPC_SERVER_PORT, COORDINATOR_RPC_PORT);
+    coordinatorConf.setInteger(CoordinatorConf.JETTY_HTTP_PORT, 19998);
+    coordinatorConf.setInteger(CoordinatorConf.RPC_EXECUTOR_SIZE, 10);
+    coordinatorServer = new CoordinatorServer(coordinatorConf);
+    coordinatorServer.start();
+
+    ShuffleServerConf shuffleServerConf = getShuffleServerConf();
+    shuffleServer = new ShuffleServer(shuffleServerConf);
+    shuffleServer.start();
+  }
+
+  @AfterAll
+  public static void afterAll() throws Exception {
+    if (coordinatorServer != null) {
+      coordinatorServer.stopServer();
+    }
+    if (shuffleServer != null) {
+      shuffleServer.stopServer();
+    }
+  }
+
+  @BeforeEach
+  public void beforeEach() throws Exception {
+    initHadoopSecurityContext();
+    shuffleServerClient = new ShuffleServerGrpcClient("localhost", SHUFFLE_SERVER_PORT);
+  }
+
+  @AfterEach
+  public void clearEach() throws Exception {
+    removeHadoopSecurityContext();
+    shuffleServerClient.close();
+  }
+
+  private Map<String, String> conf2Map(Configuration conf) {
+    Map<String, String> confMap = new HashMap<>();
+    for (Map.Entry<String, String> entry : conf) {
+      confMap.put(entry.getKey(), entry.getValue());
+    }
+    return confMap;
+  }
+
+  private Map<Integer, List<ShuffleBlockInfo>> createTestData(
+      Roaring64NavigableMap[] bitmaps,
+      Map<Long, byte[]> expectedData) {
+    for (int i = 0; i < 4; i++) {
+      bitmaps[i] = Roaring64NavigableMap.bitmapOf();
+    }
+    List<ShuffleBlockInfo> blocks1 = ShuffleReadWriteBase.createShuffleBlockList(
+        0, 0, 0, 3, 25, bitmaps[0], expectedData, mockSSI);
+    List<ShuffleBlockInfo> blocks2 = ShuffleReadWriteBase.createShuffleBlockList(
+        0, 1, 1, 5, 25, bitmaps[1], expectedData, mockSSI);
+    List<ShuffleBlockInfo> blocks3 = ShuffleReadWriteBase.createShuffleBlockList(
+        0, 2, 2, 4, 25, bitmaps[2], expectedData, mockSSI);
+    List<ShuffleBlockInfo> blocks4 = ShuffleReadWriteBase.createShuffleBlockList(
+        0, 3, 3, 1, 25, bitmaps[3], expectedData, mockSSI);
+    Map<Integer, List<ShuffleBlockInfo>> partitionToBlocks = Maps.newHashMap();
+    partitionToBlocks.put(0, blocks1);
+    partitionToBlocks.put(1, blocks2);
+    partitionToBlocks.put(2, blocks3);
+    partitionToBlocks.put(3, blocks4);
+    return partitionToBlocks;
+  }
+
+  @Test
+  public void hdfsWriteReadTest() throws Exception {
+    String alexDir = kerberizedHdfs.getSchemeAndAuthorityPrefix() + "/alex/";
+
+    String user = "alex";
+    String appId = "app_hdfs_read_write";
+    String dataBasePath = alexDir + "rss/test";
+
+    RemoteStorageInfo remoteStorageInfo = new RemoteStorageInfo(
+        dataBasePath,
+        conf2Map(kerberizedHdfs.getConf())
+    );
+
+    RssRegisterShuffleRequest rrsr = new RssRegisterShuffleRequest(
+        appId,
+        0,
+        Lists.newArrayList(new PartitionRange(0, 1)),
+        remoteStorageInfo,
+        user
+    );
+    shuffleServerClient.registerShuffle(rrsr);
+
+    rrsr = new RssRegisterShuffleRequest(
+        appId,
+        0,
+        Lists.newArrayList(new PartitionRange(2, 3)),
+        remoteStorageInfo,
+        user
+    );
+    shuffleServerClient.registerShuffle(rrsr);
+
+    Roaring64NavigableMap[] bitmaps = new Roaring64NavigableMap[4];
+    Map<Long, byte[]> expectedData = Maps.newHashMap();
+    Map<Integer, List<ShuffleBlockInfo>> dataBlocks = createTestData(bitmaps, expectedData);
+    Map<Integer, List<ShuffleBlockInfo>> partitionToBlocks = Maps.newHashMap();
+    partitionToBlocks.put(0, dataBlocks.get(0));
+    partitionToBlocks.put(1, dataBlocks.get(1));
+
+    Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleToBlocks = Maps.newHashMap();
+    shuffleToBlocks.put(0, partitionToBlocks);
+
+    RssSendShuffleDataRequest rssdr = new RssSendShuffleDataRequest(appId, 3, 1000, shuffleToBlocks);
+    shuffleServerClient.sendShuffleData(rssdr);
+    assertEquals(456, shuffleServer.getShuffleBufferManager().getUsedMemory());
+    assertEquals(0, shuffleServer.getShuffleBufferManager().getPreAllocatedSize());
+    RssSendCommitRequest rscr = new RssSendCommitRequest(appId, 0);
+    shuffleServerClient.sendCommit(rscr);
+    RssFinishShuffleRequest rfsr = new RssFinishShuffleRequest(appId, 0);
+
+    ShuffleReadClientImpl readClient = new ShuffleReadClientImpl(
+        StorageType.HDFS.name(),
+        appId,
+        0,
+        0,
+        100,
+        2,
+        10,
+        1000,
+        dataBasePath,
+        bitmaps[0],
+        Roaring64NavigableMap.bitmapOf(0),
+        Lists.newArrayList(),
+        new Configuration(),
+        new DefaultIdHelper()
+    );
+    assertNull(readClient.readShuffleBlockData());
+    shuffleServerClient.finishShuffle(rfsr);
+
+    partitionToBlocks.clear();
+    partitionToBlocks.put(2, dataBlocks.get(2));
+    shuffleToBlocks.clear();
+    shuffleToBlocks.put(0, partitionToBlocks);
+    rssdr = new RssSendShuffleDataRequest(appId, 3, 1000, shuffleToBlocks);
+    shuffleServerClient.sendShuffleData(rssdr);
+    assertEquals(0, shuffleServer.getShuffleBufferManager().getPreAllocatedSize());
+    rscr = new RssSendCommitRequest(appId, 0);
+    shuffleServerClient.sendCommit(rscr);
+    rfsr = new RssFinishShuffleRequest(appId, 0);
+    shuffleServerClient.finishShuffle(rfsr);
+
+    partitionToBlocks.clear();
+    partitionToBlocks.put(3, dataBlocks.get(3));
+    shuffleToBlocks.clear();
+    shuffleToBlocks.put(0, partitionToBlocks);
+    rssdr = new RssSendShuffleDataRequest(appId, 3, 1000, shuffleToBlocks);
+    shuffleServerClient.sendShuffleData(rssdr);
+    rscr = new RssSendCommitRequest(appId, 0);
+    shuffleServerClient.sendCommit(rscr);
+    rfsr = new RssFinishShuffleRequest(appId, 0);
+    shuffleServerClient.finishShuffle(rfsr);
+
+    readClient = new ShuffleReadClientImpl(
+        StorageType.HDFS.name(),
+        appId,
+        0,
+        0,
+        100,
+        2,
+        10,
+        1000,
+        dataBasePath,
+        bitmaps[0],
+        Roaring64NavigableMap.bitmapOf(0),
+        Lists.newArrayList(),
+        new Configuration(),
+        new DefaultIdHelper()
+    );
+    validateResult(readClient, expectedData, bitmaps[0]);
+
+    readClient = new ShuffleReadClientImpl(
+        StorageType.HDFS.name(),
+        appId,
+        0,
+        1,
+        100,
+        2,
+        10,
+        1000,
+        dataBasePath,
+        bitmaps[1],
+        Roaring64NavigableMap.bitmapOf(1),
+        Lists.newArrayList(),
+        new Configuration(),
+        new DefaultIdHelper()
+    );
+    validateResult(readClient, expectedData, bitmaps[1]);
+
+    readClient = new ShuffleReadClientImpl(
+        StorageType.HDFS.name(),
+        appId,
+        0,
+        2,
+        100,
+        2,
+        10,
+        1000,
+        dataBasePath,
+        bitmaps[2],
+        Roaring64NavigableMap.bitmapOf(2),
+        Lists.newArrayList(),
+        new Configuration(),
+        new DefaultIdHelper()
+    );
+    validateResult(readClient, expectedData, bitmaps[2]);
+
+    readClient = new ShuffleReadClientImpl(
+        StorageType.HDFS.name(),
+        appId,
+        0,
+        3,
+        100,
+        2,
+        10,
+        1000,
+        dataBasePath,
+        bitmaps[3],
+        Roaring64NavigableMap.bitmapOf(3),
+        Lists.newArrayList(),
+        new Configuration(),
+        new DefaultIdHelper()
+    );
+    validateResult(readClient, expectedData, bitmaps[3]);
+  }
+
+  protected void validateResult(ShuffleReadClientImpl readClient, Map<Long, byte[]> expectedData,
+      Roaring64NavigableMap blockIdBitmap) {
+    CompressedShuffleBlock csb = readClient.readShuffleBlockData();
+    Roaring64NavigableMap matched = Roaring64NavigableMap.bitmapOf();
+    while (csb != null && csb.getByteBuffer() != null) {
+      for (Map.Entry<Long, byte[]> entry : expectedData.entrySet()) {
+        if (TestUtils.compareByte(entry.getValue(), csb.getByteBuffer())) {
+          matched.addLong(entry.getKey());
+          break;
+        }
+      }
+      csb = readClient.readShuffleBlockData();
+    }
+    assertTrue(blockIdBitmap.equals(matched));
+  }
+}
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index 2852247b..bd27b663 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -124,11 +124,13 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
       String appId,
       int shuffleId,
       List<PartitionRange> partitionRanges,
-      RemoteStorageInfo remoteStorageInfo) {
+      RemoteStorageInfo remoteStorageInfo,
+      String user) {
     ShuffleRegisterRequest.Builder reqBuilder = ShuffleRegisterRequest.newBuilder();
     reqBuilder
         .setAppId(appId)
         .setShuffleId(shuffleId)
+        .setUser(user)
         .addAllPartitionRanges(toShufflePartitionRanges(partitionRanges));
     RemoteStorage.Builder rsBuilder = RemoteStorage.newBuilder();
     rsBuilder.setPath(remoteStorageInfo.getPath());
@@ -207,7 +209,9 @@ public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServer
         request.getAppId(),
         request.getShuffleId(),
         request.getPartitionRanges(),
-        request.getRemoteStorageInfo());
+        request.getRemoteStorageInfo(),
+        request.getUser()
+    );
 
     RssRegisterShuffleResponse response;
     StatusCode statusCode = rpcResponse.getStatus();
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/request/RssRegisterShuffleRequest.java b/internal-client/src/main/java/org/apache/uniffle/client/request/RssRegisterShuffleRequest.java
index 6ff7f098..249d7136 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/request/RssRegisterShuffleRequest.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/request/RssRegisterShuffleRequest.java
@@ -19,6 +19,8 @@ package org.apache.uniffle.client.request;
 
 import java.util.List;
 
+import org.apache.commons.lang3.StringUtils;
+
 import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.RemoteStorageInfo;
 
@@ -28,16 +30,19 @@ public class RssRegisterShuffleRequest {
   private int shuffleId;
   private List<PartitionRange> partitionRanges;
   private RemoteStorageInfo remoteStorageInfo;
+  private String user;
 
   public RssRegisterShuffleRequest(
       String appId,
       int shuffleId,
       List<PartitionRange> partitionRanges,
-      RemoteStorageInfo remoteStorageInfo) {
+      RemoteStorageInfo remoteStorageInfo,
+      String user) {
     this.appId = appId;
     this.shuffleId = shuffleId;
     this.partitionRanges = partitionRanges;
     this.remoteStorageInfo = remoteStorageInfo;
+    this.user = user;
   }
 
   public RssRegisterShuffleRequest(
@@ -45,7 +50,7 @@ public class RssRegisterShuffleRequest {
       int shuffleId,
       List<PartitionRange> partitionRanges,
       String remoteStoragePath) {
-    this(appId, shuffleId, partitionRanges, new RemoteStorageInfo(remoteStoragePath));
+    this(appId, shuffleId, partitionRanges, new RemoteStorageInfo(remoteStoragePath), StringUtils.EMPTY);
   }
 
   public String getAppId() {
@@ -63,4 +68,8 @@ public class RssRegisterShuffleRequest {
   public RemoteStorageInfo getRemoteStorageInfo() {
     return remoteStorageInfo;
   }
+
+  public String getUser() {
+    return user;
+  }
 }
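
As a rough, hypothetical illustration of the new user-aware registration path (not part of this patch), a client could pass the submitting user through the extended RssRegisterShuffleRequest. The host, port, application id, storage path, user value, and the class name RegisterShuffleSketch are placeholders; the client and request APIs are the ones changed above.

    import com.google.common.collect.Lists;

    import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
    import org.apache.uniffle.client.request.RssRegisterShuffleRequest;
    import org.apache.uniffle.common.PartitionRange;
    import org.apache.uniffle.common.RemoteStorageInfo;

    public class RegisterShuffleSketch {
      public static void main(String[] args) {
        ShuffleServerGrpcClient client = new ShuffleServerGrpcClient("localhost", 29999);  // placeholders

        // The user travels with the register request (ShuffleRegisterRequest.user in Rss.proto),
        // so the shuffle server can later flush this app's data to HDFS as that user.
        RssRegisterShuffleRequest request = new RssRegisterShuffleRequest(
            "app_register_sketch",                              // placeholder appId
            0,                                                  // shuffleId
            Lists.newArrayList(new PartitionRange(0, 1)),
            new RemoteStorageInfo("hdfs://namenode:8020/rss"),  // placeholder remote storage
            "alex"                                              // the submitting user
        );
        client.registerShuffle(request);
        client.close();
      }
    }
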
diff --git a/pom.xml b/pom.xml
index 05eb7d1d..415853b5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -336,6 +336,12 @@
           </exclusion>
         </exclusions>
       </dependency>
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-minikdc</artifactId>
+        <version>${hadoop.version}</version>
+        <scope>test</scope>
+      </dependency>
       <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-client</artifactId>
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index d4d979c4..a9032ca2 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -151,6 +151,7 @@ message ShuffleRegisterRequest {
   int32 shuffleId = 2;
   repeated ShufflePartitionRange partitionRanges = 3;
   RemoteStorage remoteStorage = 4;
+  string user = 5;
 }
 
 message ShuffleRegisterResponse {
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index 996018af..aa67d036 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -29,6 +29,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
 import com.google.common.collect.RangeMap;
 import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
@@ -158,6 +159,10 @@ public class ShuffleFlushManager {
           writeSuccess = true;
           LOG.warn("AppId {} was removed already, event {} should be dropped", event.getAppId(), event);
         } else {
+          String user = StringUtils.defaultString(
+              shuffleServer.getShuffleTaskManager().getUserByAppId(event.getAppId()),
+              StringUtils.EMPTY
+          );
           ShuffleWriteHandler handler = storage.getOrCreateWriteHandler(new CreateShuffleWriteHandlerRequest(
               storageType,
               event.getAppId(),
@@ -167,8 +172,9 @@ public class ShuffleFlushManager {
               storageBasePaths.toArray(new String[storageBasePaths.size()]),
               shuffleServerId,
               hadoopConf,
-              storageDataReplica));
-
+              storageDataReplica,
+              user)
+          );
           do {
             if (event.getRetryTimes() > retryMax) {
               LOG.error("Failed to write data for " + event + " in " + retryMax + " times, shuffle data will be lost");
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index 4194bb0a..262673d0 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -35,6 +35,8 @@ import org.apache.uniffle.common.Arguments;
 import org.apache.uniffle.common.metrics.GRPCMetrics;
 import org.apache.uniffle.common.metrics.JvmMetrics;
 import org.apache.uniffle.common.rpc.ServerInterface;
+import org.apache.uniffle.common.security.SecurityConfig;
+import org.apache.uniffle.common.security.SecurityContextFactory;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.common.web.CommonMetricsServlet;
@@ -43,6 +45,11 @@ import org.apache.uniffle.server.buffer.ShuffleBufferManager;
 import org.apache.uniffle.server.storage.StorageManager;
 import org.apache.uniffle.server.storage.StorageManagerFactory;
 
+import static org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KERBEROS_ENABLE;
+import static org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KERBEROS_KEYTAB_FILE;
+import static org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KERBEROS_PRINCIPAL;
+import static org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KERBEROS_RELOGIN_INTERVAL_SEC;
+
 /**
  * Server that manages startup/shutdown of a {@code Greeter} server.
  */
@@ -129,6 +136,7 @@ public class ShuffleServer {
       healthCheck.stop();
       LOG.info("HealthCheck stopped!");
     }
+    SecurityContextFactory.get().getSecurityContext().close();
     server.stop();
     LOG.info("RPC Server Stopped!");
   }
@@ -144,10 +152,19 @@ public class ShuffleServer {
     jettyServer = new JettyServer(shuffleServerConf);
     registerMetrics();
 
+    SecurityConfig securityConfig = null;
+    if (shuffleServerConf.getBoolean(RSS_SECURITY_HADOOP_KERBEROS_ENABLE)) {
+      securityConfig = SecurityConfig.newBuilder()
+          .keytabFilePath(shuffleServerConf.getString(RSS_SECURITY_HADOOP_KERBEROS_KEYTAB_FILE))
+          .principal(shuffleServerConf.getString(RSS_SECURITY_HADOOP_KERBEROS_PRINCIPAL))
+          .reloginIntervalSec(shuffleServerConf.getLong(RSS_SECURITY_HADOOP_KERBEROS_RELOGIN_INTERVAL_SEC))
+          .build();
+    }
+    SecurityContextFactory.get().init(securityConfig);
+
     storageManager = StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
     storageManager.start();
 
-
     boolean healthCheckEnable = shuffleServerConf.getBoolean(ShuffleServerConf.HEALTH_CHECK_ENABLE);
     if (healthCheckEnable) {
       List<Checker> buildInCheckers = Lists.newArrayList();
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index 8396cee4..fb17c36c 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -109,6 +109,8 @@ public class ShuffleServerGrpcService extends ShuffleServerImplBase {
     String appId = req.getAppId();
     int shuffleId = req.getShuffleId();
     String remoteStoragePath = req.getRemoteStorage().getPath();
+    String user = req.getUser();
+
     Map<String, String> remoteStorageConf = req
         .getRemoteStorage()
         .getRemoteStorageConfList()
@@ -118,12 +120,17 @@ public class ShuffleServerGrpcService extends ShuffleServerImplBase {
     List<PartitionRange> partitionRanges = toPartitionRanges(req.getPartitionRangesList());
     LOG.info("Get register request for appId[" + appId + "], shuffleId[" + shuffleId
         + "], remoteStorage[" + remoteStoragePath + "] with "
-        + partitionRanges.size() + " partition ranges");
+        + partitionRanges.size() + " partition ranges. User: " + user);
 
     StatusCode result = shuffleServer
         .getShuffleTaskManager()
         .registerShuffle(
-            appId, shuffleId, partitionRanges, new RemoteStorageInfo(remoteStoragePath, remoteStorageConf));
+            appId,
+            shuffleId,
+            partitionRanges,
+            new RemoteStorageInfo(remoteStoragePath, remoteStorageConf),
+            user
+        );
 
     reply = ShuffleRegisterResponse
         .newBuilder()
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 655bf5be..5fc6ca84 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -78,6 +78,8 @@ public class ShuffleTaskManager {
   private Map<Long, PreAllocatedBufferInfo> requireBufferIds = Maps.newConcurrentMap();
   private Runnable clearResourceThread;
   private BlockingQueue<String> expiredAppIdQueue = Queues.newLinkedBlockingQueue();
+  // appId -> user
+  private Map<String, String> appUserMap = Maps.newConcurrentMap();
   // appId -> shuffleId -> serverReadHandler
 
   public ShuffleTaskManager(
@@ -125,8 +127,10 @@ public class ShuffleTaskManager {
       String appId,
       int shuffleId,
       List<PartitionRange> partitionRanges,
-      RemoteStorageInfo remoteStorageInfo) {
+      RemoteStorageInfo remoteStorageInfo,
+      String user) {
     refreshAppId(appId);
+    appUserMap.putIfAbsent(appId, user);
     partitionsToBlockIds.putIfAbsent(appId, Maps.newConcurrentMap());
     for (PartitionRange partitionRange : partitionRanges) {
       shuffleBufferManager.registerBuffer(appId, shuffleId, partitionRange.getStart(), partitionRange.getEnd());
@@ -379,6 +383,7 @@ public class ShuffleTaskManager {
     partitionsToBlockIds.remove(appId);
     shuffleBufferManager.removeBuffer(appId);
     shuffleFlushManager.removeResources(appId);
+    appUserMap.remove(appId);
     if (!shuffleToCachedBlockIds.isEmpty()) {
       storageManager.removeResources(appId, shuffleToCachedBlockIds.keySet());
     }
@@ -418,6 +423,10 @@ public class ShuffleTaskManager {
     return pabi.getRequireSize();
   }
 
+  public String getUserByAppId(String appId) {
+    return appUserMap.get(appId);
+  }
+
   @VisibleForTesting
   public Set<String> getAppIds() {
     return shuffleTaskInfos.keySet();
diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index bc3ec399..6debf6e7 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -88,7 +88,7 @@ public class ShuffleBufferManager {
       bufferRangeMap.put(Range.closed(startPartition, endPartition), new ShuffleBuffer(bufferSize));
     } else {
       LOG.warn("Already register for appId[" + appId + "], shuffleId[" + shuffleId + "], startPartition["
-          + startPartition + "], endPartition[" + endPartition + "]");
+              + startPartition + "], endPartition[" + endPartition + "]");
     }
 
     return StatusCode.SUCCESS;
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index 70022745..67836def 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -32,13 +32,13 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import io.prometheus.client.Gauge;
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -50,6 +50,7 @@ import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.ShufflePartitionedBlock;
 import org.apache.uniffle.common.config.RssBaseConf;
 import org.apache.uniffle.common.util.ChecksumUtils;
+import org.apache.uniffle.server.buffer.ShuffleBufferManager;
 import org.apache.uniffle.server.storage.HdfsStorageManager;
 import org.apache.uniffle.server.storage.StorageManager;
 import org.apache.uniffle.server.storage.StorageManagerFactory;
@@ -62,6 +63,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class ShuffleFlushManagerTest extends HdfsTestBase {
 
@@ -71,6 +74,17 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
   private ShuffleServerConf shuffleServerConf = new ShuffleServerConf();
   private RemoteStorageInfo remoteStorage = new RemoteStorageInfo(HDFS_URI + "rss/test", Maps.newHashMap());
 
+  private static ShuffleServer mockShuffleServer = mock(ShuffleServer.class);
+
+  @BeforeAll
+  public static void beforeAll() throws Exception {
+    ShuffleTaskManager shuffleTaskManager = mock(ShuffleTaskManager.class);
+    ShuffleBufferManager shuffleBufferManager = mock(ShuffleBufferManager.class);
+
+    when(mockShuffleServer.getShuffleTaskManager()).thenReturn(shuffleTaskManager);
+    when(mockShuffleServer.getShuffleBufferManager()).thenReturn(shuffleBufferManager);
+  }
+
   @BeforeEach
   public void prepare() {
     ShuffleServerMetrics.register();
@@ -91,7 +105,7 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
     StorageManager storageManager =
         StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
     ShuffleFlushManager manager =
-        new ShuffleFlushManager(shuffleServerConf, "shuffleServerId", null, storageManager);
+        new ShuffleFlushManager(shuffleServerConf, "shuffleServerId", mockShuffleServer, storageManager);
     assertEquals("2", manager.getHadoopConf().get("dfs.replication"));
     assertEquals("value", manager.getHadoopConf().get("a.b"));
   }
@@ -102,14 +116,13 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
     StorageManager storageManager =
         StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
     storageManager.registerRemoteStorage(appId, remoteStorage);
-    storageManager.registerRemoteStorage(appId, remoteStorage);
     String storageHost = "localhost";
     assertEquals(0.0, ShuffleServerMetrics.counterRemoteStorageTotalWrite.get(storageHost).get(), 0.5);
     assertEquals(0.0, ShuffleServerMetrics.counterRemoteStorageRetryWrite.get(storageHost).get(), 0.5);
     assertEquals(0.0, ShuffleServerMetrics.counterRemoteStorageFailedWrite.get(storageHost).get(), 0.5);
     assertEquals(0.0, ShuffleServerMetrics.counterRemoteStorageSuccessWrite.get(storageHost).get(), 0.5);
     ShuffleFlushManager manager =
-        new ShuffleFlushManager(shuffleServerConf, "shuffleServerId", null, storageManager);
+        new ShuffleFlushManager(shuffleServerConf, "shuffleServerId", mockShuffleServer, storageManager);
     ShuffleDataFlushEvent event1 =
         createShuffleDataFlushEvent(appId, 1, 1, 1, null);
     List<ShufflePartitionedBlock> blocks1 = event1.getShuffleBlocks();
@@ -154,7 +167,7 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
     List<ShufflePartitionedBlock> expectedBlocks = Lists.newArrayList();
     List<ShuffleDataFlushEvent> flushEvents1 = Lists.newArrayList();
     List<ShuffleDataFlushEvent> flushEvents2 = Lists.newArrayList();
-    ShuffleFlushManager manager = new ShuffleFlushManager(shuffleServerConf, "shuffleServerId", null, storageManager);
+    ShuffleFlushManager manager = new ShuffleFlushManager(shuffleServerConf, "shuffleServerId", mockShuffleServer, storageManager);
     for (int i = 0; i < 30; i++) {
       ShuffleDataFlushEvent flushEvent1 = createShuffleDataFlushEvent(appId, 1, 1, 1, null);
       ShuffleDataFlushEvent flushEvent2 = createShuffleDataFlushEvent(appId, 1, 1, 1, null);
@@ -192,7 +205,7 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
     storageManager.registerRemoteStorage(appId1, remoteStorage);
     storageManager.registerRemoteStorage(appId2, remoteStorage);
     ShuffleFlushManager manager =
-        new ShuffleFlushManager(shuffleServerConf, "shuffleServerId", null, storageManager);
+        new ShuffleFlushManager(shuffleServerConf, "shuffleServerId", mockShuffleServer, storageManager);
     ShuffleDataFlushEvent event1 =
         createShuffleDataFlushEvent(appId1, 1, 0, 1, null);
     manager.addToFlushQueue(event1);
@@ -245,7 +258,7 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
     StorageManager storageManager =
         StorageManagerFactory.getInstance().createStorageManager(serverConf);
     ShuffleFlushManager manager =
-        new ShuffleFlushManager(serverConf, "shuffleServerId", null, storageManager);
+        new ShuffleFlushManager(serverConf, "shuffleServerId", mockShuffleServer, storageManager);
     ShuffleDataFlushEvent event1 =
         createShuffleDataFlushEvent(appId1, 1, 0, 1, null);
     manager.addToFlushQueue(event1);
@@ -390,7 +403,7 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
       StorageManager storageManager =
           StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
       ShuffleFlushManager manager =
-          new ShuffleFlushManager(shuffleServerConf, "shuffleServerId", null, storageManager);
+          new ShuffleFlushManager(shuffleServerConf, "shuffleServerId", mockShuffleServer, storageManager);
       ShuffleDataFlushEvent event = new ShuffleDataFlushEvent(1, "1", 1, 1,1, 100, null, null, null);
       assertEquals(0, manager.getPendingEventsSize());
       manager.addPendingEvents(event);
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
index a8d57bf8..56cab2ca 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
@@ -28,6 +28,7 @@ import java.util.concurrent.Future;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
@@ -61,7 +62,11 @@ public class ShuffleServerMetricsTest {
     ssc.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "fake.coordinator:123");
     ssc.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 1000L);
     shuffleServer = new ShuffleServer(ssc);
-    shuffleServer.getStorageManager().registerRemoteStorage("metricsTest", new RemoteStorageInfo(REMOTE_STORAGE_PATH));
+    shuffleServer.getStorageManager()
+        .registerRemoteStorage(
+            "metricsTest",
+            new RemoteStorageInfo(REMOTE_STORAGE_PATH)
+        );
     shuffleServer.start();
   }
 
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index 13b2e509..690ad1d8 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import com.google.common.collect.Lists;
 import com.google.common.collect.RangeMap;
 import com.google.common.collect.Sets;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Test;
@@ -80,9 +81,19 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
     int shuffleId = 1;
 
     shuffleTaskManager.registerShuffle(
-        appId, shuffleId, Lists.newArrayList(new PartitionRange(0, 1)), RemoteStorageInfo.EMPTY_REMOTE_STORAGE);
+        appId,
+        shuffleId,
+        Lists.newArrayList(new PartitionRange(0, 1)),
+        RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
+        StringUtils.EMPTY
+    );
     shuffleTaskManager.registerShuffle(
-        appId, shuffleId, Lists.newArrayList(new PartitionRange(2, 3)), RemoteStorageInfo.EMPTY_REMOTE_STORAGE);
+        appId,
+        shuffleId,
+        Lists.newArrayList(new PartitionRange(2, 3)),
+        RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
+        StringUtils.EMPTY
+    );
 
     Map<String, Map<Integer, RangeMap<Integer, ShuffleBuffer>>> bufferPool =
         shuffleServer.getShuffleBufferManager().getBufferPool();
@@ -95,7 +106,12 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
 
     // register again
     shuffleTaskManager.registerShuffle(
-        appId, shuffleId, Lists.newArrayList(new PartitionRange(0, 1)), RemoteStorageInfo.EMPTY_REMOTE_STORAGE);
+        appId,
+        shuffleId,
+        Lists.newArrayList(new PartitionRange(0, 1)),
+        RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
+        StringUtils.EMPTY
+    );
     assertEquals(buffer, bufferPool.get(appId).get(shuffleId).get(0));
   }
 
@@ -120,9 +136,19 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
     ShuffleTaskManager shuffleTaskManager = new ShuffleTaskManager(
         conf, shuffleFlushManager, shuffleBufferManager, storageManager);
     shuffleTaskManager.registerShuffle(
-        appId, shuffleId, Lists.newArrayList(new PartitionRange(1, 1)), new RemoteStorageInfo(remoteStorage));
+        appId,
+        shuffleId,
+        Lists.newArrayList(new PartitionRange(1, 1)),
+        new RemoteStorageInfo(remoteStorage),
+        StringUtils.EMPTY
+    );
     shuffleTaskManager.registerShuffle(
-        appId, shuffleId, Lists.newArrayList(new PartitionRange(2, 2)), new RemoteStorageInfo(remoteStorage));
+        appId,
+        shuffleId,
+        Lists.newArrayList(new PartitionRange(2, 2)),
+        new RemoteStorageInfo(remoteStorage),
+        StringUtils.EMPTY
+    );
     List<ShufflePartitionedBlock> expectedBlocks1 = Lists.newArrayList();
     List<ShufflePartitionedBlock> expectedBlocks2 = Lists.newArrayList();
     Map<Long, PreAllocatedBufferInfo> bufferIds = shuffleTaskManager.getRequireBufferIds();
@@ -247,11 +273,19 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
     StorageManager storageManager = shuffleServer.getStorageManager();
     ShuffleTaskManager shuffleTaskManager = new ShuffleTaskManager(conf, shuffleFlushManager, shuffleBufferManager, storageManager);
     shuffleTaskManager.registerShuffle(
-        "clearTest1", shuffleId,
-        Lists.newArrayList(new PartitionRange(0, 1)), RemoteStorageInfo.EMPTY_REMOTE_STORAGE);
+        "clearTest1",
+        shuffleId,
+        Lists.newArrayList(new PartitionRange(0, 1)),
+        RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
+        StringUtils.EMPTY
+    );
     shuffleTaskManager.registerShuffle(
-        "clearTest2", shuffleId,
-        Lists.newArrayList(new PartitionRange(0, 1)), RemoteStorageInfo.EMPTY_REMOTE_STORAGE);
+        "clearTest2",
+        shuffleId,
+        Lists.newArrayList(new PartitionRange(0, 1)),
+        RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
+        StringUtils.EMPTY
+    );
     shuffleTaskManager.refreshAppId("clearTest1");
     shuffleTaskManager.refreshAppId("clearTest2");
     assertEquals(2, shuffleTaskManager.getAppIds().size());
@@ -273,8 +307,12 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
 
     // register again
     shuffleTaskManager.registerShuffle(
-        "clearTest2", shuffleId,
-        Lists.newArrayList(new PartitionRange(0, 1)), RemoteStorageInfo.EMPTY_REMOTE_STORAGE);
+        "clearTest2",
+        shuffleId,
+        Lists.newArrayList(new PartitionRange(0, 1)),
+        RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
+        StringUtils.EMPTY
+    );
     shuffleTaskManager.refreshAppId("clearTest2");
     shuffleTaskManager.checkResourceStatus();
     assertEquals(Sets.newHashSet("clearTest1", "clearTest2"), shuffleTaskManager.getAppIds());
diff --git a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
index a7a00789..0d43de64 100644
--- a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
@@ -34,6 +34,7 @@ import org.apache.uniffle.server.ShuffleFlushManager;
 import org.apache.uniffle.server.ShuffleServer;
 import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.server.ShuffleServerMetrics;
+import org.apache.uniffle.server.ShuffleTaskManager;
 import org.apache.uniffle.server.StatusCode;
 import org.apache.uniffle.server.storage.StorageManager;
 import org.apache.uniffle.server.storage.StorageManagerFactory;
@@ -341,6 +342,9 @@ public class ShuffleBufferManagerTest extends BufferTestBase {
     when(mockShuffleServer
         .getShuffleBufferManager())
         .thenReturn(shuffleBufferManager);
+    when(mockShuffleServer
+        .getShuffleTaskManager())
+        .thenReturn(mock(ShuffleTaskManager.class));
 
     String appId = "bufferSizeTest";
     int shuffleId = 1;
diff --git a/server/src/test/java/org/apache/uniffle/server/storage/HdfsStorageManagerTest.java b/server/src/test/java/org/apache/uniffle/server/storage/HdfsStorageManagerTest.java
index 1ad7967f..b1b0cd98 100644
--- a/server/src/test/java/org/apache/uniffle/server/storage/HdfsStorageManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/storage/HdfsStorageManagerTest.java
@@ -62,13 +62,16 @@ public class HdfsStorageManagerTest {
     final String remoteStoragePath3 = "hdfs://path3";
     hdfsStorageManager.registerRemoteStorage(
         "app1",
-        new RemoteStorageInfo(remoteStoragePath1, ImmutableMap.of("k1", "v1", "k2", "v2")));
+        new RemoteStorageInfo(remoteStoragePath1, ImmutableMap.of("k1", "v1", "k2", "v2"))
+    );
     hdfsStorageManager.registerRemoteStorage(
         "app2",
-        new RemoteStorageInfo(remoteStoragePath2, ImmutableMap.of("k3", "v3")));
+        new RemoteStorageInfo(remoteStoragePath2, ImmutableMap.of("k3", "v3"))
+    );
     hdfsStorageManager.registerRemoteStorage(
         "app3",
-        new RemoteStorageInfo(remoteStoragePath3, Maps.newHashMap()));
+        new RemoteStorageInfo(remoteStoragePath3, Maps.newHashMap())
+    );
     Map<String, HdfsStorage> appStorageMap =  hdfsStorageManager.getAppIdToStorages();
     assertEquals(3, appStorageMap.size());
     assertEquals(Sets.newHashSet("app1", "app2", "app3"), appStorageMap.keySet());
diff --git a/server/src/test/java/org/apache/uniffle/server/storage/MultiStorageManagerTest.java b/server/src/test/java/org/apache/uniffle/server/storage/MultiStorageManagerTest.java
index 3c651594..04ef0729 100644
--- a/server/src/test/java/org/apache/uniffle/server/storage/MultiStorageManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/storage/MultiStorageManagerTest.java
@@ -21,6 +21,7 @@ import java.util.Arrays;
 import java.util.List;
 
 import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
 import org.junit.jupiter.api.Test;
 
 import org.apache.uniffle.common.RemoteStorageInfo;
diff --git a/storage/pom.xml b/storage/pom.xml
index fc741e3d..1e129a04 100644
--- a/storage/pom.xml
+++ b/storage/pom.xml
@@ -52,6 +52,24 @@
       <artifactId>mockito-inline</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.uniffle</groupId>
+      <artifactId>rss-common</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minikdc</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>junit</groupId>
+          <artifactId>junit</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
   </dependencies>
 
   <build>
diff --git a/storage/src/main/java/org/apache/uniffle/storage/common/HdfsStorage.java b/storage/src/main/java/org/apache/uniffle/storage/common/HdfsStorage.java
index eb0a24ee..55d0898b 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/common/HdfsStorage.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/common/HdfsStorage.java
@@ -98,6 +98,7 @@ public class HdfsStorage extends AbstractStorage {
   @Override
   ShuffleWriteHandler newWriteHandler(CreateShuffleWriteHandlerRequest request) {
     try {
+      String user = request.getUser();
       return new HdfsShuffleWriteHandler(
           request.getAppId(),
           request.getShuffleId(),
@@ -105,7 +106,8 @@ public class HdfsStorage extends AbstractStorage {
           request.getEndPartition(),
           storagePath,
           request.getFileNamePrefix(),
-          conf
+          conf,
+          user
       );
     } catch (Exception e) {
       throw new RuntimeException(e);
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java
index f0e74911..872672cc 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java
@@ -17,7 +17,6 @@
 
 package org.apache.uniffle.storage.handler.impl;
 
-import java.io.IOException;
 import java.util.Comparator;
 import java.util.List;
 
@@ -32,6 +31,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.BufferSegment;
 import org.apache.uniffle.common.ShuffleDataResult;
+import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.storage.util.ShuffleStorageUtils;
 
@@ -82,8 +82,8 @@ public class HdfsClientReadHandler extends AbstractClientReadHandler {
     FileSystem fs;
     Path baseFolder = new Path(fullShufflePath);
     try {
-      fs = ShuffleStorageUtils.getFileSystemForPath(baseFolder, hadoopConf);
-    } catch (IOException ioe) {
+      fs = HadoopFilesystemProvider.getFilesystem(baseFolder, hadoopConf);
+    } catch (Exception ioe) {
       throw new RuntimeException("Can't get FileSystem for " + baseFolder);
     }
 
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileReader.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileReader.java
index feff6dd7..a6414114 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileReader.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileReader.java
@@ -28,8 +28,8 @@ import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
 import org.apache.uniffle.storage.api.FileReader;
-import org.apache.uniffle.storage.util.ShuffleStorageUtils;
 
 public class HdfsFileReader implements FileReader, Closeable {
 
@@ -38,14 +38,14 @@ public class HdfsFileReader implements FileReader, Closeable {
   private Configuration hadoopConf;
   private FSDataInputStream fsDataInputStream;
 
-  public HdfsFileReader(Path path, Configuration hadoopConf) throws IOException, IllegalStateException {
+  public HdfsFileReader(Path path, Configuration hadoopConf) throws Exception {
     this.path = path;
     this.hadoopConf = hadoopConf;
     createStream();
   }
 
-  private void createStream() throws IOException, IllegalStateException {
-    FileSystem fileSystem = ShuffleStorageUtils.getFileSystemForPath(path, hadoopConf);
+  private void createStream() throws Exception {
+    FileSystem fileSystem = HadoopFilesystemProvider.getFilesystem(path, hadoopConf);
 
     if (!fileSystem.isFile(path)) {
       String msg = path + " don't exist or is not a file.";
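
Both HdfsClientReadHandler and HdfsFileReader above now obtain their FileSystem through HadoopFilesystemProvider instead of the old ShuffleStorageUtils helper. The provider itself is defined elsewhere in this patch; the two call shapes visible here, getFilesystem(path, conf) and getFilesystem(user, path, conf), suggest a thin wrapper that runs the filesystem access under the given identity when a user is supplied. A minimal sketch of that idea using Hadoop's proxy-user mechanism (illustrative only; the real class in org.apache.uniffle.common.filesystem may be implemented differently):

    import java.security.PrivilegedExceptionAction;

    import org.apache.commons.lang3.StringUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.security.UserGroupInformation;

    // Illustrative sketch only; not the actual HadoopFilesystemProvider implementation.
    class FilesystemProviderSketch {

      // Open the filesystem with the process's current (login) identity.
      static FileSystem getFilesystem(Path path, Configuration conf) throws Exception {
        return path.getFileSystem(conf);
      }

      // Open the filesystem while acting as the given user (proxy-user pattern),
      // which is what lets the shuffle server write files owned by the submitting user.
      static FileSystem getFilesystem(String user, Path path, Configuration conf) throws Exception {
        if (StringUtils.isEmpty(user)) {
          return getFilesystem(path, conf);
        }
        UserGroupInformation proxyUser =
            UserGroupInformation.createProxyUser(user, UserGroupInformation.getCurrentUser());
        return proxyUser.doAs(
            (PrivilegedExceptionAction<FileSystem>) () -> path.getFileSystem(conf));
      }
    }
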
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileWriter.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileWriter.java
index 955bd5f8..92c04208 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileWriter.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileWriter.java
@@ -39,35 +39,37 @@ public class HdfsFileWriter implements Closeable {
 
   private static final Logger LOG = LoggerFactory.getLogger(HdfsFileWriter.class);
 
+  private final FileSystem fileSystem;
+
   private Path path;
   private Configuration hadoopConf;
   private FSDataOutputStream fsDataOutputStream;
   private long nextOffset;
 
-  public HdfsFileWriter(Path path, Configuration hadoopConf) throws IOException, IllegalStateException {
-    // init fsDataOutputStream
+  public HdfsFileWriter(FileSystem fileSystem, Path path, Configuration hadoopConf) throws IOException {
     this.path = path;
     this.hadoopConf = hadoopConf;
+    this.fileSystem = fileSystem;
     initStream();
   }
 
   private void initStream() throws IOException, IllegalStateException {
-    FileSystem fileSystem = ShuffleStorageUtils.getFileSystemForPath(path, hadoopConf);
-    if (fileSystem.isFile(path)) {
+    final FileSystem writerFs = fileSystem;
+    if (writerFs.isFile(path)) {
       if (hadoopConf.getBoolean("dfs.support.append", true)) {
-        fsDataOutputStream = fileSystem.append(path);
+        fsDataOutputStream = writerFs.append(path);
         nextOffset = fsDataOutputStream.getPos();
       } else {
         String msg = path + " exists but append mode is not support!";
         LOG.error(msg);
         throw new IllegalStateException(msg);
       }
-    } else if (fileSystem.isDirectory(path)) {
+    } else if (writerFs.isDirectory(path)) {
       String msg = path + " is a directory!";
       LOG.error(msg);
       throw new IllegalStateException(msg);
     } else {
-      fsDataOutputStream = fileSystem.create(path);
+      fsDataOutputStream = writerFs.create(path);
       nextOffset = fsDataOutputStream.getPos();
     }
   }
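
HdfsFileWriter no longer looks up a FileSystem on its own; the caller now injects one, so a filesystem that was opened for a specific user can be reused for every file the writer creates. A small usage sketch (path, user, and payload are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
    import org.apache.uniffle.storage.handler.impl.HdfsFileWriter;

    class FileWriterUsageSketch {
      // The writer no longer resolves its own FileSystem; the caller passes one in,
      // so a filesystem obtained for a specific user is reused for every file it writes.
      static void writeSomething(String user, String file, Configuration conf) throws Exception {
        Path path = new Path(file);
        FileSystem fs = HadoopFilesystemProvider.getFilesystem(user, path, conf);
        try (HdfsFileWriter writer = new HdfsFileWriter(fs, path, conf)) {
          writer.writeData(new byte[]{1, 2, 3});
        }
      }
    }
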
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleDeleteHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleDeleteHandler.java
index 17480256..22f4f0ca 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleDeleteHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleDeleteHandler.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
 import org.apache.uniffle.storage.handler.api.ShuffleDeleteHandler;
 import org.apache.uniffle.storage.util.ShuffleStorageUtils;
 
@@ -46,7 +47,7 @@ public class HdfsShuffleDeleteHandler implements ShuffleDeleteHandler {
     LOG.info("Try delete shuffle data in HDFS for appId[" + appId + "] with " + path);
     while (!isSuccess && times < retryMax) {
       try {
-        FileSystem fileSystem = ShuffleStorageUtils.getFileSystemForPath(path, hadoopConf);
+        FileSystem fileSystem = HadoopFilesystemProvider.getFilesystem(path, hadoopConf);
         fileSystem.delete(path, true);
         isSuccess = true;
       } catch (Exception e) {
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
index de542416..011ac0f2 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java
@@ -50,7 +50,7 @@ public class HdfsShuffleReadHandler extends DataSkippableReadHandler {
       int readBufferSize,
       Roaring64NavigableMap expectBlockIds,
       Roaring64NavigableMap processBlockIds,
-      Configuration conf) throws IOException {
+      Configuration conf) throws Exception {
     super(appId, shuffleId, partitionId, readBufferSize, expectBlockIds, processBlockIds);
     this.filePrefix = filePrefix;
     this.indexReader = createHdfsReader(ShuffleStorageUtils.generateIndexFileName(filePrefix), conf);
@@ -123,7 +123,7 @@ public class HdfsShuffleReadHandler extends DataSkippableReadHandler {
   }
 
   protected HdfsFileReader createHdfsReader(
-      String fileName, Configuration hadoopConf) throws IOException, IllegalStateException {
+      String fileName, Configuration hadoopConf) throws Exception {
     Path path = new Path(fileName);
     return new HdfsFileReader(path, hadoopConf);
   }
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleWriteHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleWriteHandler.java
index f7964133..f6b89b59 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleWriteHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleWriteHandler.java
@@ -30,6 +30,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.ShufflePartitionedBlock;
+import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
 import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
 import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
 import org.apache.uniffle.storage.util.ShuffleStorageUtils;
@@ -43,7 +44,11 @@ public class HdfsShuffleWriteHandler implements ShuffleWriteHandler {
   private String fileNamePrefix;
   private Lock writeLock = new ReentrantLock();
   private int failTimes = 0;
+  private String user;
+  private FileSystem fileSystem;
 
+  // Only for test cases using a non-kerberized dfs cluster.
+  @VisibleForTesting
   public HdfsShuffleWriteHandler(
       String appId,
       int shuffleId,
@@ -51,7 +56,7 @@ public class HdfsShuffleWriteHandler implements ShuffleWriteHandler {
       int endPartition,
       String storageBasePath,
       String fileNamePrefix,
-      Configuration hadoopConf) throws IOException, IllegalStateException {
+      Configuration hadoopConf) throws Exception {
     this.hadoopConf = hadoopConf;
     this.fileNamePrefix = fileNamePrefix;
     this.basePath = ShuffleStorageUtils.getFullShuffleDataFolder(storageBasePath,
@@ -59,9 +64,27 @@ public class HdfsShuffleWriteHandler implements ShuffleWriteHandler {
     initialize();
   }
 
-  private void initialize() throws IOException, IllegalStateException {
+  public HdfsShuffleWriteHandler(
+      String appId,
+      int shuffleId,
+      int startPartition,
+      int endPartition,
+      String storageBasePath,
+      String fileNamePrefix,
+      Configuration hadoopConf,
+      String user) throws Exception {
+    this.hadoopConf = hadoopConf;
+    this.fileNamePrefix = fileNamePrefix;
+    this.basePath = ShuffleStorageUtils.getFullShuffleDataFolder(storageBasePath,
+            ShuffleStorageUtils.getShuffleDataPath(appId, shuffleId, startPartition, endPartition));
+    this.user = user;
+    initialize();
+  }
+
+  private void initialize() throws Exception {
     Path path = new Path(basePath);
-    FileSystem fileSystem = ShuffleStorageUtils.getFileSystemForPath(path, hadoopConf);
+    LOG.info("User: {}, Path: {}", user, path);
+    this.fileSystem = HadoopFilesystemProvider.getFilesystem(user, path, hadoopConf);
     // check if shuffle folder exist
     if (!fileSystem.exists(path)) {
       try {
@@ -131,7 +154,7 @@ public class HdfsShuffleWriteHandler implements ShuffleWriteHandler {
 
   private HdfsFileWriter createWriter(String fileName) throws IOException, IllegalStateException {
     Path path = new Path(basePath, fileName);
-    HdfsFileWriter writer = new HdfsFileWriter(path, hadoopConf);
+    HdfsFileWriter writer = new HdfsFileWriter(fileSystem, path, hadoopConf);
     return writer;
   }
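
With the new constructor, HdfsShuffleWriteHandler resolves a per-user FileSystem once in initialize() and hands it to every HdfsFileWriter it creates, so both the shuffle folder and the data files end up owned by the submitting user. A usage sketch of the new constructor (all literal values are illustrative; "alex" matches the user used in the kerberized tests later in this patch):

    import org.apache.hadoop.conf.Configuration;

    import org.apache.uniffle.storage.handler.impl.HdfsShuffleWriteHandler;

    class WriteHandlerUsageSketch {
      // Create a write handler that performs all HDFS access as the submitting user.
      // Constructing it already creates the shuffle data folder with that user's identity;
      // shuffle blocks are then flushed through its write(...) method.
      static HdfsShuffleWriteHandler newHandler(Configuration hadoopConf) throws Exception {
        return new HdfsShuffleWriteHandler(
            "appId_123",          // appId
            1,                    // shuffleId
            0,                    // startPartition
            1,                    // endPartition
            "hdfs://ns1/rss",     // storageBasePath
            "shuffleServerId",    // fileNamePrefix
            hadoopConf,
            "alex");              // the user the data should be owned by
      }
    }
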
 
diff --git a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleWriteHandlerRequest.java b/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleWriteHandlerRequest.java
index e311d974..2328f7a6 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleWriteHandlerRequest.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleWriteHandlerRequest.java
@@ -30,6 +30,7 @@ public class CreateShuffleWriteHandlerRequest {
   private String fileNamePrefix;
   private Configuration conf;
   private int storageDataReplica;
+  private String user;
 
   public CreateShuffleWriteHandlerRequest(
       String storageType,
@@ -40,7 +41,8 @@ public class CreateShuffleWriteHandlerRequest {
       String[] storageBasePaths,
       String fileNamePrefix,
       Configuration conf,
-      int storageDataReplica) {
+      int storageDataReplica,
+      String user) {
     this.storageType = storageType;
     this.appId = appId;
     this.shuffleId = shuffleId;
@@ -50,6 +52,7 @@ public class CreateShuffleWriteHandlerRequest {
     this.fileNamePrefix = fileNamePrefix;
     this.conf = conf;
     this.storageDataReplica = storageDataReplica;
+    this.user = user;
   }
 
   public String getStorageType() {
@@ -87,4 +90,12 @@ public class CreateShuffleWriteHandlerRequest {
   public int getStorageDataReplica() {
     return storageDataReplica;
   }
+
+  public String getUser() {
+    return user;
+  }
+
+  public void setUser(String user) {
+    this.user = user;
+  }
 }
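
CreateShuffleWriteHandlerRequest now carries the user, which HdfsStorage forwards into HdfsShuffleWriteHandler (see the hunk earlier in this patch). A sketch of building such a request; the surrounding wiring, where the flush path presumably resolves the user via ShuffleTaskManager#getUserByAppId, is outside this hunk, and the literal values are illustrative:

    import org.apache.hadoop.conf.Configuration;

    import org.apache.uniffle.storage.request.CreateShuffleWriteHandlerRequest;

    class WriteHandlerRequestSketch {
      // Build a write handler request that carries the submitting user, as the
      // constructor in this patch now requires.
      static CreateShuffleWriteHandlerRequest newRequest(String user, Configuration hadoopConf) {
        return new CreateShuffleWriteHandlerRequest(
            "MEMORY_LOCALFILE_HDFS",          // storageType
            "appId_123",                      // appId
            1,                                // shuffleId
            0,                                // startPartition
            1,                                // endPartition
            new String[]{"hdfs://ns1/rss"},   // storageBasePaths
            "shuffleServerId",                // fileNamePrefix
            hadoopConf,                       // conf
            1,                                // storageDataReplica
            user);                            // user resolved per application
      }
    }
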
diff --git a/storage/src/main/java/org/apache/uniffle/storage/util/ShuffleStorageUtils.java b/storage/src/main/java/org/apache/uniffle/storage/util/ShuffleStorageUtils.java
index 16774b9e..4d0cf17b 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/util/ShuffleStorageUtils.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/util/ShuffleStorageUtils.java
@@ -26,9 +26,7 @@ import java.util.List;
 
 import com.google.common.collect.Lists;
 import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.hash.MurmurHash;
 import org.slf4j.Logger;
@@ -49,22 +47,6 @@ public class ShuffleStorageUtils {
   private ShuffleStorageUtils() {
   }
 
-  public static FileSystem getFileSystemForPath(Path path, Configuration conf) throws IOException {
-    // For local file systems, return the raw local file system, such calls to flush()
-    // actually flushes the stream.
-    try {
-      FileSystem fs = path.getFileSystem(conf);
-      if (fs instanceof LocalFileSystem) {
-        LOG.debug("{} is local file system", path);
-        return ((LocalFileSystem) fs).getRawFileSystem();
-      }
-      return fs;
-    } catch (IOException e) {
-      LOG.error("Fail to get filesystem of {}", path);
-      throw e;
-    }
-  }
-
   public static String generateDataFileName(String fileNamePrefix) {
     return fileNamePrefix + Constants.SHUFFLE_DATA_FILE_SUFFIX;
   }
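
The removed getFileSystemForPath helper had one behavior worth noting: for local file systems it returned the raw filesystem so that calls to flush() actually reach disk. That special case is presumably carried over into the shared HadoopFilesystemProvider; a sketch of the same unwrapping, for reference:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocalFileSystem;
    import org.apache.hadoop.fs.Path;

    class RawLocalFsSketch {
      // For local file systems return the raw filesystem so that flush() actually
      // flushes to disk. This mirrors the removed getFileSystemForPath helper and is
      // assumed to live inside HadoopFilesystemProvider now.
      static FileSystem resolve(Path path, Configuration conf) throws Exception {
        FileSystem fs = path.getFileSystem(conf);
        if (fs instanceof LocalFileSystem) {
          return ((LocalFileSystem) fs).getRawFileSystem();
        }
        return fs;
      }
    }
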
diff --git a/storage/src/test/java/org/apache/uniffle/storage/HdfsShuffleHandlerTestBase.java b/storage/src/test/java/org/apache/uniffle/storage/HdfsShuffleHandlerTestBase.java
index b1e36bb4..dd8f8859 100644
--- a/storage/src/test/java/org/apache/uniffle/storage/HdfsShuffleHandlerTestBase.java
+++ b/storage/src/test/java/org/apache/uniffle/storage/HdfsShuffleHandlerTestBase.java
@@ -40,11 +40,11 @@ import org.apache.uniffle.storage.handler.impl.HdfsShuffleWriteHandler;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
-public class HdfsShuffleHandlerTestBase extends HdfsTestBase {
+public class HdfsShuffleHandlerTestBase {
 
   private static final AtomicLong ATOMIC_LONG = new AtomicLong(0);
 
-  protected void writeTestData(
+  public static void writeTestData(
       HdfsShuffleWriteHandler writeHandler,
       int num, int length, long taskAttemptId,
       Map<Long, byte[]> expectedData) throws Exception {
@@ -61,7 +61,7 @@ public class HdfsShuffleHandlerTestBase extends HdfsTestBase {
     writeHandler.write(blocks);
   }
 
-  protected void writeTestData(
+  public static void writeTestData(
       HdfsFileWriter writer,
       int partitionId,
       int num, int length, long taskAttemptId,
@@ -94,14 +94,14 @@ public class HdfsShuffleHandlerTestBase extends HdfsTestBase {
     expectedIndexSegments.put(partitionId, segments);
   }
 
-  protected byte[] writeData(HdfsFileWriter writer, int len) throws IOException {
+  public static byte[] writeData(HdfsFileWriter writer, int len) throws IOException {
     byte[] data = new byte[len];
     new Random().nextBytes(data);
     writer.writeData(data);
     return data;
   }
 
-  protected int calcExpectedSegmentNum(int num, int size, int bufferSize) {
+  public static int calcExpectedSegmentNum(int num, int size, int bufferSize) {
     int segmentNum = 0;
     int cur = 0;
     for (int i = 0; i < num; ++i) {
@@ -119,7 +119,7 @@ public class HdfsShuffleHandlerTestBase extends HdfsTestBase {
     return segmentNum;
   }
 
-  protected void checkData(ShuffleDataResult shuffleDataResult, Map<Long, byte[]> expectedData) {
+  public static void checkData(ShuffleDataResult shuffleDataResult, Map<Long, byte[]> expectedData) {
 
     byte[] buffer = shuffleDataResult.getData();
     List<BufferSegment> bufferSegments = shuffleDataResult.getBufferSegments();
@@ -132,8 +132,8 @@ public class HdfsShuffleHandlerTestBase extends HdfsTestBase {
     }
   }
 
-  protected HdfsFileReader createHdfsReader(
-      String folder, String fileName, Configuration hadoopConf) throws IOException, IllegalStateException {
+  public static HdfsFileReader createHdfsReader(
+      String folder, String fileName, Configuration hadoopConf) throws Exception {
     Path path = new Path(folder, fileName);
     HdfsFileReader reader = new HdfsFileReader(path, hadoopConf);
     return reader;
diff --git a/storage/src/test/java/org/apache/uniffle/storage/HdfsTestBase.java b/storage/src/test/java/org/apache/uniffle/storage/HdfsTestBase.java
index e1cf24ad..df8d26d2 100644
--- a/storage/src/test/java/org/apache/uniffle/storage/HdfsTestBase.java
+++ b/storage/src/test/java/org/apache/uniffle/storage/HdfsTestBase.java
@@ -18,7 +18,6 @@
 package org.apache.uniffle.storage;
 
 import java.io.File;
-import java.io.IOException;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -42,7 +41,7 @@ public class HdfsTestBase implements Serializable {
   protected static File baseDir;
 
   @BeforeAll
-  public static void setUpHdfs(@TempDir File tempDir) throws IOException {
+  public static void setUpHdfs(@TempDir File tempDir) throws Exception {
     conf = new Configuration();
     baseDir = tempDir;
     conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR,
@@ -53,7 +52,7 @@ public class HdfsTestBase implements Serializable {
   }
 
   @AfterAll
-  public static void tearDownHdfs() throws IOException {
+  public static void tearDownHdfs() throws Exception {
     fs.close();
     cluster.shutdown();
   }
diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandlerTest.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandlerTest.java
index df3e27e6..26c0175e 100644
--- a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandlerTest.java
+++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandlerTest.java
@@ -23,23 +23,27 @@ import java.util.Set;
 
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.junit.jupiter.api.Test;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
 import org.apache.uniffle.common.BufferSegment;
 import org.apache.uniffle.common.ShuffleDataResult;
-import org.apache.uniffle.storage.HdfsShuffleHandlerTestBase;
+import org.apache.uniffle.storage.HdfsTestBase;
 
+import static org.apache.uniffle.storage.HdfsShuffleHandlerTestBase.calcExpectedSegmentNum;
+import static org.apache.uniffle.storage.HdfsShuffleHandlerTestBase.checkData;
+import static org.apache.uniffle.storage.HdfsShuffleHandlerTestBase.writeTestData;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
-public class HdfsClientReadHandlerTest extends HdfsShuffleHandlerTestBase {
+public class HdfsClientReadHandlerTest extends HdfsTestBase {
 
-  @Test
-  public void test() {
+  public static void createAndRunCases(String clusterPathPrefix, Configuration hadoopConf, String writeUser) {
     try {
-      String basePath = HDFS_URI + "clientReadTest1";
+      String basePath = clusterPathPrefix + "clientReadTest1";
       HdfsShuffleWriteHandler writeHandler =
           new HdfsShuffleWriteHandler(
               "appId",
@@ -48,7 +52,8 @@ public class HdfsClientReadHandlerTest extends HdfsShuffleHandlerTestBase {
               1,
               basePath,
               "test",
-              conf);
+              hadoopConf,
+              writeUser);
 
       Map<Long, byte[]> expectedData = Maps.newHashMap();
       Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
@@ -78,7 +83,7 @@ public class HdfsClientReadHandlerTest extends HdfsShuffleHandlerTestBase {
           expectBlockIds,
           processBlockIds,
           basePath,
-          conf);
+          hadoopConf);
       Set<Long> actualBlockIds = Sets.newHashSet();
 
       for (int i = 0; i < total; ++i) {
@@ -106,4 +111,9 @@ public class HdfsClientReadHandlerTest extends HdfsShuffleHandlerTestBase {
       fail(e.getMessage());
     }
   }
+
+  @Test
+  public void test() {
+    createAndRunCases(HDFS_URI, conf, StringUtils.EMPTY);
+  }
 }
diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsFileReaderTest.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsFileReaderTest.java
index ec40d212..437ed639 100644
--- a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsFileReaderTest.java
+++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsFileReaderTest.java
@@ -36,7 +36,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public class HdfsFileReaderTest extends HdfsTestBase {
 
   @Test
-  public void createStreamTest() throws IOException {
+  public void createStreamTest() throws Exception {
     Path path = new Path(HDFS_URI, "createStreamTest");
     fs.create(path);
 
@@ -58,7 +58,7 @@ public class HdfsFileReaderTest extends HdfsTestBase {
   }
 
   @Test
-  public void readDataTest() throws IOException {
+  public void readDataTest() throws Exception {
     Path path = new Path(HDFS_URI, "readDataTest");
     byte[] data = new byte[160];
     int offset = 128;
@@ -66,7 +66,7 @@ public class HdfsFileReaderTest extends HdfsTestBase {
     new Random().nextBytes(data);
     long crc11 = ChecksumUtils.getCrc32(ByteBuffer.wrap(data, offset, length));
 
-    try (HdfsFileWriter writer = new HdfsFileWriter(path, conf)) {
+    try (HdfsFileWriter writer = new HdfsFileWriter(fs, path, conf)) {
       writer.writeData(data);
     }
     FileBasedShuffleSegment segment = new FileBasedShuffleSegment(23, offset, length, length, 0xdeadbeef, 1);
diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsFileWriterTest.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsFileWriterTest.java
index 5af94416..f816698e 100644
--- a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsFileWriterTest.java
+++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsFileWriterTest.java
@@ -39,7 +39,7 @@ public class HdfsFileWriterTest extends HdfsTestBase {
   @Test
   public void createStreamFirstTest() throws IOException {
     Path path = new Path(HDFS_URI, "createStreamFirstTest");
-    try (HdfsFileWriter writer = new HdfsFileWriter(path, conf)) {
+    try (HdfsFileWriter writer = new HdfsFileWriter(fs, path, conf)) {
       assertTrue(fs.isFile(path));
       assertEquals(0, writer.nextOffset());
     }
@@ -52,14 +52,14 @@ public class HdfsFileWriterTest extends HdfsTestBase {
 
     // create a file and fill 32 bytes
     Path path = new Path(HDFS_URI, "createStreamAppendTest");
-    try (HdfsFileWriter writer = new HdfsFileWriter(path, conf)) {
+    try (HdfsFileWriter writer = new HdfsFileWriter(fs, path, conf)) {
       assertEquals(0, writer.nextOffset());
       writer.writeData(data);
       assertEquals(32, writer.nextOffset());
     }
 
     // open existing file using append
-    try (HdfsFileWriter writer = new HdfsFileWriter(path, conf)) {
+    try (HdfsFileWriter writer = new HdfsFileWriter(fs, path, conf)) {
       assertTrue(fs.isFile(path));
       assertEquals(32, writer.nextOffset());
     }
@@ -67,7 +67,7 @@ public class HdfsFileWriterTest extends HdfsTestBase {
     // disable the append support
     conf.setBoolean("dfs.support.append", false);
     assertTrue(fs.isFile(path));
-    Throwable ise = assertThrows(IllegalStateException.class, () -> new HdfsFileWriter(path, conf));
+    Throwable ise = assertThrows(IllegalStateException.class, () -> new HdfsFileWriter(fs, path, conf));
     assertTrue(ise.getMessage().startsWith(path + " exists but append mode is not support!"));
   }
 
@@ -77,7 +77,7 @@ public class HdfsFileWriterTest extends HdfsTestBase {
     Path path = new Path(HDFS_URI, "createStreamDirectory");
     fs.mkdirs(path);
 
-    Throwable ise = assertThrows(IllegalStateException.class, () -> new HdfsFileWriter(path, conf));
+    Throwable ise = assertThrows(IllegalStateException.class, () -> new HdfsFileWriter(fs, path, conf));
     assertTrue(ise.getMessage().startsWith(HDFS_URI + "createStreamDirectory is a directory!"));
   }
 
@@ -89,7 +89,7 @@ public class HdfsFileWriterTest extends HdfsTestBase {
     buf.put(data);
     Path path = new Path(HDFS_URI, "createStreamTest");
 
-    try (HdfsFileWriter writer = new HdfsFileWriter(path, conf)) {
+    try (HdfsFileWriter writer = new HdfsFileWriter(fs, path, conf)) {
       assertEquals(0, writer.nextOffset());
       buf.flip();
       writer.writeData(buf.array());
@@ -103,7 +103,7 @@ public class HdfsFileWriterTest extends HdfsTestBase {
     new Random().nextBytes(data);
 
     Path path = new Path(HDFS_URI, "writeBufferTest");
-    try (HdfsFileWriter writer = new HdfsFileWriter(path, conf)) {
+    try (HdfsFileWriter writer = new HdfsFileWriter(fs, path, conf)) {
       assertEquals(0, writer.nextOffset());
       writer.writeData(data);
       assertEquals(32, writer.nextOffset());
@@ -128,7 +128,7 @@ public class HdfsFileWriterTest extends HdfsTestBase {
     buf.asIntBuffer().put(data);
 
     Path path = new Path(HDFS_URI, "writeBufferArrayTest");
-    try (HdfsFileWriter writer = new HdfsFileWriter(path, conf)) {
+    try (HdfsFileWriter writer = new HdfsFileWriter(fs, path, conf)) {
       assertEquals(0, writer.nextOffset());
       writer.writeData(buf.array());
       assertEquals(20, writer.nextOffset());
@@ -150,7 +150,7 @@ public class HdfsFileWriterTest extends HdfsTestBase {
         23, 128, 32, 32, 0xdeadbeef, 0);
 
     Path path = new Path(HDFS_URI, "writeSegmentTest");
-    try (HdfsFileWriter writer = new HdfsFileWriter(path, conf)) {
+    try (HdfsFileWriter writer = new HdfsFileWriter(fs, path, conf)) {
       writer.writeIndex(segment);
     }
 
diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsHandlerTest.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsHandlerTest.java
index d51d7ec1..9eff9821 100644
--- a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsHandlerTest.java
+++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsHandlerTest.java
@@ -42,7 +42,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public class HdfsHandlerTest extends HdfsTestBase {
 
   @Test
-  public void initTest() throws IOException {
+  public void initTest() throws Exception {
     String basePath = HDFS_URI + "test_base";
     new HdfsShuffleWriteHandler("appId", 0, 0, 0, basePath, "test", conf);
     Path path = new Path(basePath);
@@ -50,7 +50,7 @@ public class HdfsHandlerTest extends HdfsTestBase {
   }
 
   @Test
-  public void writeTest() throws IOException, IllegalStateException {
+  public void writeTest() throws Exception {
     String basePath = HDFS_URI + "writeTest";
     HdfsShuffleWriteHandler writeHandler =
         new HdfsShuffleWriteHandler("appId", 1, 1, 1, basePath, "test", conf);
diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandlerTest.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandlerTest.java
index 73012c92..8c5dd7a7 100644
--- a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandlerTest.java
+++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandlerTest.java
@@ -23,24 +23,26 @@ import java.util.Set;
 
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.junit.jupiter.api.Test;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
 import org.apache.uniffle.common.BufferSegment;
 import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.storage.HdfsShuffleHandlerTestBase;
+import org.apache.uniffle.storage.HdfsTestBase;
 import org.apache.uniffle.storage.util.ShuffleStorageUtils;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.fail;
 
-public class HdfsShuffleReadHandlerTest extends HdfsShuffleHandlerTestBase {
+public class HdfsShuffleReadHandlerTest extends HdfsTestBase {
 
-  @Test
-  public void test() {
+  public static void createAndRunCases(String clusterPathPrefix, Configuration conf, String user) {
     try {
-      String basePath = HDFS_URI + "HdfsShuffleFileReadHandlerTest";
+      String basePath = clusterPathPrefix + "HdfsShuffleFileReadHandlerTest";
       HdfsShuffleWriteHandler writeHandler =
           new HdfsShuffleWriteHandler(
               "appId",
@@ -49,7 +51,8 @@ public class HdfsShuffleReadHandlerTest extends HdfsShuffleHandlerTestBase {
               1,
               basePath,
               "test",
-              conf);
+              conf,
+              user);
 
       Map<Long, byte[]> expectedData = Maps.newHashMap();
 
@@ -57,8 +60,8 @@ public class HdfsShuffleReadHandlerTest extends HdfsShuffleHandlerTestBase {
       int totalBlockNum = 0;
       int expectTotalBlockNum = new Random().nextInt(37);
       int blockSize = new Random().nextInt(7) + 1;
-      writeTestData(writeHandler, expectTotalBlockNum, blockSize, 0, expectedData);
-      int total = calcExpectedSegmentNum(expectTotalBlockNum, blockSize, readBufferSize);
+      HdfsShuffleHandlerTestBase.writeTestData(writeHandler, expectTotalBlockNum, blockSize, 0, expectedData);
+      int total = HdfsShuffleHandlerTestBase.calcExpectedSegmentNum(expectTotalBlockNum, blockSize, readBufferSize);
       Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
       Roaring64NavigableMap processBlockIds =  Roaring64NavigableMap.bitmapOf();
       expectedData.forEach((id, block) -> expectBlockIds.addLong(id));
@@ -73,7 +76,7 @@ public class HdfsShuffleReadHandlerTest extends HdfsShuffleHandlerTestBase {
       for (int i = 0; i < total; ++i) {
         ShuffleDataResult shuffleDataResult = handler.readShuffleData();
         totalBlockNum += shuffleDataResult.getBufferSegments().size();
-        checkData(shuffleDataResult, expectedData);
+        HdfsShuffleHandlerTestBase.checkData(shuffleDataResult, expectedData);
         for (BufferSegment bufferSegment : shuffleDataResult.getBufferSegments()) {
           actualBlockIds.add(bufferSegment.getBlockId());
         }
@@ -91,4 +94,9 @@ public class HdfsShuffleReadHandlerTest extends HdfsShuffleHandlerTestBase {
       fail(e.getMessage());
     }
   }
+
+  @Test
+  public void test() {
+    createAndRunCases(HDFS_URI, conf, StringUtils.EMPTY);
+  }
 }
diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/KerberizedHdfsClientReadHandlerTest.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/KerberizedHdfsClientReadHandlerTest.java
new file mode 100644
index 00000000..682fd927
--- /dev/null
+++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/KerberizedHdfsClientReadHandlerTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.storage.handler.impl;
+
+import java.io.IOException;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.common.KerberizedHdfsBase;
+
+public class KerberizedHdfsClientReadHandlerTest extends KerberizedHdfsBase {
+
+  @BeforeAll
+  public static void beforeAll() throws Exception {
+    testRunner = KerberizedHdfsClientReadHandlerTest.class;
+    KerberizedHdfsBase.init();
+  }
+
+  @BeforeEach
+  public void beforeEach() throws Exception {
+    initHadoopSecurityContext();
+  }
+
+  @Test
+  public void test() throws IOException {
+    HdfsClientReadHandlerTest.createAndRunCases(
+        kerberizedHdfs.getSchemeAndAuthorityPrefix(),
+        kerberizedHdfs.getConf(),
+        "alex"
+    );
+  }
+}
diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/KerberizedHdfsShuffleReadHandlerTest.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/KerberizedHdfsShuffleReadHandlerTest.java
new file mode 100644
index 00000000..f5b9ef74
--- /dev/null
+++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/KerberizedHdfsShuffleReadHandlerTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.storage.handler.impl;
+
+import java.io.IOException;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.common.KerberizedHdfsBase;
+
+public class KerberizedHdfsShuffleReadHandlerTest extends KerberizedHdfsBase {
+
+  @BeforeAll
+  public static void beforeAll() throws Exception {
+    testRunner = KerberizedHdfsShuffleReadHandlerTest.class;
+    KerberizedHdfsBase.init();
+  }
+
+  @BeforeEach
+  public void beforeEach() throws Exception {
+    initHadoopSecurityContext();
+  }
+
+  @Test
+  public void test() throws IOException {
+    HdfsShuffleReadHandlerTest.createAndRunCases(
+        kerberizedHdfs.getSchemeAndAuthorityPrefix(),
+        kerberizedHdfs.getConf(),
+        "alex"
+    );
+  }
+}
diff --git a/storage/src/test/java/org/apache/uniffle/storage/util/ShuffleHdfsStorageUtilsTest.java b/storage/src/test/java/org/apache/uniffle/storage/util/ShuffleHdfsStorageUtilsTest.java
index 6a449680..dff02cff 100644
--- a/storage/src/test/java/org/apache/uniffle/storage/util/ShuffleHdfsStorageUtilsTest.java
+++ b/storage/src/test/java/org/apache/uniffle/storage/util/ShuffleHdfsStorageUtilsTest.java
@@ -22,6 +22,8 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.util.Random;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -35,7 +37,15 @@ import static org.junit.jupiter.api.Assertions.fail;
 public class ShuffleHdfsStorageUtilsTest extends HdfsTestBase {
 
   @Test
-  public void testUploadFile(@TempDir File tempDir) {
+  public void testUploadFile(@TempDir File tempDir) throws Exception {
+    createAndRunCases(tempDir, fs, HDFS_URI, HdfsTestBase.conf);
+  }
+
+  public static void createAndRunCases(
+      File tempDir,
+      FileSystem fileSystem,
+      String clusterPathPrefix,
+      Configuration hadoopConf) throws Exception {
     FileOutputStream fileOut = null;
     DataOutputStream dataOut = null;
     try {
@@ -47,8 +57,8 @@ public class ShuffleHdfsStorageUtilsTest extends HdfsTestBase {
       dataOut.write(buf);
       dataOut.close();
       fileOut.close();
-      String path = HDFS_URI + "test";
-      HdfsFileWriter writer = new HdfsFileWriter(new Path(path), conf);
+      String path = clusterPathPrefix + "test";
+      HdfsFileWriter writer = new HdfsFileWriter(fileSystem, new Path(path), hadoopConf);
       long size = ShuffleStorageUtils.uploadFile(file, writer, 1024);
       assertEquals(2096, size);
       size = ShuffleStorageUtils.uploadFile(file, writer, 100);
diff --git a/storage/src/test/java/org/apache/uniffle/storage/util/ShuffleKerberizedHdfsStorageUtilsTest.java b/storage/src/test/java/org/apache/uniffle/storage/util/ShuffleKerberizedHdfsStorageUtilsTest.java
new file mode 100644
index 00000000..d6530ce8
--- /dev/null
+++ b/storage/src/test/java/org/apache/uniffle/storage/util/ShuffleKerberizedHdfsStorageUtilsTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.storage.util;
+
+import java.io.File;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import org.apache.uniffle.common.KerberizedHdfsBase;
+
+public class ShuffleKerberizedHdfsStorageUtilsTest extends KerberizedHdfsBase {
+
+  @BeforeAll
+  public static void beforeAll() throws Exception {
+    testRunner = ShuffleKerberizedHdfsStorageUtilsTest.class;
+    KerberizedHdfsBase.init();
+  }
+
+  @Test
+  public void testUploadFile(@TempDir File tempDir) throws Exception {
+    initHadoopSecurityContext();
+    ShuffleHdfsStorageUtilsTest.createAndRunCases(
+        tempDir,
+        kerberizedHdfs.getFileSystem(),
+        kerberizedHdfs.getSchemeAndAuthorityPrefix(),
+        kerberizedHdfs.getConf()
+    );
+  }
+}