You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/10/31 15:29:34 UTC

[plc4x] branch develop updated: feat(plc4xanalyzer): added option to abort a long running job

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new f3fd13bde feat(plc4xanalyzer): added option to abort a long running job
f3fd13bde is described below

commit f3fd13bdef2d19ec3acbcfbb7fa10827368f3f95
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Oct 31 16:29:21 2022 +0100

    feat(plc4xanalyzer): added option to abort a long running job
---
 .../internal/analyzer/analyzer.go                  |  9 ++-
 .../internal/extractor/extractor.go                |  9 ++-
 plc4go/tools/plc4xpcapanalyzer/ui/commands.go      | 83 ++++++++++++----------
 plc4go/tools/plc4xpcapanalyzer/ui/common.go        |  4 ++
 plc4go/tools/plc4xpcapanalyzer/ui/ui.go            |  9 ++-
 5 files changed, 73 insertions(+), 41 deletions(-)

diff --git a/plc4go/tools/plc4xpcapanalyzer/internal/analyzer/analyzer.go b/plc4go/tools/plc4xpcapanalyzer/internal/analyzer/analyzer.go
index 016e6f6d0..d931ee5cb 100644
--- a/plc4go/tools/plc4xpcapanalyzer/internal/analyzer/analyzer.go
+++ b/plc4go/tools/plc4xpcapanalyzer/internal/analyzer/analyzer.go
@@ -21,6 +21,7 @@ package analyzer
 
 import (
 	"bytes"
+	"context"
 	"encoding/hex"
 	"fmt"
 	"github.com/apache/plc4x/plc4go/spi"
@@ -46,10 +47,10 @@ func Analyze(pcapFile, protocolType string) error {
 }
 
 func AnalyzeWithOutput(pcapFile, protocolType string, stdout, stderr io.Writer) error {
-	return AnalyzeWithOutputAndCallback(pcapFile, protocolType, stdout, stderr, nil)
+	return AnalyzeWithOutputAndCallback(context.TODO(), pcapFile, protocolType, stdout, stderr, nil)
 }
 
-func AnalyzeWithOutputAndCallback(pcapFile, protocolType string, stdout, stderr io.Writer, messageCallback func(parsed spi.Message)) error {
+func AnalyzeWithOutputAndCallback(ctx context.Context, pcapFile, protocolType string, stdout, stderr io.Writer, messageCallback func(parsed spi.Message)) error {
 	log.Info().Msgf("Analyzing pcap file '%s' with protocolType '%s' and filter '%s' now", pcapFile, protocolType, config.AnalyzeConfigInstance.Filter)
 
 	handle, numberOfPackage, timestampToIndexMap, err := pcaphandler.GetIndexedPcapHandle(pcapFile, config.AnalyzeConfigInstance.Filter)
@@ -107,6 +108,10 @@ func AnalyzeWithOutputAndCallback(pcapFile, protocolType string, stdout, stderr
 	for packet := range mapPackets(source.Packets(), func(packet gopacket.Packet) common.PacketInformation {
 		return createPacketInformation(pcapFile, packet, timestampToIndexMap)
 	}) {
+		if ctx.Err() == context.Canceled {
+			log.Info().Msgf("Aborted after %d packages", currentPackageNum)
+			break
+		}
 		currentPackageNum++
 		if currentPackageNum < config.AnalyzeConfigInstance.StartPackageNumber {
 			log.Debug().Msgf("Skipping package number %d (till no. %d)", currentPackageNum, config.AnalyzeConfigInstance.StartPackageNumber)
diff --git a/plc4go/tools/plc4xpcapanalyzer/internal/extractor/extractor.go b/plc4go/tools/plc4xpcapanalyzer/internal/extractor/extractor.go
index bfd201f95..f18719fde 100644
--- a/plc4go/tools/plc4xpcapanalyzer/internal/extractor/extractor.go
+++ b/plc4go/tools/plc4xpcapanalyzer/internal/extractor/extractor.go
@@ -20,6 +20,7 @@
 package extractor
 
 import (
+	"context"
 	"fmt"
 	"github.com/apache/plc4x/plc4go/tools/plc4xpcapanalyzer/config"
 	"github.com/apache/plc4x/plc4go/tools/plc4xpcapanalyzer/internal/common"
@@ -35,10 +36,10 @@ import (
 )
 
 func Extract(pcapFile, protocolType string) error {
-	return ExtractWithOutput(pcapFile, protocolType, ansi.NewAnsiStdout(), ansi.NewAnsiStderr())
+	return ExtractWithOutput(context.TODO(), pcapFile, protocolType, ansi.NewAnsiStdout(), ansi.NewAnsiStderr())
 }
 
-func ExtractWithOutput(pcapFile, protocolType string, stdout, stderr io.Writer) error {
+func ExtractWithOutput(ctx context.Context, pcapFile, protocolType string, stdout, stderr io.Writer) error {
 	log.Info().Msgf("Analyzing pcap file '%s' with protocolType '%s' and filter '%s' now", pcapFile, protocolType, config.ExtractConfigInstance.Filter)
 
 	handle, numberOfPackage, timestampToIndexMap, err := pcaphandler.GetIndexedPcapHandle(pcapFile, config.ExtractConfigInstance.Filter)
@@ -108,6 +109,10 @@ func ExtractWithOutput(pcapFile, protocolType string, stdout, stderr io.Writer)
 	serializeFails := 0
 	compareFails := 0
 	for packet := range source.Packets() {
+		if ctx.Err() == context.Canceled {
+			log.Info().Msgf("Aborted after %d packages", currentPackageNum)
+			break
+		}
 		currentPackageNum++
 		if currentPackageNum < config.ExtractConfigInstance.StartPackageNumber {
 			log.Debug().Msgf("Skipping package number %d (till no. %d)", currentPackageNum, config.ExtractConfigInstance.StartPackageNumber)
diff --git a/plc4go/tools/plc4xpcapanalyzer/ui/commands.go b/plc4go/tools/plc4xpcapanalyzer/ui/commands.go
index bffd10e44..2ceb2b1da 100644
--- a/plc4go/tools/plc4xpcapanalyzer/ui/commands.go
+++ b/plc4go/tools/plc4xpcapanalyzer/ui/commands.go
@@ -20,6 +20,7 @@
 package ui
 
 import (
+	"context"
 	"fmt"
 	plc4x_config "github.com/apache/plc4x/plc4go/pkg/api/config"
 	"github.com/apache/plc4x/plc4go/spi"
@@ -44,7 +45,7 @@ var rootCommand = Command{
 		{
 			Name:        "ls",
 			Description: "list directories",
-			action: func(_ Command, dir string) error {
+			action: func(_ context.Context, _ Command, dir string) error {
 				if dir == "" {
 					dir = currentDir
 				}
@@ -71,7 +72,7 @@ var rootCommand = Command{
 		{
 			Name:        "cd",
 			Description: "changes directory",
-			action: func(_ Command, newDir string) error {
+			action: func(_ context.Context, _ Command, newDir string) error {
 				var proposedCurrentDir string
 				if newDir == "" {
 					var err error
@@ -125,7 +126,7 @@ var rootCommand = Command{
 		{
 			Name:        "pwd",
 			Description: "shows current directory",
-			action: func(_ Command, _ string) error {
+			action: func(_ context.Context, _ Command, _ string) error {
 				_, _ = fmt.Fprintf(commandOutput, "current directory: %s\n", currentDir)
 				return nil
 			},
@@ -133,7 +134,7 @@ var rootCommand = Command{
 		{
 			Name:        "open",
 			Description: "open file",
-			action: func(_ Command, pcapFile string) error {
+			action: func(_ context.Context, _ Command, pcapFile string) error {
 				return OpenFile(pcapFile)
 			},
 			parameterSuggestions: func(currentText string) (entries []string) {
@@ -155,7 +156,7 @@ var rootCommand = Command{
 		{
 			Name:        "analyze",
 			Description: "Analyzes a pcap file using a driver",
-			action: func(_ Command, protocolTypeAndPcapFile string) error {
+			action: func(ctx context.Context, _ Command, protocolTypeAndPcapFile string) error {
 				split := strings.Split(protocolTypeAndPcapFile, " ")
 				if len(split) != 2 {
 					return errors.Errorf("expect protocol and pcapfile")
@@ -166,7 +167,7 @@ var rootCommand = Command{
 				cliConfig.RootConfigInstance.HideProgressBar = true
 				// disabled as we get this output anyway with the message call back
 				//cliConfig.RootConfigInstance.Verbosity = 4
-				return analyzer.AnalyzeWithOutputAndCallback(pcapFile, protocolType, tview.ANSIWriter(messageOutput), tview.ANSIWriter(messageOutput), func(parsed spi.Message) {
+				return analyzer.AnalyzeWithOutputAndCallback(ctx, pcapFile, protocolType, tview.ANSIWriter(messageOutput), tview.ANSIWriter(messageOutput), func(parsed spi.Message) {
 					spiNumberOfMessagesReceived++
 					spiMessageReceived(spiNumberOfMessagesReceived, time.Now(), parsed)
 				})
@@ -183,7 +184,7 @@ var rootCommand = Command{
 		{
 			Name:        "extract",
 			Description: "Extract a pcap file using a driver",
-			action: func(_ Command, protocolTypeAndPcapFile string) error {
+			action: func(ctx context.Context, _ Command, protocolTypeAndPcapFile string) error {
 				split := strings.Split(protocolTypeAndPcapFile, " ")
 				if len(split) != 2 {
 					return errors.Errorf("expect protocol and pcapfile")
@@ -193,7 +194,7 @@ var rootCommand = Command{
 				cliConfig.PcapConfigInstance.Client = config.HostIp
 				cliConfig.RootConfigInstance.HideProgressBar = true
 				cliConfig.RootConfigInstance.Verbosity = 4
-				return extractor.ExtractWithOutput(pcapFile, protocolType, tview.ANSIWriter(messageOutput), tview.ANSIWriter(messageOutput))
+				return extractor.ExtractWithOutput(ctx, pcapFile, protocolType, tview.ANSIWriter(messageOutput), tview.ANSIWriter(messageOutput))
 			},
 			parameterSuggestions: func(currentText string) (entries []string) {
 				for _, file := range loadedPcapFiles {
@@ -210,14 +211,14 @@ var rootCommand = Command{
 			subCommands: []Command{
 				{
 					Name: "set",
-					action: func(_ Command, host string) error {
+					action: func(_ context.Context, _ Command, host string) error {
 						config.HostIp = host
 						return nil
 					},
 				},
 				{
 					Name: "get",
-					action: func(_ Command, host string) error {
+					action: func(_ context.Context, _ Command, host string) error {
 						_, _ = fmt.Fprintf(commandOutput, "current set host %s", config.HostIp)
 						return nil
 					},
@@ -227,7 +228,7 @@ var rootCommand = Command{
 		{
 			Name:        "register",
 			Description: "register a driver in the subsystem",
-			action: func(_ Command, driver string) error {
+			action: func(_ context.Context, _ Command, driver string) error {
 				return registerDriver(driver)
 			},
 			parameterSuggestions: func(currentText string) (entries []string) {
@@ -250,7 +251,7 @@ var rootCommand = Command{
 				{
 					Name:        "get",
 					Description: "Get a log level",
-					action: func(_ Command, _ string) error {
+					action: func(_ context.Context, _ Command, _ string) error {
 						_, _ = fmt.Fprintf(commandOutput, "Current log level %s", log.Logger.GetLevel())
 						return nil
 					},
@@ -258,7 +259,7 @@ var rootCommand = Command{
 				{
 					Name:        "set",
 					Description: "Sets a log level",
-					action: func(_ Command, level string) error {
+					action: func(_ context.Context, _ Command, level string) error {
 						parseLevel, err := zerolog.ParseLevel(level)
 						if err != nil {
 							return errors.Wrapf(err, "Error setting log level")
@@ -296,7 +297,7 @@ var rootCommand = Command{
 						{
 							Name:        "on",
 							Description: "trace on",
-							action: func(_ Command, _ string) error {
+							action: func(_ context.Context, _ Command, _ string) error {
 								plc4x_config.TraceTransactionManagerWorkers = true
 								return nil
 							},
@@ -304,7 +305,7 @@ var rootCommand = Command{
 						{
 							Name:        "off",
 							Description: "trace off",
-							action: func(_ Command, _ string) error {
+							action: func(_ context.Context, _ Command, _ string) error {
 								plc4x_config.TraceTransactionManagerWorkers = false
 								return nil
 							},
@@ -318,7 +319,7 @@ var rootCommand = Command{
 						{
 							Name:        "on",
 							Description: "trace on",
-							action: func(_ Command, _ string) error {
+							action: func(_ context.Context, _ Command, _ string) error {
 								plc4x_config.TraceTransactionManagerTransactions = true
 								return nil
 							},
@@ -326,7 +327,7 @@ var rootCommand = Command{
 						{
 							Name:        "off",
 							Description: "trace off",
-							action: func(_ Command, _ string) error {
+							action: func(_ context.Context, _ Command, _ string) error {
 								plc4x_config.TraceTransactionManagerTransactions = false
 								return nil
 							},
@@ -340,7 +341,7 @@ var rootCommand = Command{
 						{
 							Name:        "on",
 							Description: "trace on",
-							action: func(_ Command, _ string) error {
+							action: func(_ context.Context, _ Command, _ string) error {
 								plc4x_config.TraceDefaultMessageCodecWorker = true
 								return nil
 							},
@@ -348,7 +349,7 @@ var rootCommand = Command{
 						{
 							Name:        "off",
 							Description: "trace off",
-							action: func(_ Command, _ string) error {
+							action: func(_ context.Context, _ Command, _ string) error {
 								plc4x_config.TraceDefaultMessageCodecWorker = false
 								return nil
 							},
@@ -362,7 +363,7 @@ var rootCommand = Command{
 						{
 							Name:        "on",
 							Description: "debug on",
-							action: func(_ Command, _ string) error {
+							action: func(_ context.Context, _ Command, _ string) error {
 								plc4xpcapanalyzerLog = zerolog.New(zerolog.ConsoleWriter{Out: tview.ANSIWriter(consoleOutput)})
 								return nil
 							},
@@ -370,7 +371,7 @@ var rootCommand = Command{
 						{
 							Name:        "off",
 							Description: "debug off",
-							action: func(_ Command, _ string) error {
+							action: func(_ context.Context, _ Command, _ string) error {
 								plc4xpcapanalyzerLog = zerolog.Nop()
 								return nil
 							},
@@ -383,14 +384,14 @@ var rootCommand = Command{
 					subCommands: []Command{
 						{
 							Name: "list",
-							action: func(currentCommand Command, argument string) error {
+							action: func(_ context.Context, currentCommand Command, argument string) error {
 								_, _ = fmt.Fprintf(commandOutput, "Auto-register enabled drivers:\n  %s\n", strings.Join(config.AutoRegisterDrivers, "\n  "))
 								return nil
 							},
 						},
 						{
 							Name: "enable",
-							action: func(_ Command, argument string) error {
+							action: func(_ context.Context, _ Command, argument string) error {
 								return enableAutoRegister(argument)
 							},
 							parameterSuggestions: func(currentText string) (entries []string) {
@@ -404,7 +405,7 @@ var rootCommand = Command{
 						},
 						{
 							Name: "disable",
-							action: func(_ Command, argument string) error {
+							action: func(_ context.Context, _ Command, argument string) error {
 								return disableAutoRegister(argument)
 							},
 							parameterSuggestions: func(currentText string) (entries []string) {
@@ -423,7 +424,7 @@ var rootCommand = Command{
 		{
 			Name:        "history",
 			Description: "outputs the last commands",
-			action: func(_ Command, _ string) error {
+			action: func(_ context.Context, _ Command, _ string) error {
 				outputCommandHistory()
 				return nil
 			},
@@ -431,7 +432,7 @@ var rootCommand = Command{
 		{
 			Name:        "clear",
 			Description: "clear all outputs",
-			action: func(_ Command, _ string) error {
+			action: func(_ context.Context, _ Command, _ string) error {
 				messageOutputClear()
 				consoleOutputClear()
 				commandOutputClear()
@@ -441,7 +442,7 @@ var rootCommand = Command{
 				{
 					Name:        "message",
 					Description: "clears message output",
-					action: func(_ Command, _ string) error {
+					action: func(_ context.Context, _ Command, _ string) error {
 						messageOutputClear()
 						return nil
 					},
@@ -449,7 +450,7 @@ var rootCommand = Command{
 				{
 					Name:        "console",
 					Description: "clears console output",
-					action: func(_ Command, _ string) error {
+					action: func(_ context.Context, _ Command, _ string) error {
 						consoleOutputClear()
 						return nil
 					},
@@ -457,13 +458,23 @@ var rootCommand = Command{
 				{
 					Name:        "command",
 					Description: "clears command output",
-					action: func(_ Command, _ string) error {
+					action: func(_ context.Context, _ Command, _ string) error {
 						commandOutputClear()
 						return nil
 					},
 				},
 			},
 		},
+		{
+			Name:        "abort",
+			Description: "abort currently running jobs",
+			action: func(_ context.Context, _ Command, _ string) error {
+				for _, cancelFunc := range cancelFunctions {
+					cancelFunc()
+				}
+				return nil
+			},
+		},
 	},
 }
 
@@ -472,7 +483,7 @@ func init() {
 	rootCommand.subCommands = append(rootCommand.subCommands, Command{
 		Name:        "help",
 		Description: "prints out this help",
-		action: func(_ Command, _ string) error {
+		action: func(_ context.Context, _ Command, _ string) error {
 			_, _ = fmt.Fprintf(commandOutput, "[#0000ff]Available commands[white]\n")
 			rootCommand.visit(0, func(currentIndent int, command Command) {
 				indentString := strings.Repeat("  ", currentIndent)
@@ -492,7 +503,7 @@ var NotDirectlyExecutable = errors.New("Not directly executable")
 type Command struct {
 	Name                 string
 	Description          string
-	action               func(currentCommand Command, argument string) error
+	action               func(ctx context.Context, currentCommand Command, argument string) error
 	subCommands          []Command
 	parameterSuggestions func(currentText string) (entries []string)
 }
@@ -572,15 +583,15 @@ func (c Command) hasDirectExecution() bool {
 	return c.action != nil
 }
 
-func Execute(commandText string) error {
-	err := rootCommand.Execute(commandText)
+func Execute(ctx context.Context, commandText string) error {
+	err := rootCommand.Execute(ctx, commandText)
 	if err == nil {
 		addCommandHistoryEntry(commandText)
 	}
 	return err
 }
 
-func (c Command) Execute(commandText string) error {
+func (c Command) Execute(ctx context.Context, commandText string) error {
 	plc4xpcapanalyzerLog.Debug().Msgf("%s executes %s", c, commandText)
 	if !c.acceptsCurrentText(commandText) {
 		return errors.Errorf("%s doesn't understand %s", c.Name, commandText)
@@ -590,7 +601,7 @@ func (c Command) Execute(commandText string) error {
 		for _, command := range c.subCommands {
 			if command.acceptsCurrentText(prepareForSubCommandForSubCommand) {
 				plc4xpcapanalyzerLog.Debug().Msgf("%s delegates to sub %s", c, command)
-				return command.Execute(prepareForSubCommandForSubCommand)
+				return command.Execute(ctx, prepareForSubCommandForSubCommand)
 			}
 		}
 		return errors.Errorf("%s not accepted by any subcommands of %s", commandText, c.Name)
@@ -600,7 +611,7 @@ func (c Command) Execute(commandText string) error {
 		}
 		plc4xpcapanalyzerLog.Debug().Msgf("%s executes %s directly", c, commandText)
 		preparedForParameters := c.prepareForParameters(commandText)
-		return c.action(c, preparedForParameters)
+		return c.action(ctx, c, preparedForParameters)
 	}
 }
 
diff --git a/plc4go/tools/plc4xpcapanalyzer/ui/common.go b/plc4go/tools/plc4xpcapanalyzer/ui/common.go
index 6759d5a3d..09bf5bfbd 100644
--- a/plc4go/tools/plc4xpcapanalyzer/ui/common.go
+++ b/plc4go/tools/plc4xpcapanalyzer/ui/common.go
@@ -20,6 +20,7 @@
 package ui
 
 import (
+	"context"
 	plc4go "github.com/apache/plc4x/plc4go/pkg/api"
 	plc4goModel "github.com/apache/plc4x/plc4go/pkg/api/model"
 	"github.com/apache/plc4x/plc4go/spi"
@@ -68,6 +69,9 @@ var currentDir = func() string {
 	return dir
 }()
 
+var rootContext = context.Background()
+var cancelFunctions = make(map[uint32]context.CancelFunc)
+
 var shutdownMutex sync.Mutex
 var hasShutdown bool
 
diff --git a/plc4go/tools/plc4xpcapanalyzer/ui/ui.go b/plc4go/tools/plc4xpcapanalyzer/ui/ui.go
index 9dc60b659..ad3bbfe43 100644
--- a/plc4go/tools/plc4xpcapanalyzer/ui/ui.go
+++ b/plc4go/tools/plc4xpcapanalyzer/ui/ui.go
@@ -20,12 +20,14 @@
 package ui
 
 import (
+	"context"
 	"fmt"
 	"github.com/apache/plc4x/plc4go/pkg/api/model"
 	"github.com/apache/plc4x/plc4go/spi"
 	"github.com/gdamore/tcell/v2"
 	"github.com/pkg/errors"
 	"github.com/rivo/tview"
+	"math/rand"
 	"regexp"
 	"strconv"
 	"time"
@@ -167,7 +169,12 @@ func buildCommandArea(newPrimitive func(text string) tview.Primitive, applicatio
 						}
 					}
 					_, _ = fmt.Fprintf(enteredCommandsView, "%s [\"%d\"]%s[\"\"]\n", time.Now().Format("04:05"), commandsExecuted, commandText)
-					if err := Execute(commandText); err != nil {
+					ctx, cancelFunc := context.WithCancel(rootContext)
+					randomId := rand.Uint32()
+					cancelFunctions[randomId] = cancelFunc
+					defer delete(cancelFunctions, randomId)
+
+					if err := Execute(ctx, commandText); err != nil {
 						_, _ = fmt.Fprintf(enteredCommandsView, "[#ff0000]%s %s[white]\n", time.Now().Format("04:05"), err)
 						return
 					}