You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2021/10/25 18:24:02 UTC
[beam] branch master updated: Add environment_service.go and
structures for beam sdk, network envs, cache and application envs
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new aa26b53 Add environment_service.go and structures for beam sdk, network envs, cache and application envs
new 835d19a Merge pull request #15654 from [BEAM-12998] [Playground] Environment service
aa26b53 is described below
commit aa26b53f633847947c535b48b574cd652a1a0c3f
Author: Ilya Kozyrev <il...@akvelon.com>
AuthorDate: Mon Oct 25 18:33:34 2021 +0300
Add environment_service.go and structures for beam sdk, network envs, cache and application envs
Co-authored-by: AydarZaynutdinov <ay...@akvelon.com>
Co-authored-by: daria.malkova <da...@akvelon.com>
---
playground/backend/cmd/server/http.go | 2 +-
playground/backend/cmd/server/server.go | 23 +-
playground/backend/configs/SDK_JAVA.json | 13 +
.../backend/internal/environment/application.go | 98 +++++++
.../internal/environment/application_test.go | 261 +++++++++++++++++++
playground/backend/internal/environment/beam.go | 43 ++++
.../backend/internal/environment/environment.go | 28 --
.../internal/environment/environment_service.go | 193 ++++++++++++++
.../environment/environment_service_test.go | 283 +++++++++++++++++++++
9 files changed, 913 insertions(+), 31 deletions(-)
diff --git a/playground/backend/cmd/server/http.go b/playground/backend/cmd/server/http.go
index 8dce878..d49cbfe 100644
--- a/playground/backend/cmd/server/http.go
+++ b/playground/backend/cmd/server/http.go
@@ -22,7 +22,7 @@ import (
)
// listenHttp binds the http.Handler on the TCP network address
-func listenHttp(ctx context.Context, errChan chan error, envs environment.ServerEnvs, handler http.Handler) {
+func listenHttp(ctx context.Context, errChan chan error, envs environment.NetworkEnvs, handler http.Handler) {
grpclog.Infof("listening HTTP at %s\n", envs.Address())
if err := http.ListenAndServe(envs.Address(), handler); err != nil {
errChan <- err
diff --git a/playground/backend/cmd/server/server.go b/playground/backend/cmd/server/server.go
index 3102188..ea9edfc 100644
--- a/playground/backend/cmd/server/server.go
+++ b/playground/backend/cmd/server/server.go
@@ -32,7 +32,10 @@ func runServer() error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- envService := environment.NewEnvironment()
+ envService, err := setupEnvironment()
+ if err != nil {
+ return err
+ }
grpcServer := grpc.NewServer()
pb.RegisterPlaygroundServiceServer(grpcServer, &playgroundController{})
@@ -40,7 +43,7 @@ func runServer() error {
handler := Wrap(grpcServer, getGrpcWebOptions())
errChan := make(chan error)
- go listenHttp(ctx, errChan, envService, handler)
+ go listenHttp(ctx, errChan, envService.NetworkEnvs, handler)
for {
select {
@@ -53,6 +56,22 @@ func runServer() error {
}
}
+func setupEnvironment() (*environment.Environment, error) {
+ networkEnvs, err := environment.GetNetworkEnvsFromOsEnvs()
+ if err != nil {
+ return nil, err
+ }
+ beamEnvs, err := environment.GetSdkEnvsFromOsEnvs()
+ if err != nil {
+ return nil, err
+ }
+ appEnvs, err := environment.GetApplicationEnvsFromOsEnvs()
+ if err != nil {
+ return nil, err
+ }
+ return environment.NewEnvironment(*networkEnvs, *beamEnvs, *appEnvs), nil
+}
+
// getGrpcWebOptions returns grpcweb options needed to configure wrapper
func getGrpcWebOptions() []grpcweb.Option {
return []grpcweb.Option{
diff --git a/playground/backend/configs/SDK_JAVA.json b/playground/backend/configs/SDK_JAVA.json
new file mode 100644
index 0000000..f371b12
--- /dev/null
+++ b/playground/backend/configs/SDK_JAVA.json
@@ -0,0 +1,13 @@
+{
+ "compile_cmd": "javac",
+ "run_cmd": "java",
+ "compile_args": [
+ "-d",
+ "bin",
+ "-classpath"
+ ],
+ "run_args": [
+ "-cp",
+ "bin:"
+ ]
+}
\ No newline at end of file
diff --git a/playground/backend/internal/environment/application.go b/playground/backend/internal/environment/application.go
new file mode 100644
index 0000000..39c7753
--- /dev/null
+++ b/playground/backend/internal/environment/application.go
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package environment
+
+import (
+ "fmt"
+ "time"
+)
+
+// NetworkEnvs contains all environment variables that are needed to run the server.
+type NetworkEnvs struct {
+ ip string
+ port int
+}
+
+// NewNetworkEnvs constructor for NetworkEnvs
+func NewNetworkEnvs(ip string, port int) *NetworkEnvs {
+ return &NetworkEnvs{ip: ip, port: port}
+}
+
+// Address returns concatenated ip and port through ':'
+func (serverEnvs NetworkEnvs) Address() string {
+ return fmt.Sprintf("%s:%d", serverEnvs.ip, serverEnvs.port)
+}
+
+// CacheEnvs contains all environment variables that are needed to use the cache.
+type CacheEnvs struct {
+ cacheType string
+ address string
+ keyExpirationTime time.Duration
+}
+
+// CacheType returns cache type
+func (ce *CacheEnvs) CacheType() string {
+ return ce.cacheType
+}
+
+// Address returns address to connect to remote cache service
+func (ce *CacheEnvs) Address() string {
+ return ce.address
+}
+
+// KeyExpirationTime returns keyExpirationTime
+func (ce *CacheEnvs) KeyExpirationTime() time.Duration {
+ return ce.keyExpirationTime
+}
+
+// NewCacheEnvs constructor for CacheEnvs
+func NewCacheEnvs(cacheType, cacheAddress string, cacheExpirationTime time.Duration) *CacheEnvs {
+ return &CacheEnvs{
+ cacheType: cacheType,
+ address: cacheAddress,
+ keyExpirationTime: cacheExpirationTime,
+ }
+}
+
+// ApplicationEnvs contains all environment variables that are needed to run backend processes.
+type ApplicationEnvs struct {
+ workingDir string
+ cacheEnvs *CacheEnvs
+ pipelineExecuteTimeout time.Duration
+}
+
+// NewApplicationEnvs constructor for ApplicationEnvs
+func NewApplicationEnvs(workingDir string, cacheEnvs *CacheEnvs, pipelineExecuteTimeout time.Duration) *ApplicationEnvs {
+ return &ApplicationEnvs{
+ workingDir: workingDir,
+ cacheEnvs: cacheEnvs,
+ pipelineExecuteTimeout: pipelineExecuteTimeout,
+ }
+}
+
+// WorkingDir returns workingDir
+func (ae *ApplicationEnvs) WorkingDir() string {
+ return ae.workingDir
+}
+
+func (ae *ApplicationEnvs) CacheEnvs() CacheEnvs {
+ return *ae.cacheEnvs
+}
+
+// PipelineExecuteTimeout returns pipelineExecuteTimeout
+func (ae *ApplicationEnvs) PipelineExecuteTimeout() time.Duration {
+ return ae.pipelineExecuteTimeout
+}
diff --git a/playground/backend/internal/environment/application_test.go b/playground/backend/internal/environment/application_test.go
new file mode 100644
index 0000000..78bca3f
--- /dev/null
+++ b/playground/backend/internal/environment/application_test.go
@@ -0,0 +1,261 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package environment
+
+import (
+ "fmt"
+ "testing"
+ "time"
+)
+
+func TestNetworkEnvs_Address(t *testing.T) {
+ type fields struct {
+ ip string
+ port int
+ }
+ tests := []struct {
+ name string
+ fields fields
+ want string
+ }{
+ {
+ name: "ip and port concatenated through ':'",
+ fields: fields{ip: defaultIp, port: defaultPort},
+ want: fmt.Sprintf("%s:%d", defaultIp, defaultPort),
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ serverEnvs := NetworkEnvs{
+ ip: tt.fields.ip,
+ port: tt.fields.port,
+ }
+ if got := serverEnvs.Address(); got != tt.want {
+ t.Errorf("Address() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func TestCacheEnvs_CacheType(t *testing.T) {
+ type fields struct {
+ cacheType string
+ address string
+ keyExpirationTime time.Duration
+ }
+ tests := []struct {
+ name string
+ fields fields
+ want string
+ }{
+ {
+ name: "all success",
+ fields: fields{
+ cacheType: "MOCK_CACHE_TYPE",
+ address: "MOCK_ADDRESS",
+ keyExpirationTime: 0,
+ },
+ want: "MOCK_CACHE_TYPE",
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ ce := &CacheEnvs{
+ cacheType: tt.fields.cacheType,
+ address: tt.fields.address,
+ keyExpirationTime: tt.fields.keyExpirationTime,
+ }
+ if got := ce.CacheType(); got != tt.want {
+ t.Errorf("CacheType() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func TestCacheEnvs_Address(t *testing.T) {
+ type fields struct {
+ cacheType string
+ address string
+ keyExpirationTime time.Duration
+ }
+ tests := []struct {
+ name string
+ fields fields
+ want string
+ }{
+ {
+ name: "all success",
+ fields: fields{
+ cacheType: "MOCK_CACHE_TYPE",
+ address: "MOCK_ADDRESS",
+ keyExpirationTime: 0,
+ },
+ want: "MOCK_ADDRESS",
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ ce := &CacheEnvs{
+ cacheType: tt.fields.cacheType,
+ address: tt.fields.address,
+ keyExpirationTime: tt.fields.keyExpirationTime,
+ }
+ if got := ce.Address(); got != tt.want {
+ t.Errorf("Address() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func TestCacheEnvs_KeyExpirationTime(t *testing.T) {
+ type fields struct {
+ cacheType string
+ address string
+ keyExpirationTime time.Duration
+ }
+ tests := []struct {
+ name string
+ fields fields
+ want time.Duration
+ }{
+ {
+ name: "all success",
+ fields: fields{
+ cacheType: "MOCK_CACHE_TYPE",
+ address: "MOCK_ADDRESS",
+ keyExpirationTime: 0,
+ },
+ want: 0,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ ce := &CacheEnvs{
+ cacheType: tt.fields.cacheType,
+ address: tt.fields.address,
+ keyExpirationTime: tt.fields.keyExpirationTime,
+ }
+ if got := ce.KeyExpirationTime(); got != tt.want {
+ t.Errorf("KeyExpirationTime() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func TestApplicationEnvs_WorkingDir(t *testing.T) {
+ type fields struct {
+ workingDir string
+ cacheEnvs *CacheEnvs
+ pipelineExecuteTimeout time.Duration
+ }
+ tests := []struct {
+ name string
+ fields fields
+ want string
+ }{
+ {
+ name: "all success",
+ fields: fields{
+ workingDir: "MOCK_WORKING_DIR",
+ cacheEnvs: &CacheEnvs{},
+ pipelineExecuteTimeout: 0,
+ },
+ want: "MOCK_WORKING_DIR",
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ ae := &ApplicationEnvs{
+ workingDir: tt.fields.workingDir,
+ cacheEnvs: tt.fields.cacheEnvs,
+ pipelineExecuteTimeout: tt.fields.pipelineExecuteTimeout,
+ }
+ if got := ae.WorkingDir(); got != tt.want {
+ t.Errorf("WorkingDir() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func TestApplicationEnvs_CacheEnvs(t *testing.T) {
+ type fields struct {
+ workingDir string
+ cacheEnvs *CacheEnvs
+ pipelineExecuteTimeout time.Duration
+ }
+ tests := []struct {
+ name string
+ fields fields
+ want CacheEnvs
+ }{
+ {
+ name: "all success",
+ fields: fields{
+ workingDir: "MOCK_WORKING_DIR",
+ cacheEnvs: &CacheEnvs{},
+ pipelineExecuteTimeout: 0,
+ },
+ want: CacheEnvs{},
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ ae := &ApplicationEnvs{
+ workingDir: tt.fields.workingDir,
+ cacheEnvs: tt.fields.cacheEnvs,
+ pipelineExecuteTimeout: tt.fields.pipelineExecuteTimeout,
+ }
+ if got := ae.CacheEnvs(); got != tt.want {
+ t.Errorf("CacheEnvs() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func TestApplicationEnvs_PipelineExecuteTimeout(t *testing.T) {
+ type fields struct {
+ workingDir string
+ cacheEnvs *CacheEnvs
+ pipelineExecuteTimeout time.Duration
+ }
+ tests := []struct {
+ name string
+ fields fields
+ want time.Duration
+ }{
+ {
+ name: "all success",
+ fields: fields{
+ workingDir: "MOCK_WORKING_DIR",
+ cacheEnvs: &CacheEnvs{},
+ pipelineExecuteTimeout: 0,
+ },
+ want: 0,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ ae := &ApplicationEnvs{
+ workingDir: tt.fields.workingDir,
+ cacheEnvs: tt.fields.cacheEnvs,
+ pipelineExecuteTimeout: tt.fields.pipelineExecuteTimeout,
+ }
+ if got := ae.PipelineExecuteTimeout(); got != tt.want {
+ t.Errorf("PipelineExecuteTimeout() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
diff --git a/playground/backend/internal/environment/beam.go b/playground/backend/internal/environment/beam.go
new file mode 100644
index 0000000..d266d0c
--- /dev/null
+++ b/playground/backend/internal/environment/beam.go
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package environment
+
+import (
+ pb "beam.apache.org/playground/backend/internal/api/v1"
+)
+
+// ExecutorConfig contains the commands and arguments needed to compile and run the code.
+type ExecutorConfig struct {
+ CompileCmd string `json:"compile_cmd"`
+ RunCmd string `json:"run_cmd"`
+ CompileArgs []string `json:"compile_args"`
+ RunArgs []string `json:"run_args"`
+}
+
+func NewExecutorConfig(compileCmd string, runCmd string, compileArgs []string, runArgs []string) *ExecutorConfig {
+ return &ExecutorConfig{CompileCmd: compileCmd, RunCmd: runCmd, CompileArgs: compileArgs, RunArgs: runArgs}
+}
+
+// BeamEnvs contains all environment settings related to Apache Beam. They are used to run pipelines.
+type BeamEnvs struct {
+ ApacheBeamSdk pb.Sdk
+ ExecutorConfig *ExecutorConfig
+}
+
+// NewBeamEnvs is a BeamEnvs constructor
+func NewBeamEnvs(apacheBeamSdk pb.Sdk, executorConfig *ExecutorConfig) *BeamEnvs {
+ return &BeamEnvs{ApacheBeamSdk: apacheBeamSdk, ExecutorConfig: executorConfig}
+}
diff --git a/playground/backend/internal/environment/environment.go b/playground/backend/internal/environment/environment.go
deleted file mode 100644
index 910dec9..0000000
--- a/playground/backend/internal/environment/environment.go
+++ /dev/null
@@ -1,28 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// TODO: remove this code when merging https://github.com/apache/beam/pull/15654
-
-package environment
-
-type ServerEnvs struct {}
-
-func (envs ServerEnvs) Address() string {
- return ""
-}
-
-func NewEnvironment() ServerEnvs {
- return ServerEnvs{}
-}
diff --git a/playground/backend/internal/environment/environment_service.go b/playground/backend/internal/environment/environment_service.go
new file mode 100644
index 0000000..240a972
--- /dev/null
+++ b/playground/backend/internal/environment/environment_service.go
@@ -0,0 +1,193 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package environment
+
+import (
+ pb "beam.apache.org/playground/backend/internal/api/v1"
+ "encoding/json"
+ "errors"
+ "io/ioutil"
+ "log"
+ "os"
+ "path/filepath"
+ "strconv"
+ "strings"
+ "time"
+)
+
+const (
+ serverIpKey = "SERVER_IP"
+ serverPortKey = "SERVER_PORT"
+ beamSdkKey = "BEAM_SDK"
+ workingDirKey = "APP_WORK_DIR"
+ cacheTypeKey = "CACHE_TYPE"
+ cacheAddressKey = "CACHE_ADDRESS"
+ beamPathKey = "BEAM_PATH"
+ beamRunnerKey = "BEAM_RUNNER"
+ SLF4jKey = "SLF4J"
+ cacheKeyExpirationTimeKey = "KEY_EXPIRATION_TIME"
+ pipelineExecuteTimeoutKey = "PIPELINE_EXPIRATION_TIMEOUT"
+ defaultIp = "localhost"
+ defaultPort = 8080
+ defaultSdk = pb.Sdk_SDK_JAVA
+ defaultBeamSdkPath = "/opt/apache/beam/jars/beam-sdks-java-harness.jar"
+ defaultCacheType = "local"
+ defaultCacheAddress = "localhost:6379"
+ defaultCacheKeyExpirationTime = time.Minute * 15
+ defaultPipelineExecuteTimeout = time.Minute * 10
+ defaultBeamRunner = "/opt/apache/beam/jars/beam-runners-direct.jar"
+ defaultSLF4j = "/opt/apache/beam/jars/slf4j-jdk14.jar"
+ jsonExt = ".json"
+ configFolderName = "configs"
+)
+
+// Environment operates with environment structures: NetworkEnvs, BeamEnvs, ApplicationEnvs
+type Environment struct {
+ NetworkEnvs NetworkEnvs
+ BeamSdkEnvs BeamEnvs
+ ApplicationEnvs ApplicationEnvs
+}
+
+// NewEnvironment is a constructor for Environment.
+// It stores the given values as they are and applies no defaults itself.
+// The values are usually built by the Get...FromOsEnvs functions:
+// NetworkEnvs: by default using defaultIp and defaultPort from constants
+// BeamEnvs: BEAM_SDK must be specified, there is no default value
+// ApplicationEnvs: APP_WORK_DIR is required, other values fall back to defaults
+func NewEnvironment(networkEnvs NetworkEnvs, beamEnvs BeamEnvs, appEnvs ApplicationEnvs) *Environment {
+ svc := Environment{}
+ svc.NetworkEnvs = networkEnvs
+ svc.BeamSdkEnvs = beamEnvs
+ svc.ApplicationEnvs = appEnvs
+
+ return &svc
+}
+
+// GetApplicationEnvsFromOsEnvs looks up the os environment variables for the app working dir, cache settings and pipeline execute timeout. If the working dir is not set, it returns an error.
+func GetApplicationEnvsFromOsEnvs() (*ApplicationEnvs, error) {
+ pipelineExecuteTimeout := defaultPipelineExecuteTimeout
+ cacheExpirationTime := defaultCacheKeyExpirationTime
+ cacheType := getEnv(cacheTypeKey, defaultCacheType)
+ cacheAddress := getEnv(cacheAddressKey, defaultCacheAddress)
+
+ if value, present := os.LookupEnv(cacheKeyExpirationTimeKey); present {
+ if converted, err := time.ParseDuration(value); err == nil {
+ cacheExpirationTime = converted
+ } else {
+ log.Printf("couldn't convert provided cache expiration time. Using default %s\n", defaultCacheKeyExpirationTime)
+ }
+ }
+ if value, present := os.LookupEnv(pipelineExecuteTimeoutKey); present {
+ if converted, err := time.ParseDuration(value); err == nil {
+ pipelineExecuteTimeout = converted
+ } else {
+ log.Printf("couldn't convert provided pipeline execute timeout. Using default %s\n", defaultPipelineExecuteTimeout)
+ }
+ }
+
+ if value, present := os.LookupEnv(workingDirKey); present {
+ return NewApplicationEnvs(value, NewCacheEnvs(cacheType, cacheAddress, cacheExpirationTime), pipelineExecuteTimeout), nil
+ }
+ return nil, errors.New("APP_WORK_DIR env should be provided with os.env")
+}
+
+// GetNetworkEnvsFromOsEnvs looks up the os environment variables for ip and port. If they are not set, the defaults are used; if the port is not an integer, it returns an error.
+func GetNetworkEnvsFromOsEnvs() (*NetworkEnvs, error) {
+ ip := getEnv(serverIpKey, defaultIp)
+ port := defaultPort
+ var err error
+ if value, present := os.LookupEnv(serverPortKey); present {
+ port, err = strconv.Atoi(value)
+ if err != nil {
+ return nil, err
+ }
+ }
+ return NewNetworkEnvs(ip, port), nil
+}
+
+// GetSdkEnvsFromOsEnvs looks up the os environment variables for the Apache Beam SDK. If it is not set or not recognized, it returns an error.
+func GetSdkEnvsFromOsEnvs() (*BeamEnvs, error) {
+ sdk := pb.Sdk_SDK_UNSPECIFIED
+ if value, present := os.LookupEnv(beamSdkKey); present {
+
+ switch value {
+ case pb.Sdk_SDK_JAVA.String():
+ sdk = pb.Sdk_SDK_JAVA
+ case pb.Sdk_SDK_GO.String():
+ sdk = pb.Sdk_SDK_GO
+ case pb.Sdk_SDK_PYTHON.String():
+ sdk = pb.Sdk_SDK_PYTHON
+ case pb.Sdk_SDK_SCIO.String():
+ sdk = pb.Sdk_SDK_SCIO
+ }
+ }
+ if sdk == pb.Sdk_SDK_UNSPECIFIED {
+ return nil, errors.New("env BEAM_SDK must be specified in the environment variables")
+ }
+ configPath := filepath.Join(os.Getenv(workingDirKey), configFolderName, sdk.String()+jsonExt)
+ executorConfig, err := createExecutorConfig(sdk, configPath)
+ if err != nil {
+ return nil, err
+ }
+ return NewBeamEnvs(sdk, executorConfig), nil
+}
+
+// createExecutorConfig creates an ExecutorConfig object that corresponds to the specific Apache Beam sdk.
+func createExecutorConfig(apacheBeamSdk pb.Sdk, configPath string) (*ExecutorConfig, error) {
+ executorConfig, err := getConfigFromJson(configPath)
+ if err != nil {
+ return nil, err
+ }
+ switch apacheBeamSdk {
+ case pb.Sdk_SDK_JAVA:
+ executorConfig.CompileArgs = append(executorConfig.CompileArgs, getEnv(beamPathKey, defaultBeamSdkPath))
+ jars := strings.Join([]string{
+ getEnv(beamPathKey, defaultBeamSdkPath),
+ getEnv(beamRunnerKey, defaultBeamRunner),
+ getEnv(SLF4jKey, defaultSLF4j),
+ }, ":")
+ executorConfig.RunArgs[1] += jars
+ case pb.Sdk_SDK_GO:
+ return nil, errors.New("not yet supported")
+ case pb.Sdk_SDK_PYTHON:
+ return nil, errors.New("not yet supported")
+ case pb.Sdk_SDK_SCIO:
+ return nil, errors.New("not yet supported")
+ }
+ return executorConfig, nil
+}
+
+// getConfigFromJson reads a json file into an ExecutorConfig struct.
+func getConfigFromJson(configPath string) (*ExecutorConfig, error) {
+ file, err := ioutil.ReadFile(configPath)
+ if err != nil {
+ return nil, err
+ }
+ executorConfig := ExecutorConfig{}
+ err = json.Unmarshal(file, &executorConfig)
+ if err != nil {
+ return nil, err
+ }
+ return &executorConfig, err
+}
+
+// getEnv returns the value of an environment variable, or the default value if it is not set.
+func getEnv(key, defaultValue string) string {
+ if value, ok := os.LookupEnv(key); ok {
+ return value
+ }
+ return defaultValue
+}
diff --git a/playground/backend/internal/environment/environment_service_test.go b/playground/backend/internal/environment/environment_service_test.go
new file mode 100644
index 0000000..51644c9
--- /dev/null
+++ b/playground/backend/internal/environment/environment_service_test.go
@@ -0,0 +1,283 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package environment
+
+import (
+ playground "beam.apache.org/playground/backend/internal/api/v1"
+ "fmt"
+ "io/fs"
+ "os"
+ "path/filepath"
+ "reflect"
+ "strings"
+ "testing"
+)
+
+const (
+ javaConfig = "{\n \"compile_cmd\": \"javac\",\n \"run_cmd\": \"java\",\n \"compile_args\": [\"-d\", \"bin\", \"-classpath\"],\n \"run_args\": [\"-cp\", \"bin:\"]\n}"
+)
+
+func TestMain(m *testing.M) {
+ err := setup()
+ if err != nil {
+ fmt.Errorf("error during test setup: %s", err.Error())
+ }
+ defer teardown()
+ m.Run()
+}
+
+func setup() error {
+ err := os.MkdirAll(configFolderName, fs.ModePerm)
+ if err != nil {
+ return err
+ }
+ filePath := filepath.Join(configFolderName, defaultSdk.String()+jsonExt)
+ err = os.WriteFile(filePath, []byte(javaConfig), 0600)
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+func teardown() {
+ err := os.RemoveAll(configFolderName)
+ if err != nil {
+ fmt.Errorf("error during test setup: %s", err.Error())
+ }
+}
+
+func setOsEnvs(envsToSet map[string]string) error {
+ for key, value := range envsToSet {
+ if err := os.Setenv(key, value); err != nil {
+ return err
+ }
+
+ }
+ return nil
+}
+
+func TestNewEnvironment(t *testing.T) {
+ executorConfig := NewExecutorConfig("javac", "java", []string{""}, []string{""})
+ tests := []struct {
+ name string
+ want *Environment
+ }{
+ {name: "create env service with default envs", want: &Environment{
+ NetworkEnvs: *NewNetworkEnvs(defaultIp, defaultPort),
+ BeamSdkEnvs: *NewBeamEnvs(defaultSdk, executorConfig),
+ ApplicationEnvs: *NewApplicationEnvs("/app", &CacheEnvs{defaultCacheType, defaultCacheAddress, defaultCacheKeyExpirationTime}, defaultPipelineExecuteTimeout),
+ }},
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if got := NewEnvironment(
+ *NewNetworkEnvs(defaultIp, defaultPort),
+ *NewBeamEnvs(defaultSdk, executorConfig),
+ *NewApplicationEnvs("/app", &CacheEnvs{defaultCacheType, defaultCacheAddress, defaultCacheKeyExpirationTime}, defaultPipelineExecuteTimeout)); !reflect.DeepEqual(got, tt.want) {
+ t.Errorf("NewEnvironment() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func Test_getSdkEnvsFromOsEnvs(t *testing.T) {
+ jars := strings.Join([]string{defaultBeamSdkPath, defaultBeamRunner, defaultSLF4j}, ":")
+ tests := []struct {
+ name string
+ want *BeamEnvs
+ envsToSet map[string]string
+ wantErr bool
+ }{
+ {
+ name: "not specified beam sdk key in os envs",
+ want: nil,
+ envsToSet: map[string]string{workingDirKey: "./"},
+ wantErr: true,
+ },
+ {
+ name: "default beam envs",
+ want: NewBeamEnvs(defaultSdk, NewExecutorConfig("javac", "java", []string{"-d", "bin", "-classpath", defaultBeamSdkPath}, []string{"-cp", "bin:" + jars})),
+ envsToSet: map[string]string{beamSdkKey: "SDK_JAVA", workingDirKey: "./"},
+ wantErr: false,
+ },
+ {
+ name: "specific sdk key in os envs",
+ want: NewBeamEnvs(defaultSdk, NewExecutorConfig("javac", "java", []string{"-d", "bin", "-classpath", defaultBeamSdkPath}, []string{"-cp", "bin:" + jars})),
+ envsToSet: map[string]string{beamSdkKey: "SDK_JAVA", workingDirKey: "./"},
+ wantErr: false,
+ },
+ {
+ name: "wrong sdk key in os envs",
+ want: nil,
+ envsToSet: map[string]string{beamSdkKey: "SDK_J", workingDirKey: "./"},
+ wantErr: true,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if err := setOsEnvs(tt.envsToSet); err != nil {
+ t.Fatalf("couldn't setup os env")
+ }
+ got, err := GetSdkEnvsFromOsEnvs()
+ if (err != nil) != tt.wantErr {
+ t.Errorf("getSdkEnvsFromOsEnvs() error = %v, wantErr %v", err, tt.wantErr)
+ return
+ }
+ if !reflect.DeepEqual(got, tt.want) {
+ t.Errorf("getSdkEnvsFromOsEnvs() got = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func Test_getNetworkEnvsFromOsEnvs(t *testing.T) {
+ tests := []struct {
+ name string
+ want *NetworkEnvs
+ envsToSet map[string]string
+ wantErr bool
+ }{
+ {
+ name: "default values",
+ want: NewNetworkEnvs(defaultIp, defaultPort),
+ },
+ {
+ name: "values from os envs",
+ want: NewNetworkEnvs("12.12.12.21", 1234),
+ envsToSet: map[string]string{serverIpKey: "12.12.12.21", serverPortKey: "1234"},
+ },
+ {
+ name: "not int port in os env, should be default",
+ want: nil,
+ envsToSet: map[string]string{serverIpKey: "12.12.12.21", serverPortKey: "1a34"},
+ wantErr: true,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if err := setOsEnvs(tt.envsToSet); err != nil {
+ t.Fatalf("couldn't setup os env")
+ }
+ got, err := GetNetworkEnvsFromOsEnvs()
+ if (err != nil) != tt.wantErr {
+ t.Errorf("getNetworkEnvsFromOsEnvs() error = %v, wantErr %v", err, tt.wantErr)
+ return
+ }
+ if !reflect.DeepEqual(got, tt.want) {
+ t.Errorf("getNetworkEnvsFromOsEnvs() got = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func Test_getApplicationEnvsFromOsEnvs(t *testing.T) {
+ tests := []struct {
+ name string
+ want *ApplicationEnvs
+ wantErr bool
+ envsToSet map[string]string
+ }{
+ {name: "working dir is provided", want: NewApplicationEnvs("/app", &CacheEnvs{defaultCacheType, defaultCacheAddress, defaultCacheKeyExpirationTime}, defaultPipelineExecuteTimeout), wantErr: false, envsToSet: map[string]string{workingDirKey: "/app"}},
+ {name: "working dir isn't provided", want: nil, wantErr: true},
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if err := setOsEnvs(tt.envsToSet); err != nil {
+ t.Fatalf("couldn't setup os env")
+ }
+ got, err := GetApplicationEnvsFromOsEnvs()
+ if (err != nil) != tt.wantErr {
+ t.Errorf("getApplicationEnvsFromOsEnvs() error = %v, wantErr %v", err, tt.wantErr)
+ return
+ }
+
+ if !reflect.DeepEqual(got, tt.want) {
+ t.Errorf("getApplicationEnvsFromOsEnvs() got = %v, want %v", got, tt.want)
+ }
+ os.Clearenv()
+ })
+ }
+}
+
+func Test_createExecutorConfig(t *testing.T) {
+ jars := strings.Join([]string{defaultBeamSdkPath, defaultBeamRunner, defaultSLF4j}, ":")
+ type args struct {
+ apacheBeamSdk playground.Sdk
+ configPath string
+ }
+ tests := []struct {
+ name string
+ args args
+ want *ExecutorConfig
+ wantErr bool
+ }{
+ {
+ name: "create executor configuration from json file",
+ args: args{apacheBeamSdk: defaultSdk, configPath: filepath.Join(configFolderName, defaultSdk.String()+jsonExt)},
+ want: NewExecutorConfig("javac", "java", []string{"-d", "bin", "-classpath", defaultBeamSdkPath}, []string{"-cp", "bin:" + jars}),
+ wantErr: false,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ got, err := createExecutorConfig(tt.args.apacheBeamSdk, tt.args.configPath)
+ if (err != nil) != tt.wantErr {
+ t.Errorf("createExecutorConfig() error = %v, wantErr %v", err, tt.wantErr)
+ return
+ }
+ if !reflect.DeepEqual(got, tt.want) {
+ t.Errorf("createExecutorConfig() got = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func Test_getConfigFromJson(t *testing.T) {
+ type args struct {
+ configPath string
+ }
+ tests := []struct {
+ name string
+ args args
+ want *ExecutorConfig
+ wantErr bool
+ }{
+ {
+ name: "get object from json",
+ args: args{filepath.Join(configFolderName, defaultSdk.String()+jsonExt)},
+ want: NewExecutorConfig("javac", "java", []string{"-d", "bin", "-classpath"}, []string{"-cp", "bin:"}),
+ wantErr: false,
+ },
+ {
+ name: "error if wrong json path",
+ args: args{filepath.Join("wrong_folder", defaultSdk.String()+jsonExt)},
+ want: nil,
+ wantErr: true,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ got, err := getConfigFromJson(tt.args.configPath)
+ if (err != nil) != tt.wantErr {
+ t.Errorf("getConfigFromJson() error = %v, wantErr %v", err, tt.wantErr)
+ return
+ }
+ if !reflect.DeepEqual(got, tt.want) {
+ t.Errorf("getConfigFromJson() got = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}