You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/10/31 15:29:34 UTC
[plc4x] branch develop updated: feat(plc4xanalyzer): added option to abort a long running job
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new f3fd13bde feat(plc4xanalyzer): added option to abort a long running job
f3fd13bde is described below
commit f3fd13bdef2d19ec3acbcfbb7fa10827368f3f95
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Oct 31 16:29:21 2022 +0100
feat(plc4xanalyzer): added option to abort a long running job
---
.../internal/analyzer/analyzer.go | 9 ++-
.../internal/extractor/extractor.go | 9 ++-
plc4go/tools/plc4xpcapanalyzer/ui/commands.go | 83 ++++++++++++----------
plc4go/tools/plc4xpcapanalyzer/ui/common.go | 4 ++
plc4go/tools/plc4xpcapanalyzer/ui/ui.go | 9 ++-
5 files changed, 73 insertions(+), 41 deletions(-)
diff --git a/plc4go/tools/plc4xpcapanalyzer/internal/analyzer/analyzer.go b/plc4go/tools/plc4xpcapanalyzer/internal/analyzer/analyzer.go
index 016e6f6d0..d931ee5cb 100644
--- a/plc4go/tools/plc4xpcapanalyzer/internal/analyzer/analyzer.go
+++ b/plc4go/tools/plc4xpcapanalyzer/internal/analyzer/analyzer.go
@@ -21,6 +21,7 @@ package analyzer
import (
"bytes"
+ "context"
"encoding/hex"
"fmt"
"github.com/apache/plc4x/plc4go/spi"
@@ -46,10 +47,10 @@ func Analyze(pcapFile, protocolType string) error {
}
func AnalyzeWithOutput(pcapFile, protocolType string, stdout, stderr io.Writer) error {
- return AnalyzeWithOutputAndCallback(pcapFile, protocolType, stdout, stderr, nil)
+ return AnalyzeWithOutputAndCallback(context.TODO(), pcapFile, protocolType, stdout, stderr, nil)
}
-func AnalyzeWithOutputAndCallback(pcapFile, protocolType string, stdout, stderr io.Writer, messageCallback func(parsed spi.Message)) error {
+func AnalyzeWithOutputAndCallback(ctx context.Context, pcapFile, protocolType string, stdout, stderr io.Writer, messageCallback func(parsed spi.Message)) error {
log.Info().Msgf("Analyzing pcap file '%s' with protocolType '%s' and filter '%s' now", pcapFile, protocolType, config.AnalyzeConfigInstance.Filter)
handle, numberOfPackage, timestampToIndexMap, err := pcaphandler.GetIndexedPcapHandle(pcapFile, config.AnalyzeConfigInstance.Filter)
@@ -107,6 +108,10 @@ func AnalyzeWithOutputAndCallback(pcapFile, protocolType string, stdout, stderr
for packet := range mapPackets(source.Packets(), func(packet gopacket.Packet) common.PacketInformation {
return createPacketInformation(pcapFile, packet, timestampToIndexMap)
}) {
+ if ctx.Err() == context.Canceled {
+ log.Info().Msgf("Aborted after %d packages", currentPackageNum)
+ break
+ }
currentPackageNum++
if currentPackageNum < config.AnalyzeConfigInstance.StartPackageNumber {
log.Debug().Msgf("Skipping package number %d (till no. %d)", currentPackageNum, config.AnalyzeConfigInstance.StartPackageNumber)
diff --git a/plc4go/tools/plc4xpcapanalyzer/internal/extractor/extractor.go b/plc4go/tools/plc4xpcapanalyzer/internal/extractor/extractor.go
index bfd201f95..f18719fde 100644
--- a/plc4go/tools/plc4xpcapanalyzer/internal/extractor/extractor.go
+++ b/plc4go/tools/plc4xpcapanalyzer/internal/extractor/extractor.go
@@ -20,6 +20,7 @@
package extractor
import (
+ "context"
"fmt"
"github.com/apache/plc4x/plc4go/tools/plc4xpcapanalyzer/config"
"github.com/apache/plc4x/plc4go/tools/plc4xpcapanalyzer/internal/common"
@@ -35,10 +36,10 @@ import (
)
func Extract(pcapFile, protocolType string) error {
- return ExtractWithOutput(pcapFile, protocolType, ansi.NewAnsiStdout(), ansi.NewAnsiStderr())
+ return ExtractWithOutput(context.TODO(), pcapFile, protocolType, ansi.NewAnsiStdout(), ansi.NewAnsiStderr())
}
-func ExtractWithOutput(pcapFile, protocolType string, stdout, stderr io.Writer) error {
+func ExtractWithOutput(ctx context.Context, pcapFile, protocolType string, stdout, stderr io.Writer) error {
log.Info().Msgf("Analyzing pcap file '%s' with protocolType '%s' and filter '%s' now", pcapFile, protocolType, config.ExtractConfigInstance.Filter)
handle, numberOfPackage, timestampToIndexMap, err := pcaphandler.GetIndexedPcapHandle(pcapFile, config.ExtractConfigInstance.Filter)
@@ -108,6 +109,10 @@ func ExtractWithOutput(pcapFile, protocolType string, stdout, stderr io.Writer)
serializeFails := 0
compareFails := 0
for packet := range source.Packets() {
+ if ctx.Err() == context.Canceled {
+ log.Info().Msgf("Aborted after %d packages", currentPackageNum)
+ break
+ }
currentPackageNum++
if currentPackageNum < config.ExtractConfigInstance.StartPackageNumber {
log.Debug().Msgf("Skipping package number %d (till no. %d)", currentPackageNum, config.ExtractConfigInstance.StartPackageNumber)
diff --git a/plc4go/tools/plc4xpcapanalyzer/ui/commands.go b/plc4go/tools/plc4xpcapanalyzer/ui/commands.go
index bffd10e44..2ceb2b1da 100644
--- a/plc4go/tools/plc4xpcapanalyzer/ui/commands.go
+++ b/plc4go/tools/plc4xpcapanalyzer/ui/commands.go
@@ -20,6 +20,7 @@
package ui
import (
+ "context"
"fmt"
plc4x_config "github.com/apache/plc4x/plc4go/pkg/api/config"
"github.com/apache/plc4x/plc4go/spi"
@@ -44,7 +45,7 @@ var rootCommand = Command{
{
Name: "ls",
Description: "list directories",
- action: func(_ Command, dir string) error {
+ action: func(_ context.Context, _ Command, dir string) error {
if dir == "" {
dir = currentDir
}
@@ -71,7 +72,7 @@ var rootCommand = Command{
{
Name: "cd",
Description: "changes directory",
- action: func(_ Command, newDir string) error {
+ action: func(_ context.Context, _ Command, newDir string) error {
var proposedCurrentDir string
if newDir == "" {
var err error
@@ -125,7 +126,7 @@ var rootCommand = Command{
{
Name: "pwd",
Description: "shows current directory",
- action: func(_ Command, _ string) error {
+ action: func(_ context.Context, _ Command, _ string) error {
_, _ = fmt.Fprintf(commandOutput, "current directory: %s\n", currentDir)
return nil
},
@@ -133,7 +134,7 @@ var rootCommand = Command{
{
Name: "open",
Description: "open file",
- action: func(_ Command, pcapFile string) error {
+ action: func(_ context.Context, _ Command, pcapFile string) error {
return OpenFile(pcapFile)
},
parameterSuggestions: func(currentText string) (entries []string) {
@@ -155,7 +156,7 @@ var rootCommand = Command{
{
Name: "analyze",
Description: "Analyzes a pcap file using a driver",
- action: func(_ Command, protocolTypeAndPcapFile string) error {
+ action: func(ctx context.Context, _ Command, protocolTypeAndPcapFile string) error {
split := strings.Split(protocolTypeAndPcapFile, " ")
if len(split) != 2 {
return errors.Errorf("expect protocol and pcapfile")
@@ -166,7 +167,7 @@ var rootCommand = Command{
cliConfig.RootConfigInstance.HideProgressBar = true
// disabled as we get this output anyway with the message call back
//cliConfig.RootConfigInstance.Verbosity = 4
- return analyzer.AnalyzeWithOutputAndCallback(pcapFile, protocolType, tview.ANSIWriter(messageOutput), tview.ANSIWriter(messageOutput), func(parsed spi.Message) {
+ return analyzer.AnalyzeWithOutputAndCallback(ctx, pcapFile, protocolType, tview.ANSIWriter(messageOutput), tview.ANSIWriter(messageOutput), func(parsed spi.Message) {
spiNumberOfMessagesReceived++
spiMessageReceived(spiNumberOfMessagesReceived, time.Now(), parsed)
})
@@ -183,7 +184,7 @@ var rootCommand = Command{
{
Name: "extract",
Description: "Extract a pcap file using a driver",
- action: func(_ Command, protocolTypeAndPcapFile string) error {
+ action: func(ctx context.Context, _ Command, protocolTypeAndPcapFile string) error {
split := strings.Split(protocolTypeAndPcapFile, " ")
if len(split) != 2 {
return errors.Errorf("expect protocol and pcapfile")
@@ -193,7 +194,7 @@ var rootCommand = Command{
cliConfig.PcapConfigInstance.Client = config.HostIp
cliConfig.RootConfigInstance.HideProgressBar = true
cliConfig.RootConfigInstance.Verbosity = 4
- return extractor.ExtractWithOutput(pcapFile, protocolType, tview.ANSIWriter(messageOutput), tview.ANSIWriter(messageOutput))
+ return extractor.ExtractWithOutput(ctx, pcapFile, protocolType, tview.ANSIWriter(messageOutput), tview.ANSIWriter(messageOutput))
},
parameterSuggestions: func(currentText string) (entries []string) {
for _, file := range loadedPcapFiles {
@@ -210,14 +211,14 @@ var rootCommand = Command{
subCommands: []Command{
{
Name: "set",
- action: func(_ Command, host string) error {
+ action: func(_ context.Context, _ Command, host string) error {
config.HostIp = host
return nil
},
},
{
Name: "get",
- action: func(_ Command, host string) error {
+ action: func(_ context.Context, _ Command, host string) error {
_, _ = fmt.Fprintf(commandOutput, "current set host %s", config.HostIp)
return nil
},
@@ -227,7 +228,7 @@ var rootCommand = Command{
{
Name: "register",
Description: "register a driver in the subsystem",
- action: func(_ Command, driver string) error {
+ action: func(_ context.Context, _ Command, driver string) error {
return registerDriver(driver)
},
parameterSuggestions: func(currentText string) (entries []string) {
@@ -250,7 +251,7 @@ var rootCommand = Command{
{
Name: "get",
Description: "Get a log level",
- action: func(_ Command, _ string) error {
+ action: func(_ context.Context, _ Command, _ string) error {
_, _ = fmt.Fprintf(commandOutput, "Current log level %s", log.Logger.GetLevel())
return nil
},
@@ -258,7 +259,7 @@ var rootCommand = Command{
{
Name: "set",
Description: "Sets a log level",
- action: func(_ Command, level string) error {
+ action: func(_ context.Context, _ Command, level string) error {
parseLevel, err := zerolog.ParseLevel(level)
if err != nil {
return errors.Wrapf(err, "Error setting log level")
@@ -296,7 +297,7 @@ var rootCommand = Command{
{
Name: "on",
Description: "trace on",
- action: func(_ Command, _ string) error {
+ action: func(_ context.Context, _ Command, _ string) error {
plc4x_config.TraceTransactionManagerWorkers = true
return nil
},
@@ -304,7 +305,7 @@ var rootCommand = Command{
{
Name: "off",
Description: "trace off",
- action: func(_ Command, _ string) error {
+ action: func(_ context.Context, _ Command, _ string) error {
plc4x_config.TraceTransactionManagerWorkers = false
return nil
},
@@ -318,7 +319,7 @@ var rootCommand = Command{
{
Name: "on",
Description: "trace on",
- action: func(_ Command, _ string) error {
+ action: func(_ context.Context, _ Command, _ string) error {
plc4x_config.TraceTransactionManagerTransactions = true
return nil
},
@@ -326,7 +327,7 @@ var rootCommand = Command{
{
Name: "off",
Description: "trace off",
- action: func(_ Command, _ string) error {
+ action: func(_ context.Context, _ Command, _ string) error {
plc4x_config.TraceTransactionManagerTransactions = false
return nil
},
@@ -340,7 +341,7 @@ var rootCommand = Command{
{
Name: "on",
Description: "trace on",
- action: func(_ Command, _ string) error {
+ action: func(_ context.Context, _ Command, _ string) error {
plc4x_config.TraceDefaultMessageCodecWorker = true
return nil
},
@@ -348,7 +349,7 @@ var rootCommand = Command{
{
Name: "off",
Description: "trace off",
- action: func(_ Command, _ string) error {
+ action: func(_ context.Context, _ Command, _ string) error {
plc4x_config.TraceDefaultMessageCodecWorker = false
return nil
},
@@ -362,7 +363,7 @@ var rootCommand = Command{
{
Name: "on",
Description: "debug on",
- action: func(_ Command, _ string) error {
+ action: func(_ context.Context, _ Command, _ string) error {
plc4xpcapanalyzerLog = zerolog.New(zerolog.ConsoleWriter{Out: tview.ANSIWriter(consoleOutput)})
return nil
},
@@ -370,7 +371,7 @@ var rootCommand = Command{
{
Name: "off",
Description: "debug off",
- action: func(_ Command, _ string) error {
+ action: func(_ context.Context, _ Command, _ string) error {
plc4xpcapanalyzerLog = zerolog.Nop()
return nil
},
@@ -383,14 +384,14 @@ var rootCommand = Command{
subCommands: []Command{
{
Name: "list",
- action: func(currentCommand Command, argument string) error {
+ action: func(_ context.Context, currentCommand Command, argument string) error {
_, _ = fmt.Fprintf(commandOutput, "Auto-register enabled drivers:\n %s\n", strings.Join(config.AutoRegisterDrivers, "\n "))
return nil
},
},
{
Name: "enable",
- action: func(_ Command, argument string) error {
+ action: func(_ context.Context, _ Command, argument string) error {
return enableAutoRegister(argument)
},
parameterSuggestions: func(currentText string) (entries []string) {
@@ -404,7 +405,7 @@ var rootCommand = Command{
},
{
Name: "disable",
- action: func(_ Command, argument string) error {
+ action: func(_ context.Context, _ Command, argument string) error {
return disableAutoRegister(argument)
},
parameterSuggestions: func(currentText string) (entries []string) {
@@ -423,7 +424,7 @@ var rootCommand = Command{
{
Name: "history",
Description: "outputs the last commands",
- action: func(_ Command, _ string) error {
+ action: func(_ context.Context, _ Command, _ string) error {
outputCommandHistory()
return nil
},
@@ -431,7 +432,7 @@ var rootCommand = Command{
{
Name: "clear",
Description: "clear all outputs",
- action: func(_ Command, _ string) error {
+ action: func(_ context.Context, _ Command, _ string) error {
messageOutputClear()
consoleOutputClear()
commandOutputClear()
@@ -441,7 +442,7 @@ var rootCommand = Command{
{
Name: "message",
Description: "clears message output",
- action: func(_ Command, _ string) error {
+ action: func(_ context.Context, _ Command, _ string) error {
messageOutputClear()
return nil
},
@@ -449,7 +450,7 @@ var rootCommand = Command{
{
Name: "console",
Description: "clears console output",
- action: func(_ Command, _ string) error {
+ action: func(_ context.Context, _ Command, _ string) error {
consoleOutputClear()
return nil
},
@@ -457,13 +458,23 @@ var rootCommand = Command{
{
Name: "command",
Description: "clears command output",
- action: func(_ Command, _ string) error {
+ action: func(_ context.Context, _ Command, _ string) error {
commandOutputClear()
return nil
},
},
},
},
+ {
+ Name: "abort",
+ Description: "abort currently running jobs",
+ action: func(_ context.Context, _ Command, _ string) error {
+ for _, cancelFunc := range cancelFunctions {
+ cancelFunc()
+ }
+ return nil
+ },
+ },
},
}
@@ -472,7 +483,7 @@ func init() {
rootCommand.subCommands = append(rootCommand.subCommands, Command{
Name: "help",
Description: "prints out this help",
- action: func(_ Command, _ string) error {
+ action: func(_ context.Context, _ Command, _ string) error {
_, _ = fmt.Fprintf(commandOutput, "[#0000ff]Available commands[white]\n")
rootCommand.visit(0, func(currentIndent int, command Command) {
indentString := strings.Repeat(" ", currentIndent)
@@ -492,7 +503,7 @@ var NotDirectlyExecutable = errors.New("Not directly executable")
type Command struct {
Name string
Description string
- action func(currentCommand Command, argument string) error
+ action func(ctx context.Context, currentCommand Command, argument string) error
subCommands []Command
parameterSuggestions func(currentText string) (entries []string)
}
@@ -572,15 +583,15 @@ func (c Command) hasDirectExecution() bool {
return c.action != nil
}
-func Execute(commandText string) error {
- err := rootCommand.Execute(commandText)
+func Execute(ctx context.Context, commandText string) error {
+ err := rootCommand.Execute(ctx, commandText)
if err == nil {
addCommandHistoryEntry(commandText)
}
return err
}
-func (c Command) Execute(commandText string) error {
+func (c Command) Execute(ctx context.Context, commandText string) error {
plc4xpcapanalyzerLog.Debug().Msgf("%s executes %s", c, commandText)
if !c.acceptsCurrentText(commandText) {
return errors.Errorf("%s doesn't understand %s", c.Name, commandText)
@@ -590,7 +601,7 @@ func (c Command) Execute(commandText string) error {
for _, command := range c.subCommands {
if command.acceptsCurrentText(prepareForSubCommandForSubCommand) {
plc4xpcapanalyzerLog.Debug().Msgf("%s delegates to sub %s", c, command)
- return command.Execute(prepareForSubCommandForSubCommand)
+ return command.Execute(ctx, prepareForSubCommandForSubCommand)
}
}
return errors.Errorf("%s not accepted by any subcommands of %s", commandText, c.Name)
@@ -600,7 +611,7 @@ func (c Command) Execute(commandText string) error {
}
plc4xpcapanalyzerLog.Debug().Msgf("%s executes %s directly", c, commandText)
preparedForParameters := c.prepareForParameters(commandText)
- return c.action(c, preparedForParameters)
+ return c.action(ctx, c, preparedForParameters)
}
}
diff --git a/plc4go/tools/plc4xpcapanalyzer/ui/common.go b/plc4go/tools/plc4xpcapanalyzer/ui/common.go
index 6759d5a3d..09bf5bfbd 100644
--- a/plc4go/tools/plc4xpcapanalyzer/ui/common.go
+++ b/plc4go/tools/plc4xpcapanalyzer/ui/common.go
@@ -20,6 +20,7 @@
package ui
import (
+ "context"
plc4go "github.com/apache/plc4x/plc4go/pkg/api"
plc4goModel "github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/apache/plc4x/plc4go/spi"
@@ -68,6 +69,9 @@ var currentDir = func() string {
return dir
}()
+var rootContext = context.Background()
+var cancelFunctions = make(map[uint32]context.CancelFunc)
+
var shutdownMutex sync.Mutex
var hasShutdown bool
diff --git a/plc4go/tools/plc4xpcapanalyzer/ui/ui.go b/plc4go/tools/plc4xpcapanalyzer/ui/ui.go
index 9dc60b659..ad3bbfe43 100644
--- a/plc4go/tools/plc4xpcapanalyzer/ui/ui.go
+++ b/plc4go/tools/plc4xpcapanalyzer/ui/ui.go
@@ -20,12 +20,14 @@
package ui
import (
+ "context"
"fmt"
"github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/apache/plc4x/plc4go/spi"
"github.com/gdamore/tcell/v2"
"github.com/pkg/errors"
"github.com/rivo/tview"
+ "math/rand"
"regexp"
"strconv"
"time"
@@ -167,7 +169,12 @@ func buildCommandArea(newPrimitive func(text string) tview.Primitive, applicatio
}
}
_, _ = fmt.Fprintf(enteredCommandsView, "%s [\"%d\"]%s[\"\"]\n", time.Now().Format("04:05"), commandsExecuted, commandText)
- if err := Execute(commandText); err != nil {
+ ctx, cancelFunc := context.WithCancel(rootContext)
+ randomId := rand.Uint32()
+ cancelFunctions[randomId] = cancelFunc
+ defer delete(cancelFunctions, randomId)
+
+ if err := Execute(ctx, commandText); err != nil {
_, _ = fmt.Fprintf(enteredCommandsView, "[#ff0000]%s %s[white]\n", time.Now().Format("04:05"), err)
return
}