You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by ji...@apache.org on 2022/08/15 03:19:34 UTC
[incubator-pegasus] branch v2.4 updated: fix(go-client): scan will not recover when encounter `error_invalid_state` (#1106)
This is an automated email from the ASF dual-hosted git repository.
jiashuo pushed a commit to branch v2.4
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/v2.4 by this push:
new 6e6d8eb57 fix(go-client): scan will not recover when encounter `error_invalid_state` (#1106)
6e6d8eb57 is described below
commit 6e6d8eb575b965d5462d0584d0831e59aa7a8650
Author: Jiashuo <js...@live.com>
AuthorDate: Mon Aug 8 15:16:56 2022 +0800
fix(go-client): scan will not recover when encounter `error_invalid_state` (#1106)
---
go-client/pegasus/scan_test.go | 78 +++++++++++++++++++++++++++++++++++++-----
go-client/pegasus/scanner.go | 8 ++++-
2 files changed, 76 insertions(+), 10 deletions(-)
diff --git a/go-client/pegasus/scan_test.go b/go-client/pegasus/scan_test.go
index 171abdc99..d89539633 100644
--- a/go-client/pegasus/scan_test.go
+++ b/go-client/pegasus/scan_test.go
@@ -23,6 +23,7 @@ import (
"context"
"fmt"
"reflect"
+ "strings"
"sync"
"testing"
"time"
@@ -219,6 +220,10 @@ func TestPegasusTableConnector_ScanInclusive(t *testing.T) {
clearDatabase(t, tb)
}
+func GetScannerRpcErrorForTest(_ *session.ReplicaSession, ctx context.Context, gpid *base.Gpid, partitionHash uint64, request *rrdb.GetScannerRequest) (*rrdb.ScanResponse, error) {
+ return nil, base.ERR_INVALID_STATE
+}
+
func ScanRpcErrorForTest(_ *session.ReplicaSession, ctx context.Context, gpid *base.Gpid, partitionHash uint64, request *rrdb.ScanRequest) (*rrdb.ScanResponse, error) {
return nil, base.ERR_INVALID_STATE
}
@@ -270,28 +275,83 @@ func TestPegasusTableConnector_ScanFailRecover(t *testing.T) {
}
assert.Equal(t, 1, successCount)
+ // test rpc error
mockRpcFailedErrorTable, err := client.OpenTable(context.Background(), "temp")
assert.Nil(t, err)
defer tb.Close()
- scanner, _ = mockRpcFailedErrorTable.GetScanner(context.Background(), []byte("h1"), []byte(""), []byte(""), opts)
- rpcFailedMocked := false
+ // test getScanner rpc error
+ scanner, err = mockRpcFailedErrorTable.GetScanner(context.Background(), []byte("h1"), []byte(""), []byte(""), opts)
+ assert.Nil(t, err)
+ rpcGetScannerFailedMocked := false
+ recallGetScanner := true
+ var getScannerFailedMock *gomonkey.Patches
successCount = 0
for {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
- complete, _, _, _, error := scanner.Next(ctx)
- // mock rpc error, follow request will be recovered automatically
- if !rpcFailedMocked {
- mock = gomonkey.ApplyMethod(reflect.TypeOf(session), "Scan", ScanRpcErrorForTest)
- rpcFailedMocked = true
+ if recallGetScanner && rpcGetScannerFailedMocked { // GetScannerFailedMocked = true, recall GetScanner to trigger the error when execute scanner.Next
+ scanner, err = mockRpcFailedErrorTable.GetScanner(context.Background(), []byte("h1"), []byte(""), []byte(""), opts)
+ assert.Nil(t, err)
+ }
+ complete, _, _, _, errNext := scanner.Next(ctx)
+ if !rpcGetScannerFailedMocked { // mock replicaSession.GetScanner rpc error, the next loop request will be failed
+ getScannerFailedMock = gomonkey.ApplyMethod(reflect.TypeOf(session), "GetScanner", GetScannerRpcErrorForTest)
+ rpcGetScannerFailedMocked = true
+ }
+ cancel()
+ if complete {
+ break
+ }
+
+ if errNext == nil {
+ successCount++
+ continue
+ }
+ // error encounter ERR_INVALID_STATE and auto-trigger re-config that means rpcGetScannerFailedMocked can be reset
+ if strings.Contains(errNext.Error(), "ERR_INVALID_STATE") &&
+ strings.Contains(errNext.Error(), "updateConfig=true") {
+ getScannerFailedMock.Reset()
+ recallGetScanner = false
+ } else if strings.Contains(errNext.Error(), "recover after next loop") {
+ continue
} else {
- mock.Reset()
+ break
+ }
+ }
+ // since re-call once getScanner, so the successCount = 100 + 1
+ assert.Equal(t, 101, successCount)
+
+ // test scan rpc error
+ getScannerFailedMock.Reset()
+ rpcScanFailedMocked := false
+ var scanFailedMock *gomonkey.Patches
+ successCount = 0
+ scanner, err = mockRpcFailedErrorTable.GetScanner(context.Background(), []byte("h1"), []byte(""), []byte(""), opts)
+ assert.Nil(t, err)
+ for {
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second*500)
+ complete, _, _, _, errNext := scanner.Next(ctx)
+ if !rpcScanFailedMocked { // mock scan rpc error, the next loop request will be failed but recovered automatically
+ scanFailedMock = gomonkey.ApplyMethod(reflect.TypeOf(session), "Scan", ScanRpcErrorForTest)
+ rpcScanFailedMocked = true
}
cancel()
if complete {
break
}
- if error == nil {
+
+ if errNext == nil {
successCount++
+ continue
+ }
+
+ // error encounter ERR_INVALID_STATE and auto-trigger re-config that means rpcGetScannerFailedMocked can be reset
+ if strings.Contains(errNext.Error(), "ERR_INVALID_STATE") &&
+ strings.Contains(errNext.Error(), "updateConfig=true") {
+ scanFailedMock.Reset()
+ } else if strings.Contains(errNext.Error(), "recover after next loop") {
+ continue
+ } else {
+ break
}
}
assert.Equal(t, 100, successCount)
diff --git a/go-client/pegasus/scanner.go b/go-client/pegasus/scanner.go
index d1238c062..968299451 100644
--- a/go-client/pegasus/scanner.go
+++ b/go-client/pegasus/scanner.go
@@ -223,7 +223,13 @@ func (p *pegasusScanner) startScanPartition(ctx context.Context) (completed bool
part := p.table.getPartitionByGpid(p.curGpid)
response, err := part.GetScanner(ctx, p.curGpid, p.curHash, request)
-
+ if err != nil {
+ p.batchStatus = batchRpcError
+ if updateConfig, _, errHandler := p.table.handleReplicaError(err, part); errHandler != nil {
+ err = fmt.Errorf("scan failed, error = %s, try resolve it(updateConfig=%v), result = %s", err, updateConfig, errHandler)
+ }
+ return
+ }
err = p.onRecvScanResponse(response, err)
if err == nil {
return p.doNext(ctx)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org