You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2023/02/21 08:07:35 UTC

[incubator-uniffle] branch branch-0.7 updated (f7eb94ae -> 72f718c4)

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a change to branch branch-0.7
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


    from f7eb94ae [FOLLOWUP] fix: don't recreate base dir if it's already existed (#616) (#622)
     new 3601f45c [#627] fix(operator): support specifying custom ports (#629)
     new 72f718c4 [#632]fix: respect volumes in rss spec (#634)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../operator/pkg/controller/constants/constants.go |   4 -
 .../pkg/controller/sync/coordinator/coordinator.go |  32 +--
 .../sync/coordinator/coordinator_test.go           | 221 ++++++++++++++++----
 .../controller/sync/shuffleserver/shuffleserver.go |  34 ++--
 .../sync/shuffleserver/shuffleserver_test.go       | 225 +++++++++++++++++----
 5 files changed, 401 insertions(+), 115 deletions(-)


[incubator-uniffle] 02/02: [#632]fix: respect volumes in rss spec (#634)

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch branch-0.7
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git

commit 72f718c40b2c84649da8e9b6cc70e5765ecd41aa
Author: advancedxy <xi...@apache.org>
AuthorDate: Tue Feb 21 16:01:59 2023 +0800

    [#632]fix: respect volumes in rss spec (#634)
    
    ### What changes were proposed in this pull request?
    Respect the volumes specified in the RSS spec for the coordinator and shuffle server.
    
    ### Why are the changes needed?
    This is a bug fix
    Fix: #632
    
    ### Does this PR introduce _any_ user-facing change?
    Admins of an RSS cluster can now specify volumes, which can be used, for example,
    to configure the Hadoop conf.
    ### How was this patch tested?
    Added UTs.
---
 .../pkg/controller/sync/coordinator/coordinator.go | 22 ++++++-------
 .../sync/coordinator/coordinator_test.go           | 37 ++++++++++++++++++++++
 .../controller/sync/shuffleserver/shuffleserver.go | 22 ++++++-------
 .../sync/shuffleserver/shuffleserver_test.go       | 37 ++++++++++++++++++++++
 4 files changed, 96 insertions(+), 22 deletions(-)

diff --git a/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator.go b/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator.go
index 0fccc65f..09fbee3e 100644
--- a/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator.go
+++ b/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator.go
@@ -184,21 +184,21 @@ func GenerateDeploy(rss *unifflev1alpha1.RemoteShuffleService, index int) *appsv
 				Key:    "node-role.kubernetes.io/master",
 			},
 		},
-		Volumes: []corev1.Volume{
-			{
-				Name: controllerconstants.ConfigurationVolumeName,
-				VolumeSource: corev1.VolumeSource{
-					ConfigMap: &corev1.ConfigMapVolumeSource{
-						LocalObjectReference: corev1.LocalObjectReference{
-							Name: rss.Spec.ConfigMapName,
-						},
-						DefaultMode: pointer.Int32(0777),
-					},
+		Volumes:      rss.Spec.Coordinator.Volumes,
+		NodeSelector: rss.Spec.Coordinator.NodeSelector,
+	}
+	configurationVolume := corev1.Volume{
+		Name: controllerconstants.ConfigurationVolumeName,
+		VolumeSource: corev1.VolumeSource{
+			ConfigMap: &corev1.ConfigMapVolumeSource{
+				LocalObjectReference: corev1.LocalObjectReference{
+					Name: rss.Spec.ConfigMapName,
 				},
+				DefaultMode: pointer.Int32(0777),
 			},
 		},
-		NodeSelector: rss.Spec.Coordinator.NodeSelector,
 	}
+	podSpec.Volumes = append(podSpec.Volumes, configurationVolume)
 	if podSpec.HostNetwork {
 		podSpec.DNSPolicy = corev1.DNSClusterFirstWithHostNet
 	}
diff --git a/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator_test.go b/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator_test.go
index 361839b9..3da86e26 100644
--- a/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator_test.go
+++ b/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator_test.go
@@ -68,6 +68,19 @@ var (
 			Value: "127.0.0.1",
 		},
 	}
+	testVolumeName = "test-volume"
+	testVolumes    = []corev1.Volume{
+		{
+			Name: testVolumeName,
+			VolumeSource: corev1.VolumeSource{
+				ConfigMap: &corev1.ConfigMapVolumeSource{
+					LocalObjectReference: corev1.LocalObjectReference{
+						Name: "test-config",
+					},
+				},
+			},
+		},
+	}
 )
 
 func buildRssWithLabels() *uniffleapi.RemoteShuffleService {
@@ -88,6 +101,12 @@ func buildRssWithCustomENVs() *uniffleapi.RemoteShuffleService {
 	return rss
 }
 
+func withCustomVolumes(volumes []corev1.Volume) *uniffleapi.RemoteShuffleService {
+	rss := utils.BuildRSSWithDefaultValue()
+	rss.Spec.Coordinator.Volumes = volumes
+	return rss
+}
+
 func buildRssWithCustomRPCPort() *uniffleapi.RemoteShuffleService {
 	rss := utils.BuildRSSWithDefaultValue()
 	rss.Spec.Coordinator.RPCPort = pointer.Int32(testRPCPort)
@@ -308,6 +327,24 @@ func TestGenerateDeploy(t *testing.T) {
 				return
 			},
 		},
+		{
+			name: "set custom volumes",
+			rss:  withCustomVolumes(testVolumes),
+			IsValidDeploy: func(deploy *appsv1.Deployment, rss *uniffleapi.RemoteShuffleService) (bool, error) {
+				for _, volume := range deploy.Spec.Template.Spec.Volumes {
+					if volume.Name == testVolumeName {
+						expectedVolume := testVolumes[0]
+						equal := reflect.DeepEqual(expectedVolume, volume)
+						if equal {
+							return true, nil
+						}
+						volumeJSON, _ := json.Marshal(expectedVolume)
+						return false, fmt.Errorf("generated deploy doesn't contain expected volumn: %s", volumeJSON)
+					}
+				}
+				return false, fmt.Errorf("generated deploy should include volume: %s", testVolumeName)
+			},
+		},
 	} {
 		t.Run(tt.name, func(tc *testing.T) {
 			deploy := GenerateDeploy(tt.rss, 0)
diff --git a/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver.go b/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver.go
index cb77d0af..ea531d48 100644
--- a/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver.go
+++ b/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver.go
@@ -91,21 +91,21 @@ func GenerateSts(rss *unifflev1alpha1.RemoteShuffleService) *appsv1.StatefulSet
 				Key:    "node-role.kubernetes.io/master",
 			},
 		},
-		Volumes: []corev1.Volume{
-			{
-				Name: controllerconstants.ConfigurationVolumeName,
-				VolumeSource: corev1.VolumeSource{
-					ConfigMap: &corev1.ConfigMapVolumeSource{
-						LocalObjectReference: corev1.LocalObjectReference{
-							Name: rss.Spec.ConfigMapName,
-						},
-						DefaultMode: pointer.Int32(0777),
-					},
+		Volumes:      rss.Spec.ShuffleServer.Volumes,
+		NodeSelector: rss.Spec.ShuffleServer.NodeSelector,
+	}
+	configurationVolume := corev1.Volume{
+		Name: controllerconstants.ConfigurationVolumeName,
+		VolumeSource: corev1.VolumeSource{
+			ConfigMap: &corev1.ConfigMapVolumeSource{
+				LocalObjectReference: corev1.LocalObjectReference{
+					Name: rss.Spec.ConfigMapName,
 				},
+				DefaultMode: pointer.Int32(0777),
 			},
 		},
-		NodeSelector: rss.Spec.ShuffleServer.NodeSelector,
 	}
+	podSpec.Volumes = append(podSpec.Volumes, configurationVolume)
 	if podSpec.HostNetwork {
 		podSpec.DNSPolicy = corev1.DNSClusterFirstWithHostNet
 	}
diff --git a/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver_test.go b/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver_test.go
index 80472b12..38c64b9e 100644
--- a/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver_test.go
+++ b/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver_test.go
@@ -68,6 +68,19 @@ var (
 			Value: "1G",
 		},
 	}
+	testVolumeName = "test-volume"
+	testVolumes    = []corev1.Volume{
+		{
+			Name: testVolumeName,
+			VolumeSource: corev1.VolumeSource{
+				ConfigMap: &corev1.ConfigMapVolumeSource{
+					LocalObjectReference: corev1.LocalObjectReference{
+						Name: "test-config",
+					},
+				},
+			},
+		},
+	}
 )
 
 func buildRssWithLabels() *uniffleapi.RemoteShuffleService {
@@ -93,6 +106,12 @@ func buildRssWithCustomENVs() *uniffleapi.RemoteShuffleService {
 	return rss
 }
 
+func withCustomVolumes(volumes []corev1.Volume) *uniffleapi.RemoteShuffleService {
+	rss := utils.BuildRSSWithDefaultValue()
+	rss.Spec.ShuffleServer.Volumes = volumes
+	return rss
+}
+
 func buildRssWithCustomRPCPort() *uniffleapi.RemoteShuffleService {
 	rss := utils.BuildRSSWithDefaultValue()
 	rss.Spec.ShuffleServer.RPCPort = pointer.Int32(testRPCPort)
@@ -315,6 +334,24 @@ func TestGenerateSts(t *testing.T) {
 				return
 			},
 		},
+		{
+			name: "test custom volumes",
+			rss:  withCustomVolumes(testVolumes),
+			IsValidSts: func(sts *appsv1.StatefulSet, rss *uniffleapi.RemoteShuffleService) (valid bool, err error) {
+				for _, volume := range sts.Spec.Template.Spec.Volumes {
+					if volume.Name == testVolumeName {
+						expectedVolume := testVolumes[0]
+						equal := reflect.DeepEqual(expectedVolume, volume)
+						if equal {
+							return true, nil
+						}
+						volumeJSON, _ := json.Marshal(expectedVolume)
+						return false, fmt.Errorf("generated sts doesn't contain expected volumn: %s", volumeJSON)
+					}
+				}
+				return false, fmt.Errorf("generated sts should include volume: %s", testVolumeName)
+			},
+		},
 	} {
 		t.Run(tt.name, func(tc *testing.T) {
 			sts := GenerateSts(tt.rss)


[incubator-uniffle] 01/02: [#627] fix(operator): support specifying custom ports (#629)

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch branch-0.7
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git

commit 3601f45ca13aa93d30286db66d23769c70c56a36
Author: jasonawang <ja...@tencent.com>
AuthorDate: Mon Feb 20 20:27:26 2023 +0800

    [#627] fix(operator): support specifying custom ports (#629)
    
    ### What changes were proposed in this pull request?
    Set the coordinator's and shuffle server's container ports from the corresponding fields of the RSS spec.
    
    ### Why are the changes needed?
    Fix #627.
    
    ### Does this PR introduce _any_ user-facing change?
    RSS cluster admins can set custom ports for shuffle servers and coordinators.
    
    ### How was this patch tested?
    Manually verified.
---
 .../operator/pkg/controller/constants/constants.go |   4 -
 .../pkg/controller/sync/coordinator/coordinator.go |  10 +-
 .../sync/coordinator/coordinator_test.go           | 184 ++++++++++++++++----
 .../controller/sync/shuffleserver/shuffleserver.go |  12 +-
 .../sync/shuffleserver/shuffleserver_test.go       | 188 ++++++++++++++++-----
 5 files changed, 305 insertions(+), 93 deletions(-)

diff --git a/deploy/kubernetes/operator/pkg/controller/constants/constants.go b/deploy/kubernetes/operator/pkg/controller/constants/constants.go
index 7c74d8bd..8bda1ca6 100644
--- a/deploy/kubernetes/operator/pkg/controller/constants/constants.go
+++ b/deploy/kubernetes/operator/pkg/controller/constants/constants.go
@@ -18,10 +18,6 @@
 package constants
 
 const (
-	// ContainerShuffleServerRPCPort indicates rpc port used in shuffle server containers.
-	ContainerShuffleServerRPCPort int32 = 19997
-	// ContainerShuffleServerHTTPPort indicates http port used in shuffle server containers.
-	ContainerShuffleServerHTTPPort int32 = 19996
 	// ContainerCoordinatorRPCPort indicates rpc port used in coordinator containers.
 	ContainerCoordinatorRPCPort int32 = 19997
 	// ContainerCoordinatorHTTPPort indicates http port used in coordinator containers.
diff --git a/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator.go b/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator.go
index a3ff5e58..0fccc65f 100644
--- a/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator.go
+++ b/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator.go
@@ -281,7 +281,7 @@ func GenerateAddresses(rss *unifflev1alpha1.RemoteShuffleService) string {
 	for i := 0; i < int(*rss.Spec.Coordinator.Count); i++ {
 		name := GenerateNameByIndex(rss, i)
 		serviceName := appendHeadless(name)
-		current := fmt.Sprintf("%v:%v", serviceName, controllerconstants.ContainerShuffleServerRPCPort)
+		current := fmt.Sprintf("%v:%v", serviceName, *rss.Spec.Coordinator.RPCPort)
 		names = append(names, current)
 	}
 	return strings.Join(names, ",")
@@ -312,11 +312,11 @@ func generateMainContainer(rss *unifflev1alpha1.RemoteShuffleService) *corev1.Co
 func generateMainContainerPorts(rss *unifflev1alpha1.RemoteShuffleService) []corev1.ContainerPort {
 	ports := []corev1.ContainerPort{
 		{
-			ContainerPort: controllerconstants.ContainerCoordinatorRPCPort,
+			ContainerPort: *rss.Spec.Coordinator.RPCPort,
 			Protocol:      corev1.ProtocolTCP,
 		},
 		{
-			ContainerPort: controllerconstants.ContainerCoordinatorHTTPPort,
+			ContainerPort: *rss.Spec.Coordinator.HTTPPort,
 			Protocol:      corev1.ProtocolTCP,
 		},
 	}
@@ -329,11 +329,11 @@ func generateMainContainerENV(rss *unifflev1alpha1.RemoteShuffleService) []corev
 	env := []corev1.EnvVar{
 		{
 			Name:  controllerconstants.CoordinatorRPCPortEnv,
-			Value: strconv.FormatInt(int64(controllerconstants.ContainerCoordinatorRPCPort), 10),
+			Value: strconv.FormatInt(int64(*rss.Spec.Coordinator.RPCPort), 10),
 		},
 		{
 			Name:  controllerconstants.CoordinatorHTTPPortEnv,
-			Value: strconv.FormatInt(int64(controllerconstants.ContainerCoordinatorHTTPPort), 10),
+			Value: strconv.FormatInt(int64(*rss.Spec.Coordinator.HTTPPort), 10),
 		},
 		{
 			Name:  controllerconstants.XmxSizeEnv,
diff --git a/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator_test.go b/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator_test.go
index 6df2216b..361839b9 100644
--- a/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator_test.go
+++ b/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator_test.go
@@ -36,7 +36,9 @@ import (
 )
 
 const (
-	testRuntimeClassName = "test-runtime"
+	testRuntimeClassName       = "test-runtime"
+	testRPCPort          int32 = 19990
+	testHTTPPort         int32 = 19991
 )
 
 // IsValidDeploy checks generated deployment, returns whether it is valid and error message.
@@ -86,6 +88,57 @@ func buildRssWithCustomENVs() *uniffleapi.RemoteShuffleService {
 	return rss
 }
 
+func buildRssWithCustomRPCPort() *uniffleapi.RemoteShuffleService {
+	rss := utils.BuildRSSWithDefaultValue()
+	rss.Spec.Coordinator.RPCPort = pointer.Int32(testRPCPort)
+	return rss
+}
+
+func buildRssWithCustomHTTPPort() *uniffleapi.RemoteShuffleService {
+	rss := utils.BuildRSSWithDefaultValue()
+	rss.Spec.Coordinator.HTTPPort = pointer.Int32(testHTTPPort)
+	return rss
+}
+
+func buildCommonExpectedENVs(rss *uniffleapi.RemoteShuffleService) []corev1.EnvVar {
+	return []corev1.EnvVar{
+		{
+			Name:  controllerconstants.CoordinatorRPCPortEnv,
+			Value: strconv.FormatInt(int64(*rss.Spec.Coordinator.RPCPort), 10),
+		},
+		{
+			Name:  controllerconstants.CoordinatorHTTPPortEnv,
+			Value: strconv.FormatInt(int64(*rss.Spec.Coordinator.HTTPPort), 10),
+		},
+		{
+			Name:  controllerconstants.XmxSizeEnv,
+			Value: rss.Spec.Coordinator.XmxSize,
+		},
+		{
+			Name:  controllerconstants.ServiceNameEnv,
+			Value: controllerconstants.CoordinatorServiceName,
+		},
+		{
+			Name: controllerconstants.NodeNameEnv,
+			ValueFrom: &corev1.EnvVarSource{
+				FieldRef: &corev1.ObjectFieldSelector{
+					APIVersion: "v1",
+					FieldPath:  "spec.nodeName",
+				},
+			},
+		},
+		{
+			Name: controllerconstants.RssIPEnv,
+			ValueFrom: &corev1.EnvVarSource{
+				FieldRef: &corev1.ObjectFieldSelector{
+					APIVersion: "v1",
+					FieldPath:  "status.podIP",
+				},
+			},
+		},
+	}
+}
+
 func TestGenerateDeploy(t *testing.T) {
 	for _, tt := range []struct {
 		name string
@@ -145,42 +198,7 @@ func TestGenerateDeploy(t *testing.T) {
 			rss:  buildRssWithCustomENVs(),
 			IsValidDeploy: func(deploy *appsv1.Deployment, rss *uniffleapi.RemoteShuffleService) (
 				valid bool, err error) {
-				expectENVs := []corev1.EnvVar{
-					{
-						Name:  controllerconstants.CoordinatorRPCPortEnv,
-						Value: strconv.FormatInt(int64(controllerconstants.ContainerCoordinatorRPCPort), 10),
-					},
-					{
-						Name:  controllerconstants.CoordinatorHTTPPortEnv,
-						Value: strconv.FormatInt(int64(controllerconstants.ContainerCoordinatorHTTPPort), 10),
-					},
-					{
-						Name:  controllerconstants.XmxSizeEnv,
-						Value: rss.Spec.Coordinator.XmxSize,
-					},
-					{
-						Name:  controllerconstants.ServiceNameEnv,
-						Value: controllerconstants.CoordinatorServiceName,
-					},
-					{
-						Name: controllerconstants.NodeNameEnv,
-						ValueFrom: &corev1.EnvVarSource{
-							FieldRef: &corev1.ObjectFieldSelector{
-								APIVersion: "v1",
-								FieldPath:  "spec.nodeName",
-							},
-						},
-					},
-					{
-						Name: controllerconstants.RssIPEnv,
-						ValueFrom: &corev1.EnvVarSource{
-							FieldRef: &corev1.ObjectFieldSelector{
-								APIVersion: "v1",
-								FieldPath:  "status.podIP",
-							},
-						},
-					},
-				}
+				expectENVs := buildCommonExpectedENVs(rss)
 				defaultEnvNames := sets.NewString()
 				for i := range expectENVs {
 					defaultEnvNames.Insert(expectENVs[i].Name)
@@ -202,6 +220,94 @@ func TestGenerateDeploy(t *testing.T) {
 				return
 			},
 		},
+		{
+			name: "set custom rpc port used by coordinator",
+			rss:  buildRssWithCustomRPCPort(),
+			IsValidDeploy: func(deploy *appsv1.Deployment, rss *uniffleapi.RemoteShuffleService) (
+				valid bool, err error) {
+				// check envs
+				expectENVs := buildCommonExpectedENVs(rss)
+				for i := range expectENVs {
+					if expectENVs[i].Name == controllerconstants.CoordinatorRPCPortEnv {
+						expectENVs[i].Value = strconv.FormatInt(int64(testRPCPort), 10)
+					}
+				}
+				actualENVs := deploy.Spec.Template.Spec.Containers[0].Env
+				valid = reflect.DeepEqual(expectENVs, actualENVs)
+				if !valid {
+					actualEnvBody, _ := json.Marshal(actualENVs)
+					expectEnvBody, _ := json.Marshal(expectENVs)
+					err = fmt.Errorf("unexpected ENVs:\n%v,\nexpected:\n%v",
+						string(actualEnvBody), string(expectEnvBody))
+					return
+				}
+
+				// check ports
+				expectPorts := []corev1.ContainerPort{
+					{
+						ContainerPort: testRPCPort,
+						Protocol:      corev1.ProtocolTCP,
+					},
+					{
+						ContainerPort: *rss.Spec.Coordinator.HTTPPort,
+						Protocol:      corev1.ProtocolTCP,
+					},
+				}
+				actualPorts := deploy.Spec.Template.Spec.Containers[0].Ports
+				valid = reflect.DeepEqual(expectPorts, actualPorts)
+				if !valid {
+					actualPortsBody, _ := json.Marshal(actualPorts)
+					expectPortsBody, _ := json.Marshal(expectPorts)
+					err = fmt.Errorf("unexpected Ports:\n%v,\nexpected:\n%v",
+						string(actualPortsBody), string(expectPortsBody))
+				}
+				return
+			},
+		},
+		{
+			name: "set custom http port used by coordinator",
+			rss:  buildRssWithCustomHTTPPort(),
+			IsValidDeploy: func(deploy *appsv1.Deployment, rss *uniffleapi.RemoteShuffleService) (
+				valid bool, err error) {
+				// check envs
+				expectENVs := buildCommonExpectedENVs(rss)
+				for i := range expectENVs {
+					if expectENVs[i].Name == controllerconstants.CoordinatorHTTPPortEnv {
+						expectENVs[i].Value = strconv.FormatInt(int64(testHTTPPort), 10)
+					}
+				}
+				actualENVs := deploy.Spec.Template.Spec.Containers[0].Env
+				valid = reflect.DeepEqual(expectENVs, actualENVs)
+				if !valid {
+					actualEnvBody, _ := json.Marshal(actualENVs)
+					expectEnvBody, _ := json.Marshal(expectENVs)
+					err = fmt.Errorf("unexpected ENVs:\n%v,\nexpected:\n%v",
+						string(actualEnvBody), string(expectEnvBody))
+					return
+				}
+
+				// check ports
+				expectPorts := []corev1.ContainerPort{
+					{
+						ContainerPort: *rss.Spec.Coordinator.RPCPort,
+						Protocol:      corev1.ProtocolTCP,
+					},
+					{
+						ContainerPort: testHTTPPort,
+						Protocol:      corev1.ProtocolTCP,
+					},
+				}
+				actualPorts := deploy.Spec.Template.Spec.Containers[0].Ports
+				valid = reflect.DeepEqual(expectPorts, actualPorts)
+				if !valid {
+					actualPortsBody, _ := json.Marshal(actualPorts)
+					expectPortsBody, _ := json.Marshal(expectPorts)
+					err = fmt.Errorf("unexpected Ports:\n%v,\nexpected:\n%v",
+						string(actualPortsBody), string(expectPortsBody))
+				}
+				return
+			},
+		},
 	} {
 		t.Run(tt.name, func(tc *testing.T) {
 			deploy := GenerateDeploy(tt.rss, 0)
@@ -269,4 +375,8 @@ func TestGenerateAddresses(t *testing.T) {
 	rss := buildRssWithLabels()
 	quorum := GenerateAddresses(rss)
 	assertion.Contains(quorum, "headless")
+
+	rss = buildRssWithCustomRPCPort()
+	quorum = GenerateAddresses(rss)
+	assertion.Contains(quorum, strconv.FormatInt(int64(testRPCPort), 10))
 }
diff --git a/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver.go b/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver.go
index 43a57be8..cb77d0af 100644
--- a/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver.go
+++ b/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver.go
@@ -136,9 +136,9 @@ func GenerateSts(rss *unifflev1alpha1.RemoteShuffleService) *appsv1.StatefulSet
 						constants.AnnotationRssName: rss.Name,
 						constants.AnnotationRssUID:  string(rss.UID),
 						constants.AnnotationMetricsServerPort: fmt.Sprintf("%v",
-							controllerconstants.ContainerShuffleServerHTTPPort),
+							*rss.Spec.ShuffleServer.HTTPPort),
 						constants.AnnotationShuffleServerPort: fmt.Sprintf("%v",
-							controllerconstants.ContainerShuffleServerRPCPort),
+							*rss.Spec.ShuffleServer.RPCPort),
 					},
 				},
 				Spec: podSpec,
@@ -217,11 +217,11 @@ func generateMainContainer(rss *unifflev1alpha1.RemoteShuffleService) *corev1.Co
 func generateMainContainerPorts(rss *unifflev1alpha1.RemoteShuffleService) []corev1.ContainerPort {
 	ports := []corev1.ContainerPort{
 		{
-			ContainerPort: controllerconstants.ContainerShuffleServerRPCPort,
+			ContainerPort: *rss.Spec.ShuffleServer.RPCPort,
 			Protocol:      corev1.ProtocolTCP,
 		},
 		{
-			ContainerPort: controllerconstants.ContainerShuffleServerHTTPPort,
+			ContainerPort: *rss.Spec.ShuffleServer.HTTPPort,
 			Protocol:      corev1.ProtocolTCP,
 		},
 	}
@@ -234,11 +234,11 @@ func generateMainContainerENV(rss *unifflev1alpha1.RemoteShuffleService) []corev
 	env := []corev1.EnvVar{
 		{
 			Name:  controllerconstants.ShuffleServerRPCPortEnv,
-			Value: strconv.FormatInt(int64(controllerconstants.ContainerShuffleServerRPCPort), 10),
+			Value: strconv.FormatInt(int64(*rss.Spec.ShuffleServer.RPCPort), 10),
 		},
 		{
 			Name:  controllerconstants.ShuffleServerHTTPPortEnv,
-			Value: strconv.FormatInt(int64(controllerconstants.ContainerShuffleServerHTTPPort), 10),
+			Value: strconv.FormatInt(int64(*rss.Spec.ShuffleServer.HTTPPort), 10),
 		},
 		{
 			Name:  controllerconstants.RSSCoordinatorQuorumEnv,
diff --git a/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver_test.go b/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver_test.go
index a1c6576c..80472b12 100644
--- a/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver_test.go
+++ b/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver_test.go
@@ -36,7 +36,9 @@ import (
 )
 
 const (
-	testRuntimeClassName = "test-runtime"
+	testRuntimeClassName       = "test-runtime"
+	testRPCPort          int32 = 19998
+	testHTTPPort         int32 = 19999
 )
 
 // IsValidSts checks generated statefulSet, returns whether it is valid and error message.
@@ -91,6 +93,61 @@ func buildRssWithCustomENVs() *uniffleapi.RemoteShuffleService {
 	return rss
 }
 
+func buildRssWithCustomRPCPort() *uniffleapi.RemoteShuffleService {
+	rss := utils.BuildRSSWithDefaultValue()
+	rss.Spec.ShuffleServer.RPCPort = pointer.Int32(testRPCPort)
+	return rss
+}
+
+func buildRssWithCustomHTTPPort() *uniffleapi.RemoteShuffleService {
+	rss := utils.BuildRSSWithDefaultValue()
+	rss.Spec.ShuffleServer.HTTPPort = pointer.Int32(testHTTPPort)
+	return rss
+}
+
+func buildCommonExpectedENVs(rss *uniffleapi.RemoteShuffleService) []corev1.EnvVar {
+	return []corev1.EnvVar{
+		{
+			Name:  controllerconstants.ShuffleServerRPCPortEnv,
+			Value: strconv.FormatInt(int64(*rss.Spec.ShuffleServer.RPCPort), 10),
+		},
+		{
+			Name:  controllerconstants.ShuffleServerHTTPPortEnv,
+			Value: strconv.FormatInt(int64(*rss.Spec.ShuffleServer.HTTPPort), 10),
+		},
+		{
+			Name:  controllerconstants.RSSCoordinatorQuorumEnv,
+			Value: coordinator.GenerateAddresses(rss),
+		},
+		{
+			Name:  controllerconstants.XmxSizeEnv,
+			Value: rss.Spec.ShuffleServer.XmxSize,
+		},
+		{
+			Name:  controllerconstants.ServiceNameEnv,
+			Value: controllerconstants.ShuffleServerServiceName,
+		},
+		{
+			Name: controllerconstants.NodeNameEnv,
+			ValueFrom: &corev1.EnvVarSource{
+				FieldRef: &corev1.ObjectFieldSelector{
+					APIVersion: "v1",
+					FieldPath:  "spec.nodeName",
+				},
+			},
+		},
+		{
+			Name: controllerconstants.RssIPEnv,
+			ValueFrom: &corev1.EnvVarSource{
+				FieldRef: &corev1.ObjectFieldSelector{
+					APIVersion: "v1",
+					FieldPath:  "status.podIP",
+				},
+			},
+		},
+	}
+}
+
 func TestGenerateSts(t *testing.T) {
 	for _, tt := range []struct {
 		name string
@@ -148,46 +205,7 @@ func TestGenerateSts(t *testing.T) {
 			name: "set custom environment variables",
 			rss:  buildRssWithCustomENVs(),
 			IsValidSts: func(sts *appsv1.StatefulSet, rss *uniffleapi.RemoteShuffleService) (valid bool, err error) {
-				expectENVs := []corev1.EnvVar{
-					{
-						Name:  controllerconstants.ShuffleServerRPCPortEnv,
-						Value: strconv.FormatInt(int64(controllerconstants.ContainerShuffleServerRPCPort), 10),
-					},
-					{
-						Name:  controllerconstants.ShuffleServerHTTPPortEnv,
-						Value: strconv.FormatInt(int64(controllerconstants.ContainerShuffleServerHTTPPort), 10),
-					},
-					{
-						Name:  controllerconstants.RSSCoordinatorQuorumEnv,
-						Value: coordinator.GenerateAddresses(rss),
-					},
-					{
-						Name:  controllerconstants.XmxSizeEnv,
-						Value: rss.Spec.ShuffleServer.XmxSize,
-					},
-					{
-						Name:  controllerconstants.ServiceNameEnv,
-						Value: controllerconstants.ShuffleServerServiceName,
-					},
-					{
-						Name: controllerconstants.NodeNameEnv,
-						ValueFrom: &corev1.EnvVarSource{
-							FieldRef: &corev1.ObjectFieldSelector{
-								APIVersion: "v1",
-								FieldPath:  "spec.nodeName",
-							},
-						},
-					},
-					{
-						Name: controllerconstants.RssIPEnv,
-						ValueFrom: &corev1.EnvVarSource{
-							FieldRef: &corev1.ObjectFieldSelector{
-								APIVersion: "v1",
-								FieldPath:  "status.podIP",
-							},
-						},
-					},
-				}
+				expectENVs := buildCommonExpectedENVs(rss)
 				defaultEnvNames := sets.NewString()
 				for i := range expectENVs {
 					defaultEnvNames.Insert(expectENVs[i].Name)
@@ -209,6 +227,94 @@ func TestGenerateSts(t *testing.T) {
 				return
 			},
 		},
+		{
+			name: "set custom rpc port used by shuffle server",
+			rss:  buildRssWithCustomRPCPort(),
+			IsValidSts: func(sts *appsv1.StatefulSet, rss *uniffleapi.RemoteShuffleService) (
+				valid bool, err error) {
+				// check envs
+				expectENVs := buildCommonExpectedENVs(rss)
+				for i := range expectENVs {
+					if expectENVs[i].Name == controllerconstants.ShuffleServerRPCPortEnv {
+						expectENVs[i].Value = strconv.FormatInt(int64(testRPCPort), 10)
+					}
+				}
+				actualENVs := sts.Spec.Template.Spec.Containers[0].Env
+				valid = reflect.DeepEqual(expectENVs, actualENVs)
+				if !valid {
+					actualEnvBody, _ := json.Marshal(actualENVs)
+					expectEnvBody, _ := json.Marshal(expectENVs)
+					err = fmt.Errorf("unexpected ENVs:\n%v,\nexpected:\n%v",
+						string(actualEnvBody), string(expectEnvBody))
+					return
+				}
+
+				// check ports
+				expectPorts := []corev1.ContainerPort{
+					{
+						ContainerPort: testRPCPort,
+						Protocol:      corev1.ProtocolTCP,
+					},
+					{
+						ContainerPort: *rss.Spec.ShuffleServer.HTTPPort,
+						Protocol:      corev1.ProtocolTCP,
+					},
+				}
+				actualPorts := sts.Spec.Template.Spec.Containers[0].Ports
+				valid = reflect.DeepEqual(expectPorts, actualPorts)
+				if !valid {
+					actualPortsBody, _ := json.Marshal(actualPorts)
+					expectPortsBody, _ := json.Marshal(expectPorts)
+					err = fmt.Errorf("unexpected Ports:\n%v,\nexpected:\n%v",
+						string(actualPortsBody), string(expectPortsBody))
+				}
+				return
+			},
+		},
+		{
+			name: "set custom http port used by shuffle server",
+			rss:  buildRssWithCustomHTTPPort(),
+			IsValidSts: func(sts *appsv1.StatefulSet, rss *uniffleapi.RemoteShuffleService) (
+				valid bool, err error) {
+				// check envs
+				expectENVs := buildCommonExpectedENVs(rss)
+				for i := range expectENVs {
+					if expectENVs[i].Name == controllerconstants.ShuffleServerHTTPPortEnv {
+						expectENVs[i].Value = strconv.FormatInt(int64(testHTTPPort), 10)
+					}
+				}
+				actualENVs := sts.Spec.Template.Spec.Containers[0].Env
+				valid = reflect.DeepEqual(expectENVs, actualENVs)
+				if !valid {
+					actualEnvBody, _ := json.Marshal(actualENVs)
+					expectEnvBody, _ := json.Marshal(expectENVs)
+					err = fmt.Errorf("unexpected ENVs:\n%v,\nexpected:\n%v",
+						string(actualEnvBody), string(expectEnvBody))
+					return
+				}
+
+				// check ports
+				expectPorts := []corev1.ContainerPort{
+					{
+						ContainerPort: *rss.Spec.ShuffleServer.RPCPort,
+						Protocol:      corev1.ProtocolTCP,
+					},
+					{
+						ContainerPort: testHTTPPort,
+						Protocol:      corev1.ProtocolTCP,
+					},
+				}
+				actualPorts := sts.Spec.Template.Spec.Containers[0].Ports
+				valid = reflect.DeepEqual(expectPorts, actualPorts)
+				if !valid {
+					actualPortsBody, _ := json.Marshal(actualPorts)
+					expectPortsBody, _ := json.Marshal(expectPorts)
+					err = fmt.Errorf("unexpected Ports:\n%v,\nexpected:\n%v",
+						string(actualPortsBody), string(expectPortsBody))
+				}
+				return
+			},
+		},
 	} {
 		t.Run(tt.name, func(tc *testing.T) {
 			sts := GenerateSts(tt.rss)