You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by ji...@apache.org on 2022/08/15 03:19:34 UTC

[incubator-pegasus] branch v2.4 updated: fix(go-client): scan will not recover when encounter `error_invalid_state` (#1106)

This is an automated email from the ASF dual-hosted git repository.

jiashuo pushed a commit to branch v2.4
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/v2.4 by this push:
     new 6e6d8eb57 fix(go-client): scan will not recover when encounter `error_invalid_state` (#1106)
6e6d8eb57 is described below

commit 6e6d8eb575b965d5462d0584d0831e59aa7a8650
Author: Jiashuo <js...@live.com>
AuthorDate: Mon Aug 8 15:16:56 2022 +0800

    fix(go-client): scan will not recover when encounter `error_invalid_state` (#1106)
---
 go-client/pegasus/scan_test.go | 78 +++++++++++++++++++++++++++++++++++++-----
 go-client/pegasus/scanner.go   |  8 ++++-
 2 files changed, 76 insertions(+), 10 deletions(-)

diff --git a/go-client/pegasus/scan_test.go b/go-client/pegasus/scan_test.go
index 171abdc99..d89539633 100644
--- a/go-client/pegasus/scan_test.go
+++ b/go-client/pegasus/scan_test.go
@@ -23,6 +23,7 @@ import (
 	"context"
 	"fmt"
 	"reflect"
+	"strings"
 	"sync"
 	"testing"
 	"time"
@@ -219,6 +220,10 @@ func TestPegasusTableConnector_ScanInclusive(t *testing.T) {
 	clearDatabase(t, tb)
 }
 
+func GetScannerRpcErrorForTest(_ *session.ReplicaSession, ctx context.Context, gpid *base.Gpid, partitionHash uint64, request *rrdb.GetScannerRequest) (*rrdb.ScanResponse, error) {
+	return nil, base.ERR_INVALID_STATE
+}
+
 func ScanRpcErrorForTest(_ *session.ReplicaSession, ctx context.Context, gpid *base.Gpid, partitionHash uint64, request *rrdb.ScanRequest) (*rrdb.ScanResponse, error) {
 	return nil, base.ERR_INVALID_STATE
 }
@@ -270,28 +275,83 @@ func TestPegasusTableConnector_ScanFailRecover(t *testing.T) {
 	}
 	assert.Equal(t, 1, successCount)
 
+	// test rpc error
 	mockRpcFailedErrorTable, err := client.OpenTable(context.Background(), "temp")
 	assert.Nil(t, err)
 	defer tb.Close()
-	scanner, _ = mockRpcFailedErrorTable.GetScanner(context.Background(), []byte("h1"), []byte(""), []byte(""), opts)
-	rpcFailedMocked := false
+	// test getScanner rpc error
+	scanner, err = mockRpcFailedErrorTable.GetScanner(context.Background(), []byte("h1"), []byte(""), []byte(""), opts)
+	assert.Nil(t, err)
+	rpcGetScannerFailedMocked := false
+	recallGetScanner := true
+	var getScannerFailedMock *gomonkey.Patches
 	successCount = 0
 	for {
 		ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
-		complete, _, _, _, error := scanner.Next(ctx)
-		// mock rpc error, follow request will be recovered automatically
-		if !rpcFailedMocked {
-			mock = gomonkey.ApplyMethod(reflect.TypeOf(session), "Scan", ScanRpcErrorForTest)
-			rpcFailedMocked = true
+		if recallGetScanner && rpcGetScannerFailedMocked { // GetScannerFailedMocked = true, recall GetScanner to trigger the error when execute scanner.Next
+			scanner, err = mockRpcFailedErrorTable.GetScanner(context.Background(), []byte("h1"), []byte(""), []byte(""), opts)
+			assert.Nil(t, err)
+		}
+		complete, _, _, _, errNext := scanner.Next(ctx)
+		if !rpcGetScannerFailedMocked { // mock replicaSession.GetScanner rpc error, the next loop request will be failed
+			getScannerFailedMock = gomonkey.ApplyMethod(reflect.TypeOf(session), "GetScanner", GetScannerRpcErrorForTest)
+			rpcGetScannerFailedMocked = true
+		}
+		cancel()
+		if complete {
+			break
+		}
+
+		if errNext == nil {
+			successCount++
+			continue
+		}
+		// the error is ERR_INVALID_STATE and a re-config was auto-triggered, so the GetScanner mock (getScannerFailedMock) can be reset
+		if strings.Contains(errNext.Error(), "ERR_INVALID_STATE") &&
+			strings.Contains(errNext.Error(), "updateConfig=true") {
+			getScannerFailedMock.Reset()
+			recallGetScanner = false
+		} else if strings.Contains(errNext.Error(), "recover after next loop") {
+			continue
 		} else {
-			mock.Reset()
+			break
+		}
+	}
+	// GetScanner is called again once inside the loop, so the expected successCount is 100 + 1
+	assert.Equal(t, 101, successCount)
+
+	// test scan rpc error
+	getScannerFailedMock.Reset()
+	rpcScanFailedMocked := false
+	var scanFailedMock *gomonkey.Patches
+	successCount = 0
+	scanner, err = mockRpcFailedErrorTable.GetScanner(context.Background(), []byte("h1"), []byte(""), []byte(""), opts)
+	assert.Nil(t, err)
+	for {
+		ctx, cancel := context.WithTimeout(context.Background(), time.Second*500)
+		complete, _, _, _, errNext := scanner.Next(ctx)
+		if !rpcScanFailedMocked { // mock scan rpc error, the next loop request will be failed but recovered automatically
+			scanFailedMock = gomonkey.ApplyMethod(reflect.TypeOf(session), "Scan", ScanRpcErrorForTest)
+			rpcScanFailedMocked = true
 		}
 		cancel()
 		if complete {
 			break
 		}
-		if error == nil {
+
+		if errNext == nil {
 			successCount++
+			continue
+		}
+
+		// the error is ERR_INVALID_STATE and a re-config was auto-triggered, so the Scan mock (scanFailedMock) can be reset
+		if strings.Contains(errNext.Error(), "ERR_INVALID_STATE") &&
+			strings.Contains(errNext.Error(), "updateConfig=true") {
+			scanFailedMock.Reset()
+		} else if strings.Contains(errNext.Error(), "recover after next loop") {
+			continue
+		} else {
+			break
 		}
 	}
 	assert.Equal(t, 100, successCount)
diff --git a/go-client/pegasus/scanner.go b/go-client/pegasus/scanner.go
index d1238c062..968299451 100644
--- a/go-client/pegasus/scanner.go
+++ b/go-client/pegasus/scanner.go
@@ -223,7 +223,13 @@ func (p *pegasusScanner) startScanPartition(ctx context.Context) (completed bool
 
 	part := p.table.getPartitionByGpid(p.curGpid)
 	response, err := part.GetScanner(ctx, p.curGpid, p.curHash, request)
-
+	if err != nil {
+		p.batchStatus = batchRpcError
+		if updateConfig, _, errHandler := p.table.handleReplicaError(err, part); errHandler != nil {
+			err = fmt.Errorf("scan failed, error = %s, try resolve it(updateConfig=%v), result = %s", err, updateConfig, errHandler)
+		}
+		return
+	}
 	err = p.onRecvScanResponse(response, err)
 	if err == nil {
 		return p.doNext(ctx)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org