You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2023/05/26 16:44:12 UTC

[beam] branch master updated: [Playground] Log cancellation messages as warnings (#26790)

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 4aebbc0391c [Playground] Log cancellation messages as warnings (#26790)
4aebbc0391c is described below

commit 4aebbc0391c9e03e7bfdfd58f80986314c33ed18
Author: Timur Sultanov <ti...@akvelon.com>
AuthorDate: Fri May 26 20:44:04 2023 +0400

    [Playground] Log cancellation messages as warnings (#26790)
    
    * Log cancellation messages as warnings
    
    * Log compilation errors as warnings
---
 playground/backend/cmd/server/controller.go        |  4 +-
 .../internal/code_processing/code_processing.go    | 63 +++++++++++++++-------
 .../backend/internal/errors/lifecycle_error.go     | 32 +++++++++++
 3 files changed, 78 insertions(+), 21 deletions(-)

diff --git a/playground/backend/cmd/server/controller.go b/playground/backend/cmd/server/controller.go
index 4e379be2595..606e5fd4dcf 100644
--- a/playground/backend/cmd/server/controller.go
+++ b/playground/backend/cmd/server/controller.go
@@ -570,7 +570,7 @@ func (controller *playgroundController) GetSnippet(ctx context.Context, info *pb
 func (controller *playgroundController) GetMetadata(_ context.Context, _ *pb.GetMetadataRequest) (*pb.GetMetadataResponse, error) {
 	commitTimestampInteger, err := strconv.ParseInt(BuildCommitTimestamp, 10, 64)
 	if err != nil {
-		logger.Errorf("GetMetadata(): failed to parse BuildCommitTimestamp (\"%s\"): %s", BuildCommitTimestamp, err.Error())
+		logger.Warnf("GetMetadata(): failed to parse BuildCommitTimestamp (\"%s\"): %s", BuildCommitTimestamp, err.Error())
 		commitTimestampInteger = 0
 	}
 
@@ -587,7 +587,7 @@ func (controller *playgroundController) GetMetadata(_ context.Context, _ *pb.Get
 // verifyRouter verifies that controller is configured to work in router mode
 func (controller *playgroundController) verifyRouter() error {
 	if controller.env.BeamSdkEnvs.ApacheBeamSdk != pb.Sdk_SDK_UNSPECIFIED {
-		return errors.New("runner mode")
+		return errors.New("server is in runner mode")
 	}
 	if controller.db == nil {
 		return errors.New("no database service")
diff --git a/playground/backend/internal/code_processing/code_processing.go b/playground/backend/internal/code_processing/code_processing.go
index d371860430e..06bc8335701 100644
--- a/playground/backend/internal/code_processing/code_processing.go
+++ b/playground/backend/internal/code_processing/code_processing.go
@@ -18,6 +18,7 @@ package code_processing
 import (
 	"bytes"
 	"context"
+	"errors"
 	"fmt"
 	"io"
 	"os"
@@ -31,7 +32,7 @@ import (
 	pb "beam.apache.org/playground/backend/internal/api/v1"
 	"beam.apache.org/playground/backend/internal/cache"
 	"beam.apache.org/playground/backend/internal/environment"
-	"beam.apache.org/playground/backend/internal/errors"
+	perrors "beam.apache.org/playground/backend/internal/errors"
 	"beam.apache.org/playground/backend/internal/executors"
 	"beam.apache.org/playground/backend/internal/fs_tool"
 	"beam.apache.org/playground/backend/internal/logger"
@@ -68,13 +69,23 @@ func Process(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycl
 
 	err := validateStep(pipelineLifeCycleCtx, cacheService, &lc.Paths, pipelineId, sdkEnv, &validationResults)
 	if err != nil {
-		logger.Errorf("%s: error during validation step: %s", pipelineId, err.Error())
+		var pipelineCanceledError perrors.PipelineCanceledError
+		if errors.As(err, &pipelineCanceledError) {
+			logger.Warnf("%s: pipeline execution has been canceled: %s", pipelineId, pipelineCanceledError.Error())
+		} else {
+			logger.Errorf("%s: error during validation step: %s", pipelineId, err.Error())
+		}
 		return
 	}
 
 	err = prepareStep(pipelineLifeCycleCtx, cacheService, &lc.Paths, pipelineId, sdkEnv, &validationResults, lc.GetPreparerParameters())
 	if err != nil {
-		logger.Errorf("%s: error during preparation step: %s", pipelineId, err.Error())
+		var pipelineCanceledError perrors.PipelineCanceledError
+		if errors.As(err, &pipelineCanceledError) {
+			logger.Warnf("%s: pipeline execution has been canceled: %s", pipelineId, pipelineCanceledError.Error())
+		} else {
+			logger.Errorf("%s: error during preparation step: %s", pipelineId, err.Error())
+		}
 		return
 	}
 
@@ -84,14 +95,28 @@ func Process(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycl
 
 	err = compileStep(pipelineLifeCycleCtx, cacheService, &lc.Paths, pipelineId, sdkEnv, isUnitTest)
 	if err != nil {
-		logger.Errorf("%s: error during compilation step: %s", pipelineId, err.Error())
+		var pipelineCanceledError perrors.PipelineCanceledError
+		var compilationError perrors.CompilationError
+		if errors.As(err, &pipelineCanceledError) {
+			logger.Warnf("%s: pipeline execution has been canceled: %s", pipelineId, pipelineCanceledError.Error())
+		} else if errors.As(err, &compilationError) {
+			logger.Warnf("%s: compilation error: %s", pipelineId, compilationError.Error())
+		} else {
+			logger.Errorf("%s: error during compilation step: %s", pipelineId, err.Error())
+		}
 		return
 	}
 
 	// Run/RunTest
 	err = runStep(pipelineLifeCycleCtx, cacheService, &lc.Paths, pipelineId, isUnitTest, sdkEnv, pipelineOptions)
 	if err != nil {
-		logger.Errorf("%s: error during run step: %s", pipelineId, err.Error())
+		var pipelineCanceledError perrors.PipelineCanceledError
+		if errors.As(err, &pipelineCanceledError) {
+			logger.Warnf("%s: pipeline execution has been canceled: %s", pipelineId, pipelineCanceledError.Error())
+		} else {
+			logger.Errorf("%s: error during run step: %s", pipelineId, err.Error())
+		}
+		return
 	}
 }
 
@@ -206,7 +231,7 @@ func compileStep(ctx context.Context, cacheService cache.Cache, paths *fs_tool.L
 			if processingErr != nil {
 				return processingErr
 			}
-			return err
+			return perrors.CompilationError{Reason: err.Error()}
 		} // Compile step is finished and code is compiled
 		if err := processCompileSuccess(compileOutput.Bytes(), pipelineId, cacheService); err != nil {
 			return err
@@ -320,12 +345,12 @@ func GetProcessingOutput(ctx context.Context, cacheService cache.Cache, key uuid
 	value, err := cacheService.GetValue(ctx, key, subKey)
 	if err != nil {
 		logger.Errorf("%s: GetProcessingOutput(): cache.GetValue: error: %s", key, err.Error())
-		return "", errors.NotFoundError(errorTitle, "Error during getting output")
+		return "", perrors.NotFoundError(errorTitle, "Error during getting output")
 	}
 	stringValue, converted := value.(string)
 	if !converted {
 		logger.Errorf("%s: couldn't convert value to string: %s", key, value)
-		return "", errors.InternalError(errorTitle, "Error during getting output")
+		return "", perrors.InternalError(errorTitle, "Error during getting output")
 	}
 	return stringValue, nil
 }
@@ -337,12 +362,12 @@ func GetProcessingStatus(ctx context.Context, cacheService cache.Cache, key uuid
 	value, err := cacheService.GetValue(ctx, key, cache.Status)
 	if err != nil {
 		logger.Errorf("%s: GetProcessingStatus(): cache.GetValue: error: %s", key, err.Error())
-		return pb.Status_STATUS_UNSPECIFIED, errors.NotFoundError(errorTitle, "Error during getting status")
+		return pb.Status_STATUS_UNSPECIFIED, perrors.NotFoundError(errorTitle, "Error during getting status")
 	}
 	statusValue, converted := value.(pb.Status)
 	if !converted {
 		logger.Errorf("%s: couldn't convert value to correct status enum: %s", key, value)
-		return pb.Status_STATUS_UNSPECIFIED, errors.InternalError(errorTitle, "Error during getting status")
+		return pb.Status_STATUS_UNSPECIFIED, perrors.InternalError(errorTitle, "Error during getting status")
 	}
 	return statusValue, nil
 }
@@ -354,12 +379,12 @@ func GetLastIndex(ctx context.Context, cacheService cache.Cache, key uuid.UUID,
 	value, err := cacheService.GetValue(ctx, key, subKey)
 	if err != nil {
 		logger.Errorf("%s: GetLastIndex(): cache.GetValue: error: %s", key, err.Error())
-		return 0, errors.NotFoundError(errorTitle, "Error during getting pagination value")
+		return 0, perrors.NotFoundError(errorTitle, "Error during getting pagination value")
 	}
 	convertedValue, converted := value.(float64)
 	if !converted {
 		logger.Errorf("%s: couldn't convert value to float64. value: %s type %s", key, value, reflect.TypeOf(value))
-		return 0, errors.InternalError(errorTitle, "Error during getting pagination value")
+		return 0, perrors.InternalError(errorTitle, "Error during getting pagination value")
 	}
 	return int(convertedValue), nil
 }
@@ -371,12 +396,12 @@ func GetGraph(ctx context.Context, cacheService cache.Cache, key uuid.UUID, erro
 	value, err := cacheService.GetValue(ctx, key, cache.Graph)
 	if err != nil {
 		logger.Errorf("%s: GetGraph(): cache.GetValue: error: %s", key, err.Error())
-		return "", errors.NotFoundError(errorTitle, "Error during getting graph")
+		return "", perrors.NotFoundError(errorTitle, "Error during getting graph")
 	}
 	stringValue, converted := value.(string)
 	if !converted {
 		logger.Errorf("%s: couldn't convert value to string. value: %s type %s", key, value, reflect.TypeOf(value))
-		return "", errors.InternalError(errorTitle, "Error during getting graph")
+		return "", perrors.InternalError(errorTitle, "Error during getting graph")
 	}
 	return stringValue, nil
 }
@@ -414,14 +439,14 @@ func reconcileBackgroundTask(pipelineLifeCycleCtx context.Context, pipelineId uu
 			if err := finishByTimeout(pipelineId, cacheService); err != nil {
 				return false, fmt.Errorf("error during context timeout processing: %s", err.Error())
 			}
-			return false, fmt.Errorf("code processing context timeout")
+			return false, perrors.PipelineCanceledError{Reason: fmt.Sprintf("code processing context timeout")}
 		case context.Canceled:
 			if err := processCancel(cacheService, pipelineId); err != nil {
 				return false, fmt.Errorf("error during cancellation processing: %s", err.Error())
 			}
-			return false, fmt.Errorf("code processing was canceled")
+			return false, perrors.PipelineCanceledError{Reason: "code processing was canceled"}
 		default:
-			return false, fmt.Errorf("code processing cancelled: %s", contextErr.Error())
+			return false, fmt.Errorf("code processing cancelled due to unexpected reason: %s", contextErr.Error())
 		}
 	case ok := <-successChannel:
 		return ok, nil
@@ -554,7 +579,7 @@ func DeleteResources(pipelineId uuid.UUID, lc *fs_tool.LifeCycle) {
 
 // finishByTimeout is used in case of runCode method finished by timeout
 func finishByTimeout(pipelineId uuid.UUID, cacheService cache.Cache) error {
-	logger.Errorf("%s: code processing finishes because of timeout\n", pipelineId)
+	logger.Warnf("%s: code processing finishes because of timeout\n", pipelineId)
 
 	// set to cache pipelineId: cache.SubKey_Status: Status_STATUS_RUN_TIMEOUT
 	return utils.SetToCache(cacheService, pipelineId, cache.Status, pb.Status_STATUS_RUN_TIMEOUT)
@@ -562,7 +587,7 @@ func finishByTimeout(pipelineId uuid.UUID, cacheService cache.Cache) error {
 
 // processErrorWithSavingOutput processes error with saving to cache received error output.
 func processErrorWithSavingOutput(err error, errorOutput []byte, pipelineId uuid.UUID, subKey cache.SubKey, cacheService cache.Cache, errorTitle string, newStatus pb.Status) error {
-	logger.Errorf("%s: %s(): err: %s, output: %s\n", pipelineId, errorTitle, err.Error(), errorOutput)
+	logger.Warnf("%s: %s(): err: %s, output: %s\n", pipelineId, errorTitle, err.Error(), errorOutput)
 
 	if err := utils.SetToCache(cacheService, pipelineId, subKey, fmt.Sprintf("error: %s\noutput: %s", err.Error(), errorOutput)); err != nil {
 		logger.Errorf("%s: failed to save error message to cache: %s", pipelineId, err.Error())
diff --git a/playground/backend/internal/errors/lifecycle_error.go b/playground/backend/internal/errors/lifecycle_error.go
new file mode 100644
index 00000000000..a467bfd1a00
--- /dev/null
+++ b/playground/backend/internal/errors/lifecycle_error.go
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package errors
+
+type PipelineCanceledError struct {
+	Reason string
+}
+
+func (e PipelineCanceledError) Error() string {
+	return e.Reason
+}
+
+type CompilationError struct {
+	Reason string
+}
+
+func (e CompilationError) Error() string {
+	return e.Reason
+}