You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2021/10/25 18:24:02 UTC

[beam] branch master updated: Add environment_service.go and structures for beam sdk, network envs, cache and application envs

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new aa26b53  Add environment_service.go and structures for beam sdk, network envs, cache and application envs
     new 835d19a  Merge pull request #15654 from [BEAM-12998] [Playground] Environment service
aa26b53 is described below

commit aa26b53f633847947c535b48b574cd652a1a0c3f
Author: Ilya Kozyrev <il...@akvelon.com>
AuthorDate: Mon Oct 25 18:33:34 2021 +0300

    Add environment_service.go and structures for beam sdk, network envs, cache and application envs
    
    Co-authored-by: AydarZaynutdinov <ay...@akvelon.com>
    Co-authored-by: daria.malkova <da...@akvelon.com>
---
 playground/backend/cmd/server/http.go              |   2 +-
 playground/backend/cmd/server/server.go            |  23 +-
 playground/backend/configs/SDK_JAVA.json           |  13 +
 .../backend/internal/environment/application.go    |  98 +++++++
 .../internal/environment/application_test.go       | 261 +++++++++++++++++++
 playground/backend/internal/environment/beam.go    |  43 ++++
 .../backend/internal/environment/environment.go    |  28 --
 .../internal/environment/environment_service.go    | 193 ++++++++++++++
 .../environment/environment_service_test.go        | 283 +++++++++++++++++++++
 9 files changed, 913 insertions(+), 31 deletions(-)

diff --git a/playground/backend/cmd/server/http.go b/playground/backend/cmd/server/http.go
index 8dce878..d49cbfe 100644
--- a/playground/backend/cmd/server/http.go
+++ b/playground/backend/cmd/server/http.go
@@ -22,7 +22,7 @@ import (
 )
 
 // listenHttp binds the http.Handler on the TCP network address
