You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by GitBox <gi...@apache.org> on 2022/09/06 06:52:01 UTC

[GitHub] [incubator-devlake] warren830 commented on a diff in pull request #2943: fix:Encrypting blueprint.settings blueprint.plan and pipeline.plan in the database #2868

warren830 commented on code in PR #2943:
URL: https://github.com/apache/incubator-devlake/pull/2943#discussion_r963304048


##########
services/blueprint_helper.go:
##########
@@ -0,0 +1,148 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package services
+
+import (
+	"github.com/apache/incubator-devlake/config"
+	"github.com/apache/incubator-devlake/models"
+	"github.com/apache/incubator-devlake/models/common"
+	"github.com/apache/incubator-devlake/plugins/core"
+	"gorm.io/gorm"
+)
+
+// CreateBlueprint accepts a Blueprint instance and insert it to database
+func CreateDbBlueprint(dbBlueprint *models.DbBlueprint) error {
+	err := db.Create(&dbBlueprint).Error
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+// GetBlueprints returns a paginated list of Blueprints based on `query`
+func GetDbBlueprints(query *BlueprintQuery) ([]*models.DbBlueprint, int64, error) {
+	dbBlueprints := make([]*models.DbBlueprint, 0)
+	db := db.Model(dbBlueprints).Order("id DESC")
+	if query.Enable != nil {
+		db = db.Where("enable = ?", *query.Enable)
+	}
+
+	var count int64
+	err := db.Count(&count).Error
+	if err != nil {
+		return nil, 0, err
+	}
+	if query.Page > 0 && query.PageSize > 0 {
+		offset := query.PageSize * (query.Page - 1)
+		db = db.Limit(query.PageSize).Offset(offset)
+	}
+	err = db.Find(&dbBlueprints).Error
+	if err != nil {
+		return nil, 0, err
+	}
+
+	return dbBlueprints, count, nil
+}
+
+// GetBlueprint returns the detail of a given Blueprint ID
+func GetDbBlueprint(dbBlueprintId uint64) (*models.DbBlueprint, error) {
+	dbBlueprint := &models.DbBlueprint{}
+	err := db.First(dbBlueprint, dbBlueprintId).Error
+	if err != nil {
+		if err == gorm.ErrRecordNotFound {
+			return nil, err
+		}
+		return nil, err
+	}
+	return dbBlueprint, nil
+}
+
+// DeleteBlueprint FIXME ...
+func DeleteDbBlueprint(id uint64) error {
+	err := db.Delete(&models.DbBlueprint{}, "id = ?", id).Error
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+// parseBlueprint
+func parseBlueprint(DbBlueprint *models.DbBlueprint) (*models.Blueprint, error) {
+	blueprint := models.Blueprint{
+		Name:       DbBlueprint.Name,
+		Mode:       DbBlueprint.Mode,
+		Plan:       []byte(DbBlueprint.Plan),
+		Enable:     DbBlueprint.Enable,
+		CronConfig: DbBlueprint.CronConfig,
+		IsManual:   DbBlueprint.IsManual,
+		Settings:   []byte(DbBlueprint.Settings),
+		Model: common.Model{
+			ID: DbBlueprint.ID,

Review Comment:
   Maybe we can directly use `Model: DbBlueprint.Model,` here instead of building a new `common.Model` from just the ID.



##########
services/pipeline_helper.go:
##########
@@ -0,0 +1,208 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package services
+
+import (
+	"encoding/json"
+	"fmt"
+	"github.com/apache/incubator-devlake/config"
+	"github.com/apache/incubator-devlake/errors"
+	"github.com/apache/incubator-devlake/models"
+	"github.com/apache/incubator-devlake/models/common"
+	"github.com/apache/incubator-devlake/plugins/core"
+	"gorm.io/gorm"
+)
+
+// CreatePipeline and return the model
+func CreateDbPipeline(newPipeline *models.NewPipeline) (*models.DbPipeline, error) {
+	// create pipeline object from posted data
+	dbPipeline := &models.DbPipeline{
+		Name:          newPipeline.Name,
+		FinishedTasks: 0,
+		Status:        models.TASK_CREATED,
+		Message:       "",
+		SpentSeconds:  0,
+	}
+	if newPipeline.BlueprintId != 0 {
+		dbPipeline.BlueprintId = newPipeline.BlueprintId
+	}
+	dbPipeline, err := encryptDbPipeline(dbPipeline)
+	if err != nil {
+		return nil, err
+	}
+	// save pipeline to database
+	err = db.Create(&dbPipeline).Error
+	if err != nil {
+		globalPipelineLog.Error(err, "create pipline failed: %w", err)
+		return nil, errors.Internal.Wrap(err, "create pipline failed")
+	}
+
+	// create tasks accordingly
+	for i := range newPipeline.Plan {
+		for j := range newPipeline.Plan[i] {
+			pipelineTask := newPipeline.Plan[i][j]
+			newTask := &models.NewTask{
+				PipelineTask: pipelineTask,
+				PipelineId:   dbPipeline.ID,
+				PipelineRow:  i + 1,
+				PipelineCol:  j + 1,
+			}
+			_, err := CreateTask(newTask)
+			if err != nil {
+				globalPipelineLog.Error(err, "create task for pipeline failed: %w", err)
+				return nil, err
+			}
+			// sync task state back to pipeline
+			dbPipeline.TotalTasks += 1
+		}
+	}
+	if err != nil {
+		globalPipelineLog.Error(err, "save tasks for pipeline failed: %w", err)
+		return nil, errors.Internal.Wrap(err, "save tasks for pipeline failed")
+	}
+	if dbPipeline.TotalTasks == 0 {
+		return nil, fmt.Errorf("no task to run")
+	}
+
+	// update tasks state
+	planByte, err := json.Marshal(newPipeline.Plan)
+	if err != nil {
+		return nil, err
+	}
+	dbPipeline.Plan = string(planByte)
+	dbPipeline, err = encryptDbPipeline(dbPipeline)
+	if err != nil {
+		return nil, err
+	}
+	err = db.Model(dbPipeline).Updates(map[string]interface{}{
+		"total_tasks": dbPipeline.TotalTasks,
+		"plan":        dbPipeline.Plan,
+	}).Error
+	if err != nil {
+		globalPipelineLog.Error(err, "update pipline state failed: %w", err)
+		return nil, errors.Internal.Wrap(err, "update pipline state failed")
+	}
+
+	return dbPipeline, nil
+}
+
+// GetPipelines by query
+func GetDbPipelines(query *PipelineQuery) ([]*models.DbPipeline, int64, error) {
+	dbPipelines := make([]*models.DbPipeline, 0)
+	db := db.Model(dbPipelines).Order("id DESC")
+	if query.BlueprintId != 0 {
+		db = db.Where("blueprint_id = ?", query.BlueprintId)
+	}
+	if query.Status != "" {
+		db = db.Where("status = ?", query.Status)
+	}
+	if query.Pending > 0 {
+		db = db.Where("finished_at is null")
+	}
+	var count int64
+	err := db.Count(&count).Error
+	if err != nil {
+		return nil, 0, err
+	}
+	if query.Page > 0 && query.PageSize > 0 {
+		offset := query.PageSize * (query.Page - 1)
+		db = db.Limit(query.PageSize).Offset(offset)
+	}
+	err = db.Find(&dbPipelines).Error
+	if err != nil {
+		return nil, count, err
+	}
+	return dbPipelines, count, nil
+}
+
+// GetPipeline by id
+func GetDbPipeline(pipelineId uint64) (*models.DbPipeline, error) {
+	dbPipeline := &models.DbPipeline{}
+	err := db.First(dbPipeline, pipelineId).Error
+	if err != nil {
+		if err == gorm.ErrRecordNotFound {
+			return nil, errors.NotFound.New("pipeline not found", errors.AsUserMessage())
+		}
+		return nil, errors.Internal.Wrap(err, "error getting the pipeline from database", errors.AsUserMessage())
+	}
+	return dbPipeline, nil
+}
+
+// parsePipeline
+func parsePipeline(dbPipeline *models.DbPipeline) (*models.Pipeline, error) {
+	pipeline := models.Pipeline{
+		Model: common.Model{
+			ID: dbPipeline.ID,
+		},
+		Name:          dbPipeline.Name,
+		BlueprintId:   dbPipeline.BlueprintId,
+		Plan:          []byte(dbPipeline.Plan),
+		TotalTasks:    dbPipeline.TotalTasks,
+		FinishedTasks: dbPipeline.FinishedTasks,
+		BeganAt:       dbPipeline.BeganAt,
+		FinishedAt:    dbPipeline.FinishedAt,
+		Status:        dbPipeline.Status,
+		Message:       dbPipeline.Message,
+		SpentSeconds:  dbPipeline.SpentSeconds,
+		Stage:         dbPipeline.Stage,
+	}
+	return &pipeline, nil
+}
+
+// parseDbPipeline
+func parseDbPipeline(pipeline *models.Pipeline) (*models.DbPipeline, error) {
+	dbPipeline := models.DbPipeline{
+		Model: common.Model{
+			ID: pipeline.ID,

Review Comment:
   Maybe we can directly use `Model: pipeline.Model,` here instead of building a new `common.Model` from just the ID.



##########
services/pipeline_helper.go:
##########
@@ -0,0 +1,208 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package services
+
+import (
+	"encoding/json"
+	"fmt"
+	"github.com/apache/incubator-devlake/config"
+	"github.com/apache/incubator-devlake/errors"
+	"github.com/apache/incubator-devlake/models"
+	"github.com/apache/incubator-devlake/models/common"
+	"github.com/apache/incubator-devlake/plugins/core"
+	"gorm.io/gorm"
+)
+
+// CreatePipeline and return the model
+func CreateDbPipeline(newPipeline *models.NewPipeline) (*models.DbPipeline, error) {
+	// create pipeline object from posted data
+	dbPipeline := &models.DbPipeline{
+		Name:          newPipeline.Name,
+		FinishedTasks: 0,
+		Status:        models.TASK_CREATED,
+		Message:       "",
+		SpentSeconds:  0,
+	}
+	if newPipeline.BlueprintId != 0 {
+		dbPipeline.BlueprintId = newPipeline.BlueprintId
+	}
+	dbPipeline, err := encryptDbPipeline(dbPipeline)
+	if err != nil {
+		return nil, err
+	}
+	// save pipeline to database
+	err = db.Create(&dbPipeline).Error
+	if err != nil {
+		globalPipelineLog.Error(err, "create pipline failed: %w", err)
+		return nil, errors.Internal.Wrap(err, "create pipline failed")
+	}
+
+	// create tasks accordingly
+	for i := range newPipeline.Plan {
+		for j := range newPipeline.Plan[i] {
+			pipelineTask := newPipeline.Plan[i][j]
+			newTask := &models.NewTask{
+				PipelineTask: pipelineTask,
+				PipelineId:   dbPipeline.ID,
+				PipelineRow:  i + 1,
+				PipelineCol:  j + 1,
+			}
+			_, err := CreateTask(newTask)
+			if err != nil {
+				globalPipelineLog.Error(err, "create task for pipeline failed: %w", err)
+				return nil, err
+			}
+			// sync task state back to pipeline
+			dbPipeline.TotalTasks += 1
+		}
+	}
+	if err != nil {
+		globalPipelineLog.Error(err, "save tasks for pipeline failed: %w", err)
+		return nil, errors.Internal.Wrap(err, "save tasks for pipeline failed")
+	}
+	if dbPipeline.TotalTasks == 0 {
+		return nil, fmt.Errorf("no task to run")
+	}
+
+	// update tasks state
+	planByte, err := json.Marshal(newPipeline.Plan)
+	if err != nil {
+		return nil, err
+	}
+	dbPipeline.Plan = string(planByte)
+	dbPipeline, err = encryptDbPipeline(dbPipeline)
+	if err != nil {
+		return nil, err
+	}
+	err = db.Model(dbPipeline).Updates(map[string]interface{}{
+		"total_tasks": dbPipeline.TotalTasks,
+		"plan":        dbPipeline.Plan,
+	}).Error
+	if err != nil {
+		globalPipelineLog.Error(err, "update pipline state failed: %w", err)
+		return nil, errors.Internal.Wrap(err, "update pipline state failed")
+	}
+
+	return dbPipeline, nil
+}
+
+// GetPipelines by query
+func GetDbPipelines(query *PipelineQuery) ([]*models.DbPipeline, int64, error) {
+	dbPipelines := make([]*models.DbPipeline, 0)
+	db := db.Model(dbPipelines).Order("id DESC")
+	if query.BlueprintId != 0 {
+		db = db.Where("blueprint_id = ?", query.BlueprintId)
+	}
+	if query.Status != "" {
+		db = db.Where("status = ?", query.Status)
+	}
+	if query.Pending > 0 {
+		db = db.Where("finished_at is null")
+	}
+	var count int64
+	err := db.Count(&count).Error
+	if err != nil {
+		return nil, 0, err
+	}
+	if query.Page > 0 && query.PageSize > 0 {
+		offset := query.PageSize * (query.Page - 1)
+		db = db.Limit(query.PageSize).Offset(offset)
+	}
+	err = db.Find(&dbPipelines).Error
+	if err != nil {
+		return nil, count, err
+	}
+	return dbPipelines, count, nil
+}
+
+// GetPipeline by id
+func GetDbPipeline(pipelineId uint64) (*models.DbPipeline, error) {
+	dbPipeline := &models.DbPipeline{}
+	err := db.First(dbPipeline, pipelineId).Error
+	if err != nil {
+		if err == gorm.ErrRecordNotFound {
+			return nil, errors.NotFound.New("pipeline not found", errors.AsUserMessage())
+		}
+		return nil, errors.Internal.Wrap(err, "error getting the pipeline from database", errors.AsUserMessage())
+	}
+	return dbPipeline, nil
+}
+
+// parsePipeline
+func parsePipeline(dbPipeline *models.DbPipeline) (*models.Pipeline, error) {
+	pipeline := models.Pipeline{
+		Model: common.Model{
+			ID: dbPipeline.ID,

Review Comment:
   This should also copy `CreatedAt` and `UpdatedAt` (not only `ID`) into `Model` when converting the pipeline loaded from the database.



##########
services/blueprint_helper.go:
##########
@@ -0,0 +1,148 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package services
+
+import (
+	"github.com/apache/incubator-devlake/config"
+	"github.com/apache/incubator-devlake/models"
+	"github.com/apache/incubator-devlake/models/common"
+	"github.com/apache/incubator-devlake/plugins/core"
+	"gorm.io/gorm"
+)
+
+// CreateBlueprint accepts a Blueprint instance and insert it to database
+func CreateDbBlueprint(dbBlueprint *models.DbBlueprint) error {
+	err := db.Create(&dbBlueprint).Error
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+// GetBlueprints returns a paginated list of Blueprints based on `query`
+func GetDbBlueprints(query *BlueprintQuery) ([]*models.DbBlueprint, int64, error) {
+	dbBlueprints := make([]*models.DbBlueprint, 0)
+	db := db.Model(dbBlueprints).Order("id DESC")
+	if query.Enable != nil {
+		db = db.Where("enable = ?", *query.Enable)
+	}
+
+	var count int64
+	err := db.Count(&count).Error
+	if err != nil {
+		return nil, 0, err
+	}
+	if query.Page > 0 && query.PageSize > 0 {
+		offset := query.PageSize * (query.Page - 1)
+		db = db.Limit(query.PageSize).Offset(offset)
+	}
+	err = db.Find(&dbBlueprints).Error
+	if err != nil {
+		return nil, 0, err
+	}
+
+	return dbBlueprints, count, nil
+}
+
+// GetBlueprint returns the detail of a given Blueprint ID
+func GetDbBlueprint(dbBlueprintId uint64) (*models.DbBlueprint, error) {
+	dbBlueprint := &models.DbBlueprint{}
+	err := db.First(dbBlueprint, dbBlueprintId).Error
+	if err != nil {
+		if err == gorm.ErrRecordNotFound {
+			return nil, err
+		}
+		return nil, err
+	}
+	return dbBlueprint, nil
+}
+
+// DeleteBlueprint FIXME ...
+func DeleteDbBlueprint(id uint64) error {
+	err := db.Delete(&models.DbBlueprint{}, "id = ?", id).Error
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+// parseBlueprint
+func parseBlueprint(DbBlueprint *models.DbBlueprint) (*models.Blueprint, error) {
+	blueprint := models.Blueprint{
+		Name:       DbBlueprint.Name,
+		Mode:       DbBlueprint.Mode,
+		Plan:       []byte(DbBlueprint.Plan),
+		Enable:     DbBlueprint.Enable,
+		CronConfig: DbBlueprint.CronConfig,
+		IsManual:   DbBlueprint.IsManual,
+		Settings:   []byte(DbBlueprint.Settings),
+		Model: common.Model{
+			ID: DbBlueprint.ID,
+		},
+	}
+	return &blueprint, nil
+}
+
+// parseDbBlueprint
+func parseDbBlueprint(blueprint *models.Blueprint) (*models.DbBlueprint, error) {

Review Comment:
   We don't need the `error` return value here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@devlake.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org