You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2023/02/21 08:07:36 UTC

[incubator-uniffle] 01/02: [#627] fix(operator): support specifying custom ports (#629)

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch branch-0.7
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git

commit 3601f45ca13aa93d30286db66d23769c70c56a36
Author: jasonawang <ja...@tencent.com>
AuthorDate: Mon Feb 20 20:27:26 2023 +0800

    [#627] fix(operator): support specifying custom ports (#629)
    
    ### What changes were proposed in this pull request?
    Set coordinator/shuffler server's container port to the fields of RSS spec
    
    ### Why are the changes needed?
    Fix #627.
    
    ### Does this PR introduce _any_ user-facing change?
    For RSS cluster admin, they can set custom ports for shuffle servers and coordinators.
    
    ### How was this patch tested?
    Manually verified.
---
 .../operator/pkg/controller/constants/constants.go |   4 -
 .../pkg/controller/sync/coordinator/coordinator.go |  10 +-
 .../sync/coordinator/coordinator_test.go           | 184 ++++++++++++++++----
 .../controller/sync/shuffleserver/shuffleserver.go |  12 +-
 .../sync/shuffleserver/shuffleserver_test.go       | 188 ++++++++++++++++-----
 5 files changed, 305 insertions(+), 93 deletions(-)

diff --git a/deploy/kubernetes/operator/pkg/controller/constants/constants.go b/deploy/kubernetes/operator/pkg/controller/constants/constants.go
index 7c74d8bd..8bda1ca6 100644
--- a/deploy/kubernetes/operator/pkg/controller/constants/constants.go
+++ b/deploy/kubernetes/operator/pkg/controller/constants/constants.go
@@ -18,10 +18,6 @@
 package constants
 
 const (
-	// ContainerShuffleServerRPCPort indicates rpc port used in shuffle server containers.
-	ContainerShuffleServerRPCPort int32 = 19997
-	// ContainerShuffleServerHTTPPort indicates http port used in shuffle server containers.
-	ContainerShuffleServerHTTPPort int32 = 19996
 	// ContainerCoordinatorRPCPort indicates rpc port used in coordinator containers.
 	ContainerCoordinatorRPCPort int32 = 19997
 	// ContainerCoordinatorHTTPPort indicates http port used in coordinator containers.
diff --git a/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator.go b/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator.go
index a3ff5e58..0fccc65f 100644
--- a/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator.go
+++ b/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator.go
@@ -281,7 +281,7 @@ func GenerateAddresses(rss *unifflev1alpha1.RemoteShuffleService) string {
 	for i := 0; i < int(*rss.Spec.Coordinator.Count); i++ {
 		name := GenerateNameByIndex(rss, i)
 		serviceName := appendHeadless(name)
-		current := fmt.Sprintf("%v:%v", serviceName, controllerconstants.ContainerShuffleServerRPCPort)
+		current := fmt.Sprintf("%v:%v", serviceName, *rss.Spec.Coordinator.RPCPort)
 		names = append(names, current)
 	}
 	return strings.Join(names, ",")
@@ -312,11 +312,11 @@ func generateMainContainer(rss *unifflev1alpha1.RemoteShuffleService) *corev1.Co
 func generateMainContainerPorts(rss *unifflev1alpha1.RemoteShuffleService) []corev1.ContainerPort {
 	ports := []corev1.ContainerPort{
 		{
-			ContainerPort: controllerconstants.ContainerCoordinatorRPCPort,
+			ContainerPort: *rss.Spec.Coordinator.RPCPort,
 			Protocol:      corev1.ProtocolTCP,
 		},
 		{
-			ContainerPort: controllerconstants.ContainerCoordinatorHTTPPort,
+			ContainerPort: *rss.Spec.Coordinator.HTTPPort,
 			Protocol:      corev1.ProtocolTCP,
 		},
 	}
@@ -329,11 +329,11 @@ func generateMainContainerENV(rss *unifflev1alpha1.RemoteShuffleService) []corev
 	env := []corev1.EnvVar{
 		{
 			Name:  controllerconstants.CoordinatorRPCPortEnv,
-			Value: strconv.FormatInt(int64(controllerconstants.ContainerCoordinatorRPCPort), 10),
+			Value: strconv.FormatInt(int64(*rss.Spec.Coordinator.RPCPort), 10),
 		},
 		{
 			Name:  controllerconstants.CoordinatorHTTPPortEnv,
-			Value: strconv.FormatInt(int64(controllerconstants.ContainerCoordinatorHTTPPort), 10),
+			Value: strconv.FormatInt(int64(*rss.Spec.Coordinator.HTTPPort), 10),
 		},
 		{
 			Name:  controllerconstants.XmxSizeEnv,
diff --git a/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator_test.go b/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator_test.go
index 6df2216b..361839b9 100644
--- a/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator_test.go
+++ b/deploy/kubernetes/operator/pkg/controller/sync/coordinator/coordinator_test.go
@@ -36,7 +36,9 @@ import (
 )
 
 const (
-	testRuntimeClassName = "test-runtime"
+	testRuntimeClassName       = "test-runtime"
+	testRPCPort          int32 = 19990
+	testHTTPPort         int32 = 19991
 )
 
 // IsValidDeploy checks generated deployment, returns whether it is valid and error message.
@@ -86,6 +88,57 @@ func buildRssWithCustomENVs() *uniffleapi.RemoteShuffleService {
 	return rss
 }
 
+func buildRssWithCustomRPCPort() *uniffleapi.RemoteShuffleService {
+	rss := utils.BuildRSSWithDefaultValue()
+	rss.Spec.Coordinator.RPCPort = pointer.Int32(testRPCPort)
+	return rss
+}
+
+func buildRssWithCustomHTTPPort() *uniffleapi.RemoteShuffleService {
+	rss := utils.BuildRSSWithDefaultValue()
+	rss.Spec.Coordinator.HTTPPort = pointer.Int32(testHTTPPort)
+	return rss
+}
+
+func buildCommonExpectedENVs(rss *uniffleapi.RemoteShuffleService) []corev1.EnvVar {
+	return []corev1.EnvVar{
+		{
+			Name:  controllerconstants.CoordinatorRPCPortEnv,
+			Value: strconv.FormatInt(int64(*rss.Spec.Coordinator.RPCPort), 10),
+		},
+		{
+			Name:  controllerconstants.CoordinatorHTTPPortEnv,
+			Value: strconv.FormatInt(int64(*rss.Spec.Coordinator.HTTPPort), 10),
+		},
+		{
+			Name:  controllerconstants.XmxSizeEnv,
+			Value: rss.Spec.Coordinator.XmxSize,
+		},
+		{
+			Name:  controllerconstants.ServiceNameEnv,
+			Value: controllerconstants.CoordinatorServiceName,
+		},
+		{
+			Name: controllerconstants.NodeNameEnv,
+			ValueFrom: &corev1.EnvVarSource{
+				FieldRef: &corev1.ObjectFieldSelector{
+					APIVersion: "v1",
+					FieldPath:  "spec.nodeName",
+				},
+			},
+		},
+		{
+			Name: controllerconstants.RssIPEnv,
+			ValueFrom: &corev1.EnvVarSource{
+				FieldRef: &corev1.ObjectFieldSelector{
+					APIVersion: "v1",
+					FieldPath:  "status.podIP",
+				},
+			},
+		},
+	}
+}
+
 func TestGenerateDeploy(t *testing.T) {
 	for _, tt := range []struct {
 		name string
@@ -145,42 +198,7 @@ func TestGenerateDeploy(t *testing.T) {
 			rss:  buildRssWithCustomENVs(),
 			IsValidDeploy: func(deploy *appsv1.Deployment, rss *uniffleapi.RemoteShuffleService) (
 				valid bool, err error) {
-				expectENVs := []corev1.EnvVar{
-					{
-						Name:  controllerconstants.CoordinatorRPCPortEnv,
-						Value: strconv.FormatInt(int64(controllerconstants.ContainerCoordinatorRPCPort), 10),
-					},
-					{
-						Name:  controllerconstants.CoordinatorHTTPPortEnv,
-						Value: strconv.FormatInt(int64(controllerconstants.ContainerCoordinatorHTTPPort), 10),
-					},
-					{
-						Name:  controllerconstants.XmxSizeEnv,
-						Value: rss.Spec.Coordinator.XmxSize,
-					},
-					{
-						Name:  controllerconstants.ServiceNameEnv,
-						Value: controllerconstants.CoordinatorServiceName,
-					},
-					{
-						Name: controllerconstants.NodeNameEnv,
-						ValueFrom: &corev1.EnvVarSource{
-							FieldRef: &corev1.ObjectFieldSelector{
-								APIVersion: "v1",
-								FieldPath:  "spec.nodeName",
-							},
-						},
-					},
-					{
-						Name: controllerconstants.RssIPEnv,
-						ValueFrom: &corev1.EnvVarSource{
-							FieldRef: &corev1.ObjectFieldSelector{
-								APIVersion: "v1",
-								FieldPath:  "status.podIP",
-							},
-						},
-					},
-				}
+				expectENVs := buildCommonExpectedENVs(rss)
 				defaultEnvNames := sets.NewString()
 				for i := range expectENVs {
 					defaultEnvNames.Insert(expectENVs[i].Name)
@@ -202,6 +220,94 @@ func TestGenerateDeploy(t *testing.T) {
 				return
 			},
 		},
+		{
+			name: "set custom rpc port used by coordinator",
+			rss:  buildRssWithCustomRPCPort(),
+			IsValidDeploy: func(deploy *appsv1.Deployment, rss *uniffleapi.RemoteShuffleService) (
+				valid bool, err error) {
+				// check envs
+				expectENVs := buildCommonExpectedENVs(rss)
+				for i := range expectENVs {
+					if expectENVs[i].Name == controllerconstants.CoordinatorRPCPortEnv {
+						expectENVs[i].Value = strconv.FormatInt(int64(testRPCPort), 10)
+					}
+				}
+				actualENVs := deploy.Spec.Template.Spec.Containers[0].Env
+				valid = reflect.DeepEqual(expectENVs, actualENVs)
+				if !valid {
+					actualEnvBody, _ := json.Marshal(actualENVs)
+					expectEnvBody, _ := json.Marshal(expectENVs)
+					err = fmt.Errorf("unexpected ENVs:\n%v,\nexpected:\n%v",
+						string(actualEnvBody), string(expectEnvBody))
+					return
+				}
+
+				// check ports
+				expectPorts := []corev1.ContainerPort{
+					{
+						ContainerPort: testRPCPort,
+						Protocol:      corev1.ProtocolTCP,
+					},
+					{
+						ContainerPort: *rss.Spec.Coordinator.HTTPPort,
+						Protocol:      corev1.ProtocolTCP,
+					},
+				}
+				actualPorts := deploy.Spec.Template.Spec.Containers[0].Ports
+				valid = reflect.DeepEqual(expectPorts, actualPorts)
+				if !valid {
+					actualPortsBody, _ := json.Marshal(actualPorts)
+					expectPortsBody, _ := json.Marshal(expectPorts)
+					err = fmt.Errorf("unexpected Ports:\n%v,\nexpected:\n%v",
+						string(actualPortsBody), string(expectPortsBody))
+				}
+				return
+			},
+		},
+		{
+			name: "set custom http port used by coordinator",
+			rss:  buildRssWithCustomHTTPPort(),
+			IsValidDeploy: func(deploy *appsv1.Deployment, rss *uniffleapi.RemoteShuffleService) (
+				valid bool, err error) {
+				// check envs
+				expectENVs := buildCommonExpectedENVs(rss)
+				for i := range expectENVs {
+					if expectENVs[i].Name == controllerconstants.CoordinatorHTTPPortEnv {
+						expectENVs[i].Value = strconv.FormatInt(int64(testHTTPPort), 10)
+					}
+				}
+				actualENVs := deploy.Spec.Template.Spec.Containers[0].Env
+				valid = reflect.DeepEqual(expectENVs, actualENVs)
+				if !valid {
+					actualEnvBody, _ := json.Marshal(actualENVs)
+					expectEnvBody, _ := json.Marshal(expectENVs)
+					err = fmt.Errorf("unexpected ENVs:\n%v,\nexpected:\n%v",
+						string(actualEnvBody), string(expectEnvBody))
+					return
+				}
+
+				// check ports
+				expectPorts := []corev1.ContainerPort{
+					{
+						ContainerPort: *rss.Spec.Coordinator.RPCPort,
+						Protocol:      corev1.ProtocolTCP,
+					},
+					{
+						ContainerPort: testHTTPPort,
+						Protocol:      corev1.ProtocolTCP,
+					},
+				}
+				actualPorts := deploy.Spec.Template.Spec.Containers[0].Ports
+				valid = reflect.DeepEqual(expectPorts, actualPorts)
+				if !valid {
+					actualPortsBody, _ := json.Marshal(actualPorts)
+					expectPortsBody, _ := json.Marshal(expectPorts)
+					err = fmt.Errorf("unexpected Ports:\n%v,\nexpected:\n%v",
+						string(actualPortsBody), string(expectPortsBody))
+				}
+				return
+			},
+		},
 	} {
 		t.Run(tt.name, func(tc *testing.T) {
 			deploy := GenerateDeploy(tt.rss, 0)
@@ -269,4 +375,8 @@ func TestGenerateAddresses(t *testing.T) {
 	rss := buildRssWithLabels()
 	quorum := GenerateAddresses(rss)
 	assertion.Contains(quorum, "headless")
+
+	rss = buildRssWithCustomRPCPort()
+	quorum = GenerateAddresses(rss)
+	assertion.Contains(quorum, strconv.FormatInt(int64(testRPCPort), 10))
 }
diff --git a/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver.go b/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver.go
index 43a57be8..cb77d0af 100644
--- a/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver.go
+++ b/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver.go
@@ -136,9 +136,9 @@ func GenerateSts(rss *unifflev1alpha1.RemoteShuffleService) *appsv1.StatefulSet
 						constants.AnnotationRssName: rss.Name,
 						constants.AnnotationRssUID:  string(rss.UID),
 						constants.AnnotationMetricsServerPort: fmt.Sprintf("%v",
-							controllerconstants.ContainerShuffleServerHTTPPort),
+							*rss.Spec.ShuffleServer.HTTPPort),
 						constants.AnnotationShuffleServerPort: fmt.Sprintf("%v",
-							controllerconstants.ContainerShuffleServerRPCPort),
+							*rss.Spec.ShuffleServer.RPCPort),
 					},
 				},
 				Spec: podSpec,
@@ -217,11 +217,11 @@ func generateMainContainer(rss *unifflev1alpha1.RemoteShuffleService) *corev1.Co
 func generateMainContainerPorts(rss *unifflev1alpha1.RemoteShuffleService) []corev1.ContainerPort {
 	ports := []corev1.ContainerPort{
 		{
-			ContainerPort: controllerconstants.ContainerShuffleServerRPCPort,
+			ContainerPort: *rss.Spec.ShuffleServer.RPCPort,
 			Protocol:      corev1.ProtocolTCP,
 		},
 		{
-			ContainerPort: controllerconstants.ContainerShuffleServerHTTPPort,
+			ContainerPort: *rss.Spec.ShuffleServer.HTTPPort,
 			Protocol:      corev1.ProtocolTCP,
 		},
 	}
@@ -234,11 +234,11 @@ func generateMainContainerENV(rss *unifflev1alpha1.RemoteShuffleService) []corev
 	env := []corev1.EnvVar{
 		{
 			Name:  controllerconstants.ShuffleServerRPCPortEnv,
-			Value: strconv.FormatInt(int64(controllerconstants.ContainerShuffleServerRPCPort), 10),
+			Value: strconv.FormatInt(int64(*rss.Spec.ShuffleServer.RPCPort), 10),
 		},
 		{
 			Name:  controllerconstants.ShuffleServerHTTPPortEnv,
-			Value: strconv.FormatInt(int64(controllerconstants.ContainerShuffleServerHTTPPort), 10),
+			Value: strconv.FormatInt(int64(*rss.Spec.ShuffleServer.HTTPPort), 10),
 		},
 		{
 			Name:  controllerconstants.RSSCoordinatorQuorumEnv,
diff --git a/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver_test.go b/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver_test.go
index a1c6576c..80472b12 100644
--- a/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver_test.go
+++ b/deploy/kubernetes/operator/pkg/controller/sync/shuffleserver/shuffleserver_test.go
@@ -36,7 +36,9 @@ import (
 )
 
 const (
-	testRuntimeClassName = "test-runtime"
+	testRuntimeClassName       = "test-runtime"
+	testRPCPort          int32 = 19998
+	testHTTPPort         int32 = 19999
 )
 
 // IsValidSts checks generated statefulSet, returns whether it is valid and error message.
@@ -91,6 +93,61 @@ func buildRssWithCustomENVs() *uniffleapi.RemoteShuffleService {
 	return rss
 }
 
+func buildRssWithCustomRPCPort() *uniffleapi.RemoteShuffleService {
+	rss := utils.BuildRSSWithDefaultValue()
+	rss.Spec.ShuffleServer.RPCPort = pointer.Int32(testRPCPort)
+	return rss
+}
+
+func buildRssWithCustomHTTPPort() *uniffleapi.RemoteShuffleService {
+	rss := utils.BuildRSSWithDefaultValue()
+	rss.Spec.ShuffleServer.HTTPPort = pointer.Int32(testHTTPPort)
+	return rss
+}
+
+func buildCommonExpectedENVs(rss *uniffleapi.RemoteShuffleService) []corev1.EnvVar {
+	return []corev1.EnvVar{
+		{
+			Name:  controllerconstants.ShuffleServerRPCPortEnv,
+			Value: strconv.FormatInt(int64(*rss.Spec.ShuffleServer.RPCPort), 10),
+		},
+		{
+			Name:  controllerconstants.ShuffleServerHTTPPortEnv,
+			Value: strconv.FormatInt(int64(*rss.Spec.ShuffleServer.HTTPPort), 10),
+		},
+		{
+			Name:  controllerconstants.RSSCoordinatorQuorumEnv,
+			Value: coordinator.GenerateAddresses(rss),
+		},
+		{
+			Name:  controllerconstants.XmxSizeEnv,
+			Value: rss.Spec.ShuffleServer.XmxSize,
+		},
+		{
+			Name:  controllerconstants.ServiceNameEnv,
+			Value: controllerconstants.ShuffleServerServiceName,
+		},
+		{
+			Name: controllerconstants.NodeNameEnv,
+			ValueFrom: &corev1.EnvVarSource{
+				FieldRef: &corev1.ObjectFieldSelector{
+					APIVersion: "v1",
+					FieldPath:  "spec.nodeName",
+				},
+			},
+		},
+		{
+			Name: controllerconstants.RssIPEnv,
+			ValueFrom: &corev1.EnvVarSource{
+				FieldRef: &corev1.ObjectFieldSelector{
+					APIVersion: "v1",
+					FieldPath:  "status.podIP",
+				},
+			},
+		},
+	}
+}
+
 func TestGenerateSts(t *testing.T) {
 	for _, tt := range []struct {
 		name string
@@ -148,46 +205,7 @@ func TestGenerateSts(t *testing.T) {
 			name: "set custom environment variables",
 			rss:  buildRssWithCustomENVs(),
 			IsValidSts: func(sts *appsv1.StatefulSet, rss *uniffleapi.RemoteShuffleService) (valid bool, err error) {
-				expectENVs := []corev1.EnvVar{
-					{
-						Name:  controllerconstants.ShuffleServerRPCPortEnv,
-						Value: strconv.FormatInt(int64(controllerconstants.ContainerShuffleServerRPCPort), 10),
-					},
-					{
-						Name:  controllerconstants.ShuffleServerHTTPPortEnv,
-						Value: strconv.FormatInt(int64(controllerconstants.ContainerShuffleServerHTTPPort), 10),
-					},
-					{
-						Name:  controllerconstants.RSSCoordinatorQuorumEnv,
-						Value: coordinator.GenerateAddresses(rss),
-					},
-					{
-						Name:  controllerconstants.XmxSizeEnv,
-						Value: rss.Spec.ShuffleServer.XmxSize,
-					},
-					{
-						Name:  controllerconstants.ServiceNameEnv,
-						Value: controllerconstants.ShuffleServerServiceName,
-					},
-					{
-						Name: controllerconstants.NodeNameEnv,
-						ValueFrom: &corev1.EnvVarSource{
-							FieldRef: &corev1.ObjectFieldSelector{
-								APIVersion: "v1",
-								FieldPath:  "spec.nodeName",
-							},
-						},
-					},
-					{
-						Name: controllerconstants.RssIPEnv,
-						ValueFrom: &corev1.EnvVarSource{
-							FieldRef: &corev1.ObjectFieldSelector{
-								APIVersion: "v1",
-								FieldPath:  "status.podIP",
-							},
-						},
-					},
-				}
+				expectENVs := buildCommonExpectedENVs(rss)
 				defaultEnvNames := sets.NewString()
 				for i := range expectENVs {
 					defaultEnvNames.Insert(expectENVs[i].Name)
@@ -209,6 +227,94 @@ func TestGenerateSts(t *testing.T) {
 				return
 			},
 		},
+		{
+			name: "set custom rpc port used by shuffle server",
+			rss:  buildRssWithCustomRPCPort(),
+			IsValidSts: func(sts *appsv1.StatefulSet, rss *uniffleapi.RemoteShuffleService) (
+				valid bool, err error) {
+				// check envs
+				expectENVs := buildCommonExpectedENVs(rss)
+				for i := range expectENVs {
+					if expectENVs[i].Name == controllerconstants.ShuffleServerRPCPortEnv {
+						expectENVs[i].Value = strconv.FormatInt(int64(testRPCPort), 10)
+					}
+				}
+				actualENVs := sts.Spec.Template.Spec.Containers[0].Env
+				valid = reflect.DeepEqual(expectENVs, actualENVs)
+				if !valid {
+					actualEnvBody, _ := json.Marshal(actualENVs)
+					expectEnvBody, _ := json.Marshal(expectENVs)
+					err = fmt.Errorf("unexpected ENVs:\n%v,\nexpected:\n%v",
+						string(actualEnvBody), string(expectEnvBody))
+					return
+				}
+
+				// check ports
+				expectPorts := []corev1.ContainerPort{
+					{
+						ContainerPort: testRPCPort,
+						Protocol:      corev1.ProtocolTCP,
+					},
+					{
+						ContainerPort: *rss.Spec.ShuffleServer.HTTPPort,
+						Protocol:      corev1.ProtocolTCP,
+					},
+				}
+				actualPorts := sts.Spec.Template.Spec.Containers[0].Ports
+				valid = reflect.DeepEqual(expectPorts, actualPorts)
+				if !valid {
+					actualPortsBody, _ := json.Marshal(actualPorts)
+					expectPortsBody, _ := json.Marshal(expectPorts)
+					err = fmt.Errorf("unexpected Ports:\n%v,\nexpected:\n%v",
+						string(actualPortsBody), string(expectPortsBody))
+				}
+				return
+			},
+		},
+		{
+			name: "set custom http port used by shuffle server",
+			rss:  buildRssWithCustomHTTPPort(),
+			IsValidSts: func(sts *appsv1.StatefulSet, rss *uniffleapi.RemoteShuffleService) (
+				valid bool, err error) {
+				// check envs
+				expectENVs := buildCommonExpectedENVs(rss)
+				for i := range expectENVs {
+					if expectENVs[i].Name == controllerconstants.ShuffleServerHTTPPortEnv {
+						expectENVs[i].Value = strconv.FormatInt(int64(testHTTPPort), 10)
+					}
+				}
+				actualENVs := sts.Spec.Template.Spec.Containers[0].Env
+				valid = reflect.DeepEqual(expectENVs, actualENVs)
+				if !valid {
+					actualEnvBody, _ := json.Marshal(actualENVs)
+					expectEnvBody, _ := json.Marshal(expectENVs)
+					err = fmt.Errorf("unexpected ENVs:\n%v,\nexpected:\n%v",
+						string(actualEnvBody), string(expectEnvBody))
+					return
+				}
+
+				// check ports
+				expectPorts := []corev1.ContainerPort{
+					{
+						ContainerPort: *rss.Spec.ShuffleServer.RPCPort,
+						Protocol:      corev1.ProtocolTCP,
+					},
+					{
+						ContainerPort: testHTTPPort,
+						Protocol:      corev1.ProtocolTCP,
+					},
+				}
+				actualPorts := sts.Spec.Template.Spec.Containers[0].Ports
+				valid = reflect.DeepEqual(expectPorts, actualPorts)
+				if !valid {
+					actualPortsBody, _ := json.Marshal(actualPorts)
+					expectPortsBody, _ := json.Marshal(expectPorts)
+					err = fmt.Errorf("unexpected Ports:\n%v,\nexpected:\n%v",
+						string(actualPortsBody), string(expectPortsBody))
+				}
+				return
+			},
+		},
 	} {
 		t.Run(tt.name, func(tc *testing.T) {
 			sts := GenerateSts(tt.rss)