-func listenHttp(ctx context.Context, errChan chan error, envs environment.ServerEnvs, handler http.Handler) {
+func listenHttp(ctx context.Context, errChan chan error, envs environment.NetworkEnvs, handler http.Handler) {
 	grpclog.Infof("listening HTTP at %s\n", envs.Address())
 	if err := http.ListenAndServe(envs.Address(), handler); err != nil {
 		errChan <- err
diff --git a/playground/backend/cmd/server/server.go b/playground/backend/cmd/server/server.go
index 3102188..ea9edfc 100644
--- a/playground/backend/cmd/server/server.go
+++ b/playground/backend/cmd/server/server.go
@@ -32,7 +32,10 @@ func runServer() error {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	envService := environment.NewEnvironment()
+	envService, err := setupEnvironment()
+	if err != nil {
+		return err
+	}
 	grpcServer := grpc.NewServer()
 	pb.RegisterPlaygroundServiceServer(grpcServer, &playgroundController{})
 
@@ -40,7 +43,7 @@ func runServer() error {
 	handler := Wrap(grpcServer, getGrpcWebOptions())
 	errChan := make(chan error)
 
-	go listenHttp(ctx, errChan, envService, handler)
+	go listenHttp(ctx, errChan, envService.NetworkEnvs, handler)
 
 	for {
 		select {
@@ -53,6 +56,22 @@ func runServer() error {
 	}
 }
 
+func setupEnvironment() (*environment.Environment, error) { // builds the Environment from OS env vars; the first failing loader aborts setup
+	networkEnvs, err := environment.GetNetworkEnvsFromOsEnvs()
+	if err != nil {
+		return nil, err
+	}
+	beamEnvs, err := environment.GetSdkEnvsFromOsEnvs()
+	if err != nil {
+		return nil, err
+	}
+	appEnvs, err := environment.GetApplicationEnvsFromOsEnvs()
+	if err != nil {
+		return nil, err
+	}
+	return environment.NewEnvironment(*networkEnvs, *beamEnvs, *appEnvs), nil // the three env structs are passed by value
+}
+
 // getGrpcWebOptions returns grpcweb options needed to configure wrapper
 func getGrpcWebOptions() []grpcweb.Option {
 	return []grpcweb.Option{
diff --git a/playground/backend/configs/SDK_JAVA.json b/playground/backend/configs/SDK_JAVA.json
new file mode 100644
index 0000000..f371b12
--- /dev/null
+++ b/playground/backend/configs/SDK_JAVA.json
@@ -0,0 +1,13 @@
+{
+  "compile_cmd": "javac",
+  "run_cmd": "java",
+  "compile_args": [
+    "-d",
+    "bin",
+    "-classpath"
+  ],
+  "run_args": [
+    "-cp",
+    "bin:"
+  ]
+}
\ No newline at end of file
diff --git a/playground/backend/internal/environment/application.go b/playground/backend/internal/environment/application.go
new file mode 100644
index 0000000..39c7753
--- /dev/null
+++ b/playground/backend/internal/environment/application.go
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package environment
+
+import (
+	"fmt"
+	"time"
+)
+
+// NetworkEnvs holds the ip and port that Address combines into a single address string.
+type NetworkEnvs struct {
+	ip   string
+	port int
+}
+
+// NewNetworkEnvs returns a pointer to a NetworkEnvs initialized with the given ip and port.
+func NewNetworkEnvs(ip string, port int) *NetworkEnvs {
+	return &NetworkEnvs{ip: ip, port: port}
+}
+
+// Address returns ip and port joined with ':' (formatted as "%s:%d").
+func (serverEnvs NetworkEnvs) Address() string {
+	return fmt.Sprintf("%s:%d", serverEnvs.ip, serverEnvs.port)
+}
+
+// CacheEnvs holds the cache settings: cache type, cache address and key expiration time.
+type CacheEnvs struct {
+	cacheType         string
+	address           string
+	keyExpirationTime time.Duration
+}
+
+// CacheType returns the configured cache type.
+func (ce *CacheEnvs) CacheType() string {
+	return ce.cacheType
+}
+
+// Address returns the configured cache address.
+func (ce *CacheEnvs) Address() string {
+	return ce.address
+}
+
+// KeyExpirationTime returns the configured key expiration time.
+func (ce *CacheEnvs) KeyExpirationTime() time.Duration {
+	return ce.keyExpirationTime
+}
+
+// NewCacheEnvs returns a pointer to a CacheEnvs built from the given cache type, address and key expiration time.
+func NewCacheEnvs(cacheType, cacheAddress string, cacheExpirationTime time.Duration) *CacheEnvs {
+	return &CacheEnvs{
+		cacheType:         cacheType,
+		address:           cacheAddress,
+		keyExpirationTime: cacheExpirationTime,
+	}
+}
+
+// ApplicationEnvs holds the application settings: working directory, cache settings and pipeline execute timeout.
+type ApplicationEnvs struct {
+	workingDir             string
+	cacheEnvs              *CacheEnvs
+	pipelineExecuteTimeout time.Duration
+}
+
+// NewApplicationEnvs returns a pointer to an ApplicationEnvs built from the given working dir, cache envs and pipeline execute timeout.
+func NewApplicationEnvs(workingDir string, cacheEnvs *CacheEnvs, pipelineExecuteTimeout time.Duration) *ApplicationEnvs {
+	return &ApplicationEnvs{
+		workingDir:             workingDir,
+		cacheEnvs:              cacheEnvs, // the pointer is stored as passed, not copied
+		pipelineExecuteTimeout: pipelineExecuteTimeout,
+	}
+}
+
+// WorkingDir returns the configured working directory.
+func (ae *ApplicationEnvs) WorkingDir() string {
+	return ae.workingDir
+}
+
+func (ae *ApplicationEnvs) CacheEnvs() CacheEnvs { // CacheEnvs returns a copy of the CacheEnvs that ae.cacheEnvs points to
+	return *ae.cacheEnvs // dereference panics if cacheEnvs is nil
+}
+
+// PipelineExecuteTimeout returns the configured pipeline execute timeout.
+func (ae *ApplicationEnvs) PipelineExecuteTimeout() time.Duration {
+	return ae.pipelineExecuteTimeout
+}
diff --git a/playground/backend/internal/environment/application_test.go b/playground/backend/internal/environment/application_test.go
new file mode 100644
index 0000000..78bca3f
--- /dev/null
+++ b/playground/backend/internal/environment/application_test.go
@@ -0,0 +1,261 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package environment
+
+import (
+	"fmt"
+	"testing"
+	"time"
+)
+
+func TestNetworkEnvs_Address(t *testing.T) { // checks that Address() joins ip and port with ':'
+	type fields struct {
+		ip   string
+		port int
+	}
+	tests := []struct { // table-driven cases: receiver fields and the expected result
+		name   string
+		fields fields
+		want   string
+	}{
+		{
+			name:   "ip and port concatenated through ':'",
+			fields: fields{ip: defaultIp, port: defaultPort},
+			want:   fmt.Sprintf("%s:%d", defaultIp, defaultPort),
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			serverEnvs := NetworkEnvs{
+				ip:   tt.fields.ip,
+				port: tt.fields.port,
+			}
+			if got := serverEnvs.Address(); got != tt.want {
+				t.Errorf("Address() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func TestCacheEnvs_CacheType(t *testing.T) { // checks that CacheType() returns the cacheType field
+	type fields struct {
+		cacheType         string
+		address           string
+		keyExpirationTime time.Duration
+	}
+	tests := []struct { // table-driven cases: receiver fields and the expected result
+		name   string
+		fields fields
+		want   string
+	}{
+		{
+			name: "all success",
+			fields: fields{
+				cacheType:         "MOCK_CACHE_TYPE",
+				address:           "MOCK_ADDRESS",
+				keyExpirationTime: 0,
+			},
+			want: "MOCK_CACHE_TYPE",
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			ce := &CacheEnvs{
+				cacheType:         tt.fields.cacheType,
+				address:           tt.fields.address,
+				keyExpirationTime: tt.fields.keyExpirationTime,
+			}
+			if got := ce.CacheType(); got != tt.want {
+				t.Errorf("CacheType() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func TestCacheEnvs_Address(t *testing.T) { // checks that Address() returns the address field
+	type fields struct {
+		cacheType         string
+		address           string
+		keyExpirationTime time.Duration
+	}
+	tests := []struct { // table-driven cases: receiver fields and the expected result
+		name   string
+		fields fields
+		want   string
+	}{
+		{
+			name: "all success",
+			fields: fields{
+				cacheType:         "MOCK_CACHE_TYPE",
+				address:           "MOCK_ADDRESS",
+				keyExpirationTime: 0,
+			},
+			want: "MOCK_ADDRESS",
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			ce := &CacheEnvs{
+				cacheType:         tt.fields.cacheType,
+				address:           tt.fields.address,
+				keyExpirationTime: tt.fields.keyExpirationTime,
+			}
+			if got := ce.Address(); got != tt.want {
+				t.Errorf("Address() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func TestCacheEnvs_KeyExpirationTime(t *testing.T) { // checks that KeyExpirationTime() returns the keyExpirationTime field
+	type fields struct {
+		cacheType         string
+		address           string
+		keyExpirationTime time.Duration
+	}
+	tests := []struct { // table-driven cases: receiver fields and the expected result
+		name   string
+		fields fields
+		want   time.Duration
+	}{
+		{
+			name: "all success",
+			fields: fields{
+				cacheType:         "MOCK_CACHE_TYPE",
+				address:           "MOCK_ADDRESS",
+				keyExpirationTime: 0,
+			},
+			want: 0, // only the zero duration is exercised by this case
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			ce := &CacheEnvs{
+				cacheType:         tt.fields.cacheType,
+				address:           tt.fields.address,
+				keyExpirationTime: tt.fields.keyExpirationTime,
+			}
+			if got := ce.KeyExpirationTime(); got != tt.want {
+				t.Errorf("KeyExpirationTime() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func TestApplicationEnvs_WorkingDir(t *testing.T) { // checks that WorkingDir() returns the workingDir field
+	type fields struct {
+		workingDir             string
+		cacheEnvs              *CacheEnvs
+		pipelineExecuteTimeout time.Duration
+	}
+	tests := []struct { // table-driven cases: receiver fields and the expected result
+		name   string
+		fields fields
+		want   string
+	}{
+		{
+			name: "all success",
+			fields: fields{
+				workingDir:             "MOCK_WORKING_DIR",
+				cacheEnvs:              &CacheEnvs{},
+				pipelineExecuteTimeout: 0,
+			},
+			want: "MOCK_WORKING_DIR",
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			ae := &ApplicationEnvs{
+				workingDir:             tt.fields.workingDir,
+				cacheEnvs:              tt.fields.cacheEnvs,
+				pipelineExecuteTimeout: tt.fields.pipelineExecuteTimeout,
+			}
+			if got := ae.WorkingDir(); got != tt.want {
+				t.Errorf("WorkingDir() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func TestApplicationEnvs_CacheEnvs(t *testing.T) { // checks that CacheEnvs() returns the value that cacheEnvs points to
+	type fields struct {
+		workingDir             string
+		cacheEnvs              *CacheEnvs
+		pipelineExecuteTimeout time.Duration
+	}
+	tests := []struct { // table-driven cases: receiver fields and the expected result
+		name   string
+		fields fields
+		want   CacheEnvs
+	}{
+		{
+			name: "all success",
+			fields: fields{
+				workingDir:             "MOCK_WORKING_DIR",
+				cacheEnvs:              &CacheEnvs{},
+				pipelineExecuteTimeout: 0,
+			},
+			want: CacheEnvs{},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			ae := &ApplicationEnvs{
+				workingDir:             tt.fields.workingDir,
+				cacheEnvs:              tt.fields.cacheEnvs,
+				pipelineExecuteTimeout: tt.fields.pipelineExecuteTimeout,
+			}
+			if got := ae.CacheEnvs(); got != tt.want { // struct values are compared field by field with !=
+				t.Errorf("CacheEnvs() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func TestApplicationEnvs_PipelineExecuteTimeout(t *testing.T) { // checks that PipelineExecuteTimeout() returns the pipelineExecuteTimeout field
+	type fields struct {
+		workingDir             string
+		cacheEnvs              *CacheEnvs
+		pipelineExecuteTimeout time.Duration
+	}
+	tests := []struct { // table-driven cases: receiver fields and the expected result
+		name   string
+		fields fields
+		want   time.Duration
+	}{
+		{
+			name: "all success",
+			fields: fields{
+				workingDir:             "MOCK_WORKING_DIR",
+				cacheEnvs:              &CacheEnvs{},
+				pipelineExecuteTimeout: 0,
+			},
+			want: 0, // only the zero duration is exercised by this case
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			ae := &ApplicationEnvs{
+				workingDir:             tt.fields.workingDir,
+				cacheEnvs:              tt.fields.cacheEnvs,
+				pipelineExecuteTimeout: tt.fields.pipelineExecuteTimeout,
+			}
+			if got := ae.PipelineExecuteTimeout(); got != tt.want {
+				t.Errorf("PipelineExecuteTimeout() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
diff --git a/playground/backend/internal/environment/beam.go b/playground/backend/internal/environment/beam.go
new file mode 100644
index 0000000..d266d0c
--- /dev/null
+++ b/playground/backend/internal/environment/beam.go
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package environment
+
+import (
+	pb "beam.apache.org/playground/backend/internal/api/v1"
+)
+
+// ExecutorConfig holds the compile and run commands with their argument lists; the json tags name the matching config keys.
+type ExecutorConfig struct {
+	CompileCmd  string   `json:"compile_cmd"`
+	RunCmd      string   `json:"run_cmd"`
+	CompileArgs []string `json:"compile_args"`
+	RunArgs     []string `json:"run_args"`
+}
+
+func NewExecutorConfig(compileCmd string, runCmd string, compileArgs []string, runArgs []string) *ExecutorConfig { // NewExecutorConfig is an ExecutorConfig constructor
+	return &ExecutorConfig{CompileCmd: compileCmd, RunCmd: runCmd, CompileArgs: compileArgs, RunArgs: runArgs} // the slices are stored as passed, not copied
+}
+
+// BeamEnvs holds the selected Apache Beam SDK and the ExecutorConfig that goes with it.
+type BeamEnvs struct {
+	ApacheBeamSdk  pb.Sdk
+	ExecutorConfig *ExecutorConfig
+}
+
+// NewBeamEnvs is a BeamEnvs constructor; it stores the given SDK and ExecutorConfig pointer as passed.
+func NewBeamEnvs(apacheBeamSdk pb.Sdk, executorConfig *ExecutorConfig) *BeamEnvs {
+	return &BeamEnvs{ApacheBeamSdk: apacheBeamSdk, ExecutorConfig: executorConfig}
+}
diff --git a/playground/backend/internal/environment/environment.go b/playground/backend/internal/environment/environment.go
deleted file mode 100644
index 910dec9..0000000
--- a/playground/backend/internal/environment/environment.go
+++ /dev/null
@@ -1,28 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements.  See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License.  You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// TODO: remove this code when merging https://github.com/apache/beam/pull/15654
-
-package environment
-
-type ServerEnvs struct {}
-
-func (envs ServerEnvs) Address() string {
-	return ""
-}
-
-func NewEnvironment() ServerEnvs {
-	return ServerEnvs{}
-}
diff --git a/playground/backend/internal/environment/environment_service.go b/playground/backend/internal/environment/environment_service.go
new file mode 100644
index 0000000..240a972
--- /dev/null
+++ b/playground/backend/internal/environment/environment_service.go
@@ -0,0 +1,193 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package environment
+
+import (
+	pb "beam.apache.org/playground/backend/internal/api/v1"
+	"encoding/json"
+	"errors"
+	"io/ioutil"
+	"log"
+	"os"
+	"path/filepath"
+	"strconv"
+	"strings"
+	"time"
+)
+
+const ( // names of the OS environment variables read by this package, followed by default values and config file naming
+	serverIpKey                   = "SERVER_IP"
+	serverPortKey                 = "SERVER_PORT"
+	beamSdkKey                    = "BEAM_SDK"
+	workingDirKey                 = "APP_WORK_DIR"
+	cacheTypeKey                  = "CACHE_TYPE"
+	cacheAddressKey               = "CACHE_ADDRESS"
+	beamPathKey                   = "BEAM_PATH"
+	beamRunnerKey                 = "BEAM_RUNNER"
+	SLF4jKey                      = "SLF4J" // NOTE(review): the only exported key in this block - confirm that is intended
+	cacheKeyExpirationTimeKey     = "KEY_EXPIRATION_TIME"
+	pipelineExecuteTimeoutKey     = "PIPELINE_EXPIRATION_TIMEOUT"
+	defaultIp                     = "localhost"
+	defaultPort                   = 8080
+	defaultSdk                    = pb.Sdk_SDK_JAVA
+	defaultBeamSdkPath            = "/opt/apache/beam/jars/beam-sdks-java-harness.jar"
+	defaultCacheType              = "local"
+	defaultCacheAddress           = "localhost:6379"
+	defaultCacheKeyExpirationTime = time.Minute * 15
+	defaultPipelineExecuteTimeout = time.Minute * 10
+	defaultBeamRunner             = "/opt/apache/beam/jars/beam-runners-direct.jar"
+	defaultSLF4j                  = "/opt/apache/beam/jars/slf4j-jdk14.jar"
+	jsonExt                       = ".json"   // extension of the per-SDK executor config file
+	configFolderName              = "configs" // folder, relative to the working dir, that holds the per-SDK config files
+)
+
+// Environment groups the three environment structures: NetworkEnvs, BeamEnvs and ApplicationEnvs.
+type Environment struct {
+	NetworkEnvs     NetworkEnvs
+	BeamSdkEnvs     BeamEnvs
+	ApplicationEnvs ApplicationEnvs
+}
+
+// NewEnvironment is a constructor for Environment.
+// It stores the given NetworkEnvs, BeamEnvs and ApplicationEnvs values in a new Environment
+// and returns a pointer to it.
+// No defaults are applied here: every field is taken from the arguments as passed.
+// Values read from the OS environment can be obtained with GetNetworkEnvsFromOsEnvs,
+// GetSdkEnvsFromOsEnvs and GetApplicationEnvsFromOsEnvs.
+func NewEnvironment(networkEnvs NetworkEnvs, beamEnvs BeamEnvs, appEnvs ApplicationEnvs) *Environment {
+	svc := Environment{}
+	svc.NetworkEnvs = networkEnvs
+	svc.BeamSdkEnvs = beamEnvs
+	svc.ApplicationEnvs = appEnvs
+
+	return &svc
+}
+
+// GetApplicationEnvsFromOsEnvs builds ApplicationEnvs from OS environment variables. APP_WORK_DIR is required and an error is returned when it is not set; cache type, cache address, key expiration time and pipeline execute timeout fall back to their defaults.
+func GetApplicationEnvsFromOsEnvs() (*ApplicationEnvs, error) {
+	pipelineExecuteTimeout := defaultPipelineExecuteTimeout
+	cacheExpirationTime := defaultCacheKeyExpirationTime
+	cacheType := getEnv(cacheTypeKey, defaultCacheType)
+	cacheAddress := getEnv(cacheAddressKey, defaultCacheAddress)
+
+	if value, present := os.LookupEnv(cacheKeyExpirationTimeKey); present {
+		if converted, err := time.ParseDuration(value); err == nil { // value must be a duration string accepted by time.ParseDuration
+			cacheExpirationTime = converted
+		} else { // a malformed value is only logged; the default stays in effect
+			log.Printf("couldn't convert provided cache expiration time. Using default %s\n", defaultCacheKeyExpirationTime)
+		}
+	}
+	if value, present := os.LookupEnv(pipelineExecuteTimeoutKey); present {
+		if converted, err := time.ParseDuration(value); err == nil { // value must be a duration string accepted by time.ParseDuration
+			pipelineExecuteTimeout = converted
+		} else { // a malformed value is only logged; the default stays in effect
+			log.Printf("couldn't convert provided pipeline execute timeout. Using default %s\n", defaultPipelineExecuteTimeout)
+		}
+	}
+
+	if value, present := os.LookupEnv(workingDirKey); present { // a variable set to the empty string still counts as present
+		return NewApplicationEnvs(value, NewCacheEnvs(cacheType, cacheAddress, cacheExpirationTime), pipelineExecuteTimeout), nil
+	}
+	return nil, errors.New("APP_WORK_DIR env should be provided with os.env")
+}
+
+// GetNetworkEnvsFromOsEnvs builds NetworkEnvs from SERVER_IP and SERVER_PORT, using the defaults for unset variables. It returns an error when SERVER_PORT is set but is not an integer.
+func GetNetworkEnvsFromOsEnvs() (*NetworkEnvs, error) {
+	ip := getEnv(serverIpKey, defaultIp)
+	port := defaultPort
+	var err error
+	if value, present := os.LookupEnv(serverPortKey); present {
+		port, err = strconv.Atoi(value)
+		if err != nil { // unlike the duration settings, a bad port is an error rather than a fallback to the default
+			return nil, err
+		}
+	}
+	return NewNetworkEnvs(ip, port), nil
+}
+
+// GetSdkEnvsFromOsEnvs builds BeamEnvs from BEAM_SDK, which must equal the String() name of one of the SDK values matched below; there is no default, so an unset or unknown value returns an error. The executor config is read from <APP_WORK_DIR>/configs/<sdk name>.json.
+func GetSdkEnvsFromOsEnvs() (*BeamEnvs, error) {
+	sdk := pb.Sdk_SDK_UNSPECIFIED
+	if value, present := os.LookupEnv(beamSdkKey); present {
+
+		switch value { // the comparison is exact; any other value leaves sdk unspecified
+		case pb.Sdk_SDK_JAVA.String():
+			sdk = pb.Sdk_SDK_JAVA
+		case pb.Sdk_SDK_GO.String():
+			sdk = pb.Sdk_SDK_GO
+		case pb.Sdk_SDK_PYTHON.String():
+			sdk = pb.Sdk_SDK_PYTHON
+		case pb.Sdk_SDK_SCIO.String():
+			sdk = pb.Sdk_SDK_SCIO
+		}
+	}
+	if sdk == pb.Sdk_SDK_UNSPECIFIED {
+		return nil, errors.New("env BEAM_SDK must be specified in the environment variables")
+	}
+	configPath := filepath.Join(os.Getenv(workingDirKey), configFolderName, sdk.String()+jsonExt) // os.Getenv yields "" when APP_WORK_DIR is unset, which makes the path relative
+	executorConfig, err := createExecutorConfig(sdk, configPath)
+	if err != nil {
+		return nil, err
+	}
+	return NewBeamEnvs(sdk, executorConfig), nil
+}
+
+// createExecutorConfig loads the ExecutorConfig stored at configPath and completes it for apacheBeamSdk.
+// For pb.Sdk_SDK_JAVA the Beam SDK jar is appended to the compile args and the jar list to the second run arg.
+// It returns an error if the config cannot be read or parsed, if run_args has fewer than two
+// elements, or if the SDK is Go, Python or SCIO ("not yet supported").
+func createExecutorConfig(apacheBeamSdk pb.Sdk, configPath string) (*ExecutorConfig, error) {
+	executorConfig, err := getConfigFromJson(configPath)
+	if err != nil {
+		return nil, err
+	}
+	switch apacheBeamSdk {
+	case pb.Sdk_SDK_JAVA:
+		// RunArgs[1] is extended in place below; without this check a short run_args list would panic.
+		if len(executorConfig.RunArgs) < 2 {
+			return nil, errors.New("run_args in " + configPath + " should contain at least 2 elements")
+		}
+		beamSdkPath := getEnv(beamPathKey, defaultBeamSdkPath)
+		executorConfig.CompileArgs = append(executorConfig.CompileArgs, beamSdkPath)
+		jars := []string{beamSdkPath, getEnv(beamRunnerKey, defaultBeamRunner), getEnv(SLF4jKey, defaultSLF4j)}
+		executorConfig.RunArgs[1] += strings.Join(jars, ":")
+	case pb.Sdk_SDK_GO, pb.Sdk_SDK_PYTHON, pb.Sdk_SDK_SCIO:
+		return nil, errors.New("not yet supported")
+	}
+	return executorConfig, nil
+}
+
+// getConfigFromJson reads the file at configPath and unmarshals its JSON content into an ExecutorConfig. It returns an error when the file cannot be read or the JSON cannot be unmarshalled.
+func getConfigFromJson(configPath string) (*ExecutorConfig, error) {
+	file, err := ioutil.ReadFile(configPath)
+	if err != nil {
+		return nil, err
+	}
+	executorConfig := ExecutorConfig{}
+	err = json.Unmarshal(file, &executorConfig)
+	if err != nil {
+		return nil, err
+	}
+	return &executorConfig, err // err is always nil here
+}
+
+// getEnv returns the value of the environment variable key, or defaultValue when the variable is not set.
+func getEnv(key, defaultValue string) string {
+	if value, ok := os.LookupEnv(key); ok { // a variable set to the empty string counts as set and yields ""
+		return value
+	}
+	return defaultValue
+}
diff --git a/playground/backend/internal/environment/environment_service_test.go b/playground/backend/internal/environment/environment_service_test.go
new file mode 100644
index 0000000..51644c9
--- /dev/null
+++ b/playground/backend/internal/environment/environment_service_test.go
@@ -0,0 +1,283 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package environment
+
+import (
+	playground "beam.apache.org/playground/backend/internal/api/v1"
+	"fmt"
+	"io/fs"
+	"os"
+	"path/filepath"
+	"reflect"
+	"strings"
+	"testing"
+)
+
+const ( // JSON fixture that setup() writes to the test config file
+	javaConfig = "{\n  \"compile_cmd\": \"javac\",\n  \"run_cmd\": \"java\",\n  \"compile_args\": [\"-d\", \"bin\", \"-classpath\"],\n  \"run_args\": [\"-cp\", \"bin:\"]\n}"
+)
+
+// TestMain creates the config fixture, runs the package tests and removes the fixture afterwards.
+func TestMain(m *testing.M) {
+	if err := setup(); err != nil {
+		fmt.Fprintf(os.Stderr, "error during test setup: %s\n", err.Error()) // fmt.Errorf only built an unused error value
+	}
+	defer teardown()
+	m.Run()
+}
+
+func setup() error { // creates the config folder and writes the javaConfig fixture into it
+	err := os.MkdirAll(configFolderName, fs.ModePerm)
+	if err != nil {
+		return err
+	}
+	filePath := filepath.Join(configFolderName, defaultSdk.String()+jsonExt) // relative path, resolved against the current working directory
+	err = os.WriteFile(filePath, []byte(javaConfig), 0600)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func teardown() { // removes the config folder together with everything inside it
+	err := os.RemoveAll(configFolderName)
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "error during test teardown: %s\n", err.Error()) // reported only; there is nothing left to clean up
+	}
+}
+
+func setOsEnvs(envsToSet map[string]string) error { // sets every key/value pair as an OS env var; stops at the first os.Setenv error
+	for key, value := range envsToSet { // ranging over a nil map is a no-op, so cases without envsToSet change nothing
+		if err := os.Setenv(key, value); err != nil {
+			return err
+		}
+
+	}
+	return nil
+}
+
+func TestNewEnvironment(t *testing.T) { // checks that NewEnvironment stores exactly the three structs it is given
+	executorConfig := NewExecutorConfig("javac", "java", []string{""}, []string{""}) // the same pointer is used in both want and got
+	tests := []struct {
+		name string
+		want *Environment
+	}{
+		{name: "create env service with default envs", want: &Environment{
+			NetworkEnvs:     *NewNetworkEnvs(defaultIp, defaultPort),
+			BeamSdkEnvs:     *NewBeamEnvs(defaultSdk, executorConfig),
+			ApplicationEnvs: *NewApplicationEnvs("/app", &CacheEnvs{defaultCacheType, defaultCacheAddress, defaultCacheKeyExpirationTime}, defaultPipelineExecuteTimeout),
+		}},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if got := NewEnvironment(
+				*NewNetworkEnvs(defaultIp, defaultPort),
+				*NewBeamEnvs(defaultSdk, executorConfig),
+				*NewApplicationEnvs("/app", &CacheEnvs{defaultCacheType, defaultCacheAddress, defaultCacheKeyExpirationTime}, defaultPipelineExecuteTimeout)); !reflect.DeepEqual(got, tt.want) {
+				t.Errorf("NewEnvironment() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func Test_getSdkEnvsFromOsEnvs(t *testing.T) { // checks BEAM_SDK handling; env vars set by one case stay set for the following cases
+	jars := strings.Join([]string{defaultBeamSdkPath, defaultBeamRunner, defaultSLF4j}, ":")
+	tests := []struct {
+		name      string
+		want      *BeamEnvs
+		envsToSet map[string]string
+		wantErr   bool
+	}{
+		{
+			name:      "not specified beam sdk key in os envs", // passes only while BEAM_SDK is not already set in the process environment
+			want:      nil,
+			envsToSet: map[string]string{workingDirKey: "./"},
+			wantErr:   true,
+		},
+		{
+			name:      "default beam envs",
+			want:      NewBeamEnvs(defaultSdk, NewExecutorConfig("javac", "java", []string{"-d", "bin", "-classpath", defaultBeamSdkPath}, []string{"-cp", "bin:" + jars})),
+			envsToSet: map[string]string{beamSdkKey: "SDK_JAVA", workingDirKey: "./"},
+			wantErr:   false,
+		},
+		{
+			name:      "specific sdk key in os envs", // NOTE(review): same inputs and expectation as the previous case
+			want:      NewBeamEnvs(defaultSdk, NewExecutorConfig("javac", "java", []string{"-d", "bin", "-classpath", defaultBeamSdkPath}, []string{"-cp", "bin:" + jars})),
+			envsToSet: map[string]string{beamSdkKey: "SDK_JAVA", workingDirKey: "./"},
+			wantErr:   false,
+		},
+		{
+			name:      "wrong sdk key in os envs",
+			want:      nil,
+			envsToSet: map[string]string{beamSdkKey: "SDK_J", workingDirKey: "./"},
+			wantErr:   true,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if err := setOsEnvs(tt.envsToSet); err != nil {
+				t.Fatalf("couldn't setup os env")
+			}
+			got, err := GetSdkEnvsFromOsEnvs()
+			if (err != nil) != tt.wantErr {
+				t.Errorf("getSdkEnvsFromOsEnvs() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			if !reflect.DeepEqual(got, tt.want) {
+				t.Errorf("getSdkEnvsFromOsEnvs() got = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func Test_getNetworkEnvsFromOsEnvs(t *testing.T) { // checks SERVER_IP / SERVER_PORT handling; env vars set by one case stay set for the following cases
+	tests := []struct {
+		name      string
+		want      *NetworkEnvs
+		envsToSet map[string]string
+		wantErr   bool
+	}{
+		{
+			name: "default values", // passes only while SERVER_IP and SERVER_PORT are not already set in the process environment
+			want: NewNetworkEnvs(defaultIp, defaultPort),
+		},
+		{
+			name:      "values from os envs",
+			want:      NewNetworkEnvs("12.12.12.21", 1234),
+			envsToSet: map[string]string{serverIpKey: "12.12.12.21", serverPortKey: "1234"},
+		},
+		{
+			name:      "not int port in os env, should return error",
+			want:      nil,
+			envsToSet: map[string]string{serverIpKey: "12.12.12.21", serverPortKey: "1a34"},
+			wantErr:   true,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if err := setOsEnvs(tt.envsToSet); err != nil {
+				t.Fatalf("couldn't setup os env")
+			}
+			got, err := GetNetworkEnvsFromOsEnvs()
+			if (err != nil) != tt.wantErr {
+				t.Errorf("getNetworkEnvsFromOsEnvs() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			if !reflect.DeepEqual(got, tt.want) {
+				t.Errorf("getNetworkEnvsFromOsEnvs() got = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func Test_getApplicationEnvsFromOsEnvs(t *testing.T) { // checks that APP_WORK_DIR is required and that the other settings fall back to defaults
+	tests := []struct {
+		name      string
+		want      *ApplicationEnvs
+		wantErr   bool
+		envsToSet map[string]string
+	}{
+		{name: "working dir is provided", want: NewApplicationEnvs("/app", &CacheEnvs{defaultCacheType, defaultCacheAddress, defaultCacheKeyExpirationTime}, defaultPipelineExecuteTimeout), wantErr: false, envsToSet: map[string]string{workingDirKey: "/app"}},
+		{name: "working dir isn't provided", want: nil, wantErr: true}, // relies on the os.Clearenv() call made at the end of the previous case
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if err := setOsEnvs(tt.envsToSet); err != nil {
+				t.Fatalf("couldn't setup os env")
+			}
+			got, err := GetApplicationEnvsFromOsEnvs()
+			if (err != nil) != tt.wantErr {
+				t.Errorf("getApplicationEnvsFromOsEnvs() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+
+			if !reflect.DeepEqual(got, tt.want) {
+				t.Errorf("getApplicationEnvsFromOsEnvs() got = %v, want %v", got, tt.want)
+			}
+			os.Clearenv() // wipes the whole process environment; skipped when the error check above returns early
+		})
+	}
+}
+
+func Test_createExecutorConfig(t *testing.T) { // checks that the Java config gets the SDK jar and the jar list appended
+	jars := strings.Join([]string{defaultBeamSdkPath, defaultBeamRunner, defaultSLF4j}, ":") // expected value assumes BEAM_PATH, BEAM_RUNNER and SLF4J are not set
+	type args struct {
+		apacheBeamSdk playground.Sdk
+		configPath    string
+	}
+	tests := []struct {
+		name    string
+		args    args
+		want    *ExecutorConfig
+		wantErr bool
+	}{
+		{
+			name:    "create executor configuration from json file",
+			args:    args{apacheBeamSdk: defaultSdk, configPath: filepath.Join(configFolderName, defaultSdk.String()+jsonExt)}, // same path that setup() writes the fixture to
+			want:    NewExecutorConfig("javac", "java", []string{"-d", "bin", "-classpath", defaultBeamSdkPath}, []string{"-cp", "bin:" + jars}),
+			wantErr: false,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got, err := createExecutorConfig(tt.args.apacheBeamSdk, tt.args.configPath)
+			if (err != nil) != tt.wantErr {
+				t.Errorf("createExecutorConfig() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			if !reflect.DeepEqual(got, tt.want) {
+				t.Errorf("createExecutorConfig() got = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func Test_getConfigFromJson(t *testing.T) { // checks that the fixture is parsed as-is and that an unreadable path is an error
+	type args struct {
+		configPath string
+	}
+	tests := []struct {
+		name    string
+		args    args
+		want    *ExecutorConfig
+		wantErr bool
+	}{
+		{
+			name:    "get object from json",
+			args:    args{filepath.Join(configFolderName, defaultSdk.String()+jsonExt)}, // same path that setup() writes the fixture to
+			want:    NewExecutorConfig("javac", "java", []string{"-d", "bin", "-classpath"}, []string{"-cp", "bin:"}),
+			wantErr: false,
+		},
+		{
+			name:    "error if wrong json path",
+			args:    args{filepath.Join("wrong_folder", defaultSdk.String()+jsonExt)},
+			want:    nil,
+			wantErr: true,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got, err := getConfigFromJson(tt.args.configPath)
+			if (err != nil) != tt.wantErr {
+				t.Errorf("getConfigFromJson() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			if !reflect.DeepEqual(got, tt.want) {
+				t.Errorf("getConfigFromJson() got = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}