You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2023/05/26 16:44:12 UTC
[beam] branch master updated: [Playground] Log cancellation messages as warnings (#26790)
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 4aebbc0391c [Playground] Log cancellation messages as warnings (#26790)
4aebbc0391c is described below
commit 4aebbc0391c9e03e7bfdfd58f80986314c33ed18
Author: Timur Sultanov <ti...@akvelon.com>
AuthorDate: Fri May 26 20:44:04 2023 +0400
[Playground] Log cancellation messages as warnings (#26790)
* Log cancellation messages as warnings
* Log compilation errors as warnings
---
playground/backend/cmd/server/controller.go | 4 +-
.../internal/code_processing/code_processing.go | 63 +++++++++++++++-------
.../backend/internal/errors/lifecycle_error.go | 32 +++++++++++
3 files changed, 78 insertions(+), 21 deletions(-)
diff --git a/playground/backend/cmd/server/controller.go b/playground/backend/cmd/server/controller.go
index 4e379be2595..606e5fd4dcf 100644
--- a/playground/backend/cmd/server/controller.go
+++ b/playground/backend/cmd/server/controller.go
@@ -570,7 +570,7 @@ func (controller *playgroundController) GetSnippet(ctx context.Context, info *pb
func (controller *playgroundController) GetMetadata(_ context.Context, _ *pb.GetMetadataRequest) (*pb.GetMetadataResponse, error) {
commitTimestampInteger, err := strconv.ParseInt(BuildCommitTimestamp, 10, 64)
if err != nil {
- logger.Errorf("GetMetadata(): failed to parse BuildCommitTimestamp (\"%s\"): %s", BuildCommitTimestamp, err.Error())
+ logger.Warnf("GetMetadata(): failed to parse BuildCommitTimestamp (\"%s\"): %s", BuildCommitTimestamp, err.Error())
commitTimestampInteger = 0
}
@@ -587,7 +587,7 @@ func (controller *playgroundController) GetMetadata(_ context.Context, _ *pb.Get
// verifyRouter verifies that controller is configured to work in router mode
func (controller *playgroundController) verifyRouter() error {
if controller.env.BeamSdkEnvs.ApacheBeamSdk != pb.Sdk_SDK_UNSPECIFIED {
- return errors.New("runner mode")
+ return errors.New("server is in runner mode")
}
if controller.db == nil {
return errors.New("no database service")
diff --git a/playground/backend/internal/code_processing/code_processing.go b/playground/backend/internal/code_processing/code_processing.go
index d371860430e..06bc8335701 100644
--- a/playground/backend/internal/code_processing/code_processing.go
+++ b/playground/backend/internal/code_processing/code_processing.go
@@ -18,6 +18,7 @@ package code_processing
import (
"bytes"
"context"
+ "errors"
"fmt"
"io"
"os"
@@ -31,7 +32,7 @@ import (
pb "beam.apache.org/playground/backend/internal/api/v1"
"beam.apache.org/playground/backend/internal/cache"
"beam.apache.org/playground/backend/internal/environment"
- "beam.apache.org/playground/backend/internal/errors"
+ perrors "beam.apache.org/playground/backend/internal/errors"
"beam.apache.org/playground/backend/internal/executors"
"beam.apache.org/playground/backend/internal/fs_tool"
"beam.apache.org/playground/backend/internal/logger"
@@ -68,13 +69,23 @@ func Process(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycl
err := validateStep(pipelineLifeCycleCtx, cacheService, &lc.Paths, pipelineId, sdkEnv, &validationResults)
if err != nil {
- logger.Errorf("%s: error during validation step: %s", pipelineId, err.Error())
+ var pipelineCanceledError perrors.PipelineCanceledError
+ if errors.As(err, &pipelineCanceledError) {
+ logger.Warnf("%s: pipeline execution has been canceled: %s", pipelineId, pipelineCanceledError.Error())
+ } else {
+ logger.Errorf("%s: error during validation step: %s", pipelineId, err.Error())
+ }
return
}
err = prepareStep(pipelineLifeCycleCtx, cacheService, &lc.Paths, pipelineId, sdkEnv, &validationResults, lc.GetPreparerParameters())
if err != nil {
- logger.Errorf("%s: error during preparation step: %s", pipelineId, err.Error())
+ var pipelineCanceledError perrors.PipelineCanceledError
+ if errors.As(err, &pipelineCanceledError) {
+ logger.Warnf("%s: pipeline execution has been canceled: %s", pipelineId, pipelineCanceledError.Error())
+ } else {
+ logger.Errorf("%s: error during preparation step: %s", pipelineId, err.Error())
+ }
return
}
@@ -84,14 +95,28 @@ func Process(ctx context.Context, cacheService cache.Cache, lc *fs_tool.LifeCycl
err = compileStep(pipelineLifeCycleCtx, cacheService, &lc.Paths, pipelineId, sdkEnv, isUnitTest)
if err != nil {
- logger.Errorf("%s: error during compilation step: %s", pipelineId, err.Error())
+ var pipelineCanceledError perrors.PipelineCanceledError
+ var compilationError perrors.CompilationError
+ if errors.As(err, &pipelineCanceledError) {
+ logger.Warnf("%s: pipeline execution has been canceled: %s", pipelineId, pipelineCanceledError.Error())
+ } else if errors.As(err, &compilationError) {
+ logger.Warnf("%s: compilation error: %s", pipelineId, compilationError.Error())
+ } else {
+ logger.Errorf("%s: error during compilation step: %s", pipelineId, err.Error())
+ }
return
}
// Run/RunTest
err = runStep(pipelineLifeCycleCtx, cacheService, &lc.Paths, pipelineId, isUnitTest, sdkEnv, pipelineOptions)
if err != nil {
- logger.Errorf("%s: error during run step: %s", pipelineId, err.Error())
+ var pipelineCanceledError perrors.PipelineCanceledError
+ if errors.As(err, &pipelineCanceledError) {
+ logger.Warnf("%s: pipeline execution has been canceled: %s", pipelineId, pipelineCanceledError.Error())
+ } else {
+ logger.Errorf("%s: error during run step: %s", pipelineId, err.Error())
+ }
+ return
}
}
@@ -206,7 +231,7 @@ func compileStep(ctx context.Context, cacheService cache.Cache, paths *fs_tool.L
if processingErr != nil {
return processingErr
}
- return err
+ return perrors.CompilationError{Reason: err.Error()}
} // Compile step is finished and code is compiled
if err := processCompileSuccess(compileOutput.Bytes(), pipelineId, cacheService); err != nil {
return err
@@ -320,12 +345,12 @@ func GetProcessingOutput(ctx context.Context, cacheService cache.Cache, key uuid
value, err := cacheService.GetValue(ctx, key, subKey)
if err != nil {
logger.Errorf("%s: GetProcessingOutput(): cache.GetValue: error: %s", key, err.Error())
- return "", errors.NotFoundError(errorTitle, "Error during getting output")
+ return "", perrors.NotFoundError(errorTitle, "Error during getting output")
}
stringValue, converted := value.(string)
if !converted {
logger.Errorf("%s: couldn't convert value to string: %s", key, value)
- return "", errors.InternalError(errorTitle, "Error during getting output")
+ return "", perrors.InternalError(errorTitle, "Error during getting output")
}
return stringValue, nil
}
@@ -337,12 +362,12 @@ func GetProcessingStatus(ctx context.Context, cacheService cache.Cache, key uuid
value, err := cacheService.GetValue(ctx, key, cache.Status)
if err != nil {
logger.Errorf("%s: GetProcessingStatus(): cache.GetValue: error: %s", key, err.Error())
- return pb.Status_STATUS_UNSPECIFIED, errors.NotFoundError(errorTitle, "Error during getting status")
+ return pb.Status_STATUS_UNSPECIFIED, perrors.NotFoundError(errorTitle, "Error during getting status")
}
statusValue, converted := value.(pb.Status)
if !converted {
logger.Errorf("%s: couldn't convert value to correct status enum: %s", key, value)
- return pb.Status_STATUS_UNSPECIFIED, errors.InternalError(errorTitle, "Error during getting status")
+ return pb.Status_STATUS_UNSPECIFIED, perrors.InternalError(errorTitle, "Error during getting status")
}
return statusValue, nil
}
@@ -354,12 +379,12 @@ func GetLastIndex(ctx context.Context, cacheService cache.Cache, key uuid.UUID,
value, err := cacheService.GetValue(ctx, key, subKey)
if err != nil {
logger.Errorf("%s: GetLastIndex(): cache.GetValue: error: %s", key, err.Error())
- return 0, errors.NotFoundError(errorTitle, "Error during getting pagination value")
+ return 0, perrors.NotFoundError(errorTitle, "Error during getting pagination value")
}
convertedValue, converted := value.(float64)
if !converted {
logger.Errorf("%s: couldn't convert value to float64. value: %s type %s", key, value, reflect.TypeOf(value))
- return 0, errors.InternalError(errorTitle, "Error during getting pagination value")
+ return 0, perrors.InternalError(errorTitle, "Error during getting pagination value")
}
return int(convertedValue), nil
}
@@ -371,12 +396,12 @@ func GetGraph(ctx context.Context, cacheService cache.Cache, key uuid.UUID, erro
value, err := cacheService.GetValue(ctx, key, cache.Graph)
if err != nil {
logger.Errorf("%s: GetGraph(): cache.GetValue: error: %s", key, err.Error())
- return "", errors.NotFoundError(errorTitle, "Error during getting graph")
+ return "", perrors.NotFoundError(errorTitle, "Error during getting graph")
}
stringValue, converted := value.(string)
if !converted {
logger.Errorf("%s: couldn't convert value to string. value: %s type %s", key, value, reflect.TypeOf(value))
- return "", errors.InternalError(errorTitle, "Error during getting graph")
+ return "", perrors.InternalError(errorTitle, "Error during getting graph")
}
return stringValue, nil
}
@@ -414,14 +439,14 @@ func reconcileBackgroundTask(pipelineLifeCycleCtx context.Context, pipelineId uu
if err := finishByTimeout(pipelineId, cacheService); err != nil {
return false, fmt.Errorf("error during context timeout processing: %s", err.Error())
}
- return false, fmt.Errorf("code processing context timeout")
+ return false, perrors.PipelineCanceledError{Reason: fmt.Sprintf("code processing context timeout")}
case context.Canceled:
if err := processCancel(cacheService, pipelineId); err != nil {
return false, fmt.Errorf("error during cancellation processing: %s", err.Error())
}
- return false, fmt.Errorf("code processing was canceled")
+ return false, perrors.PipelineCanceledError{Reason: "code processing was canceled"}
default:
- return false, fmt.Errorf("code processing cancelled: %s", contextErr.Error())
+ return false, fmt.Errorf("code processing cancelled due to unexpected reason: %s", contextErr.Error())
}
case ok := <-successChannel:
return ok, nil
@@ -554,7 +579,7 @@ func DeleteResources(pipelineId uuid.UUID, lc *fs_tool.LifeCycle) {
// finishByTimeout is used in case of runCode method finished by timeout
func finishByTimeout(pipelineId uuid.UUID, cacheService cache.Cache) error {
- logger.Errorf("%s: code processing finishes because of timeout\n", pipelineId)
+ logger.Warnf("%s: code processing finishes because of timeout\n", pipelineId)
// set to cache pipelineId: cache.SubKey_Status: Status_STATUS_RUN_TIMEOUT
return utils.SetToCache(cacheService, pipelineId, cache.Status, pb.Status_STATUS_RUN_TIMEOUT)
@@ -562,7 +587,7 @@ func finishByTimeout(pipelineId uuid.UUID, cacheService cache.Cache) error {
// processErrorWithSavingOutput processes error with saving to cache received error output.
func processErrorWithSavingOutput(err error, errorOutput []byte, pipelineId uuid.UUID, subKey cache.SubKey, cacheService cache.Cache, errorTitle string, newStatus pb.Status) error {
- logger.Errorf("%s: %s(): err: %s, output: %s\n", pipelineId, errorTitle, err.Error(), errorOutput)
+ logger.Warnf("%s: %s(): err: %s, output: %s\n", pipelineId, errorTitle, err.Error(), errorOutput)
if err := utils.SetToCache(cacheService, pipelineId, subKey, fmt.Sprintf("error: %s\noutput: %s", err.Error(), errorOutput)); err != nil {
logger.Errorf("%s: failed to save error message to cache: %s", pipelineId, err.Error())
diff --git a/playground/backend/internal/errors/lifecycle_error.go b/playground/backend/internal/errors/lifecycle_error.go
new file mode 100644
index 00000000000..a467bfd1a00
--- /dev/null
+++ b/playground/backend/internal/errors/lifecycle_error.go
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package errors
+
+type PipelineCanceledError struct {
+ Reason string
+}
+
+func (e PipelineCanceledError) Error() string {
+ return e.Reason
+}
+
+type CompilationError struct {
+ Reason string
+}
+
+func (e CompilationError) Error() string {
+ return e.Reason
+}