Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/04/14 16:04:11 UTC

[GitHub] [beam] lostluck opened a new pull request #11413: [BEAM-9746] check for 0 length copies from state

lostluck opened a new pull request #11413: [BEAM-9746] check for 0 length copies from state
URL: https://github.com/apache/beam/pull/11413
 
 
   @thetorpedodog discovered that side inputs backed by empty PCollections would produce a spurious "zero element". The root cause was that the SDK's handling of State API responses didn't check for 0-length byte buffers from the API and interpret them as the EOF signal.
   
   In particular, for elements whose coded form is length-prefixed, calling graph.DecodeVarInt would yield a 0-length read for a 1-byte request but no error, so the subsequent error handling never ran. This told the decoder that the encoded size was 0, and it then decoded a zero-length element accordingly.
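   A minimal sketch of that failure mode follows. `emptyThenEOF` and `decodeLengthPrefix` are hypothetical stand-ins, not the SDK's state reader or its varint decoder: the reader returns (0, nil) on its first call, and the decoder, like the path described above, issues a single 1-byte Read and checks only the error.

   ```go
   package main

   import (
   	"fmt"
   	"io"
   )

   // emptyThenEOF is a hypothetical reader modelling the old behaviour for an
   // empty side input: the first Read copies nothing but reports no error, and
   // only a later call would report io.EOF.
   type emptyThenEOF struct{ calls int }

   func (r *emptyThenEOF) Read(p []byte) (int, error) {
   	r.calls++
   	if r.calls == 1 {
   		return 0, nil // 0-length copy with no error: the problematic case
   	}
   	return 0, io.EOF
   }

   // decodeLengthPrefix is a hypothetical, simplified single-byte length decoder.
   // It issues one Read for 1 byte and checks only err, ignoring n.
   func decodeLengthPrefix(r io.Reader) (int, error) {
   	var b [1]byte
   	if _, err := r.Read(b[:]); err != nil {
   		return 0, err
   	}
   	// b[0] was never written, so its zero value is decoded as length 0.
   	return int(b[0]), nil
   }

   func main() {
   	n, err := decodeLengthPrefix(&emptyThenEOF{})
   	fmt.Println(n, err) // prints "0 <nil>": a spurious zero-length element, not io.EOF
   }
   ```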
   
   Now, when a copy yields 0 bytes and there are no more pages of data, Read immediately returns io.EOF rather than deferring it to the next call, which, in the empty case, never comes.
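   The paging behaviour can be modelled in a few lines. This is a sketch under stated assumptions: `pagedReader`, `pagesFromLens`, and `countReads` are hypothetical helpers, not the SDK's stateKeyReader, and pages are held in memory instead of being fetched with continuation tokens. Each Read copies from the current page only, advances to the next page once the current one is exhausted, and returns io.EOF as soon as a copy yields 0 bytes with no pages left.

   ```go
   package main

   import (
   	"bytes"
   	"fmt"
   	"io"
   )

   // pagedReader is a hypothetical, simplified model of a reader over paged
   // state responses; it is not the SDK's stateKeyReader.
   type pagedReader struct {
   	pages [][]byte // pages not yet started; nil models an empty Data field
   	cur   []byte   // unread bytes of the current page
   }

   func (r *pagedReader) Read(p []byte) (int, error) {
   	if len(p) == 0 {
   		return 0, nil
   	}
   	if len(r.cur) == 0 {
   		if len(r.pages) == 0 {
   			return 0, io.EOF
   		}
   		r.cur, r.pages = r.pages[0], r.pages[1:]
   	}
   	n := copy(p, r.cur)
   	r.cur = r.cur[n:]
   	if n == 0 && len(r.pages) == 0 {
   		// The fix: report EOF now rather than deferring it to a later Read,
   		// which, for an empty side input, never comes.
   		return 0, io.EOF
   	}
   	return n, nil
   }

   // pagesFromLens builds pages from buffer lengths; -1 means nil Data.
   func pagesFromLens(lens []int) [][]byte {
   	pages := make([][]byte, len(lens))
   	for i, l := range lens {
   		if l >= 0 {
   			pages[i] = bytes.Repeat([]byte{42}, l)
   		}
   	}
   	return pages
   }

   // countReads drains r with readLen-byte buffers and returns the number of
   // Read calls (including the one that returns an error) and the bytes read.
   func countReads(r io.Reader, readLen int) (reads, total int) {
   	buf := make([]byte, readLen)
   	for {
   		reads++
   		n, err := r.Read(buf)
   		total += n
   		if err != nil {
   			return reads, total
   		}
   	}
   }

   func main() {
   	for _, lens := range [][]int{{-1}, {4, 4, -1}, {4, 4, -1, 4}} {
   		reads, total := countReads(&pagedReader{pages: pagesFromLens(lens)}, 4)
   		fmt.Println(lens, reads, total)
   	}
   }
   ```

   With 4-byte reads, buffer lengths of {-1}, {4, 4, -1}, and {4, 4, -1, 4} take 1, 3, and 5 Read calls in this model, which matches the numReads values for emptyData, emptyDataLast, and emptyDataMid in the test table later in this thread.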
   
   While it's unlikely that a runner would intentionally return an empty page mid-stream while indicating that more data follows, we can't simply end paging on empty buffers. Go's io.Reader contract permits (though discourages) 0-length reads without error: as with any short read, more data may become available later, so a caller may need multiple reads instead of receiving the full requested length at once.
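   The standard library's helpers already tolerate such reads: the io.Reader documentation asks callers to treat a (0, nil) return as "nothing happened" rather than as EOF, and io.ReadFull keeps calling Read until the buffer is full or an error occurs. In this sketch, `chunkReader` is a hypothetical reader that returns at most one chunk per Read, including an empty chunk mid-stream.

   ```go
   package main

   import (
   	"fmt"
   	"io"
   )

   // chunkReader is a hypothetical reader that returns at most one chunk per Read
   // call, including empty chunks, and io.EOF once all chunks are consumed.
   type chunkReader struct{ chunks [][]byte }

   func (r *chunkReader) Read(p []byte) (int, error) {
   	if len(r.chunks) == 0 {
   		return 0, io.EOF
   	}
   	n := copy(p, r.chunks[0])
   	r.chunks[0] = r.chunks[0][n:]
   	if len(r.chunks[0]) == 0 {
   		r.chunks = r.chunks[1:] // chunk fully consumed (or was empty)
   	}
   	return n, nil // an empty chunk yields (0, nil): legal, if discouraged
   }

   func main() {
   	r := &chunkReader{chunks: [][]byte{[]byte("ab"), {}, []byte("cd")}}
   	buf := make([]byte, 4)
   	// io.ReadFull retries through both the short read and the (0, nil) read.
   	n, err := io.ReadFull(r, buf)
   	fmt.Println(n, err, string(buf)) // prints "4 <nil> abcd"
   }
   ```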
   
   There are probably other places where we should handle the [io.Reader](https://pkg.go.dev/io?tab=doc#Reader) contract more rigorously, such as always processing the n > 0 bytes returned even when an error is also returned, but such a retrofit would need to be done with care.
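   A minimal sketch of that more rigorous pattern, using only the standard library. `drain` is a hypothetical helper, not an SDK function; `iotest.DataErrReader` wraps a reader so that its final io.EOF arrives together with the last bytes. A loop that checked err before using n would drop the trailing "o" here.

   ```go
   package main

   import (
   	"fmt"
   	"io"
   	"strings"
   	"testing/iotest"
   )

   // drain reads r to completion. It appends the n bytes from every Read before
   // looking at err, so data returned alongside an error is not dropped.
   func drain(r io.Reader, bufSize int) ([]byte, error) {
   	var out []byte
   	buf := make([]byte, bufSize)
   	for {
   		n, err := r.Read(buf)
   		out = append(out, buf[:n]...) // handle the data first
   		if err == io.EOF {
   			return out, nil
   		}
   		if err != nil {
   			return out, err
   		}
   	}
   }

   func main() {
   	// DataErrReader returns the final io.EOF together with the final bytes,
   	// instead of on a separate, later Read call.
   	r := iotest.DataErrReader(strings.NewReader("hello"))
   	out, err := drain(r, 4)
   	fmt.Println(string(out), err) // prints "hello <nil>"
   }
   ```

   Comparing with `==` follows the io package's convention that Read returns io.EOF itself rather than a wrapped error.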
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lostluck commented on issue #11413: [BEAM-9746] check for 0 length copies from state

Posted by GitBox <gi...@apache.org>.
lostluck commented on issue #11413: [BEAM-9746] check for 0 length copies from state
URL: https://github.com/apache/beam/pull/11413#issuecomment-613535998
 
 
   R: @thetorpedodog @youngoli 


[GitHub] [beam] lostluck commented on a change in pull request #11413: [BEAM-9746] check for 0 length copies from state

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #11413: [BEAM-9746] check for 0 length copies from state
URL: https://github.com/apache/beam/pull/11413#discussion_r408468620
 
 

 ##########
 File path: sdks/go/pkg/beam/core/runtime/harness/statemgr_test.go
 ##########
 @@ -258,6 +261,167 @@ func TestStateChannel(t *testing.T) {
 	}
 }
 
+// TestStateKeyReader validates ordinary Read cases
+func TestStateKeyReader(t *testing.T) {
+	const readLen = 4
+	tests := []struct {
+		name     string
+		buflens  []int // sizes of the buffers received on the state channel.
+		numReads int
+		closed   bool // tries to read from closed reader
+		noGet    bool // tries to read from nil get response reader
+	}{
+		{
+			name:     "emptyData",
+			buflens:  []int{-1},
+			numReads: 1,
+		}, {
+			name:     "singleBufferSingleRead",
+			buflens:  []int{readLen},
+			numReads: 2,
+		}, {
+			name:     "singleBufferMultipleReads",
+			buflens:  []int{2 * readLen},
+			numReads: 3,
+		}, {
+			name:     "singleBufferShortRead",
+			buflens:  []int{readLen - 1},
+			numReads: 2,
+		}, {
+			name:     "multiBuffer",
+			buflens:  []int{readLen, readLen},
+			numReads: 3,
+		}, {
+			name:     "multiBuffer-short-reads",
+			buflens:  []int{readLen - 1, readLen - 1, readLen - 2},
+			numReads: 4,
+		}, {
+			name:     "emptyDataFirst", // Shouldn't happen, but not unreasonable to handle.
+			buflens:  []int{-1, readLen, readLen},
+			numReads: 4,
+		}, {
+			name:     "emptyDataMid", // Shouldn't happen, but not unreasonable to handle.
+			buflens:  []int{readLen, readLen, -1, readLen},
+			numReads: 5,
+		}, {
+			name:     "emptyDataLast", // Shouldn't happen, but not unreasonable to handle.
+			buflens:  []int{readLen, readLen, -1},
+			numReads: 3,
+		}, {
+			name:     "emptyDataLast-short",
+			buflens:  []int{3*readLen - 2, -1},
+			numReads: 4,
+		}, {
+			name:     "closed",
+			buflens:  []int{-1, -1},
+			numReads: 1,
+			closed:   true,
+		}, {
+			name:     "noGet",
+			buflens:  []int{-1},
+			numReads: 1,
+			noGet:    true,
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			ctx, cancelFn := context.WithCancel(context.Background())
+			ch := &StateChannel{
+				id:        "test",
+				requests:  make(chan *fnpb.StateRequest),
+				responses: make(map[string]chan<- *fnpb.StateResponse),
+				cancelFn:  cancelFn,
+				DoneCh:    ctx.Done(),
+			}
+
+			// Handle the channel behavior asynchronously.
+			go func() {
+				for i := 0; i < len(test.buflens); i++ {
+					token := []byte(strconv.Itoa(i))
+					var buf []byte
+					if test.buflens[i] >= 0 {
+						buf = bytes.Repeat([]byte{42}, test.buflens[i])
+					}
+					// On the last request response pair, send no token.
+					if i+1 == len(test.buflens) {
+						token = nil
+					}
 
 Review comment:
   Great suggestions. Done.


[GitHub] [beam] thetorpedodog commented on a change in pull request #11413: [BEAM-9746] check for 0 length copies from state

Posted by GitBox <gi...@apache.org>.
thetorpedodog commented on a change in pull request #11413: [BEAM-9746] check for 0 length copies from state
URL: https://github.com/apache/beam/pull/11413#discussion_r408478965
 
 

 ##########
 File path: sdks/go/pkg/beam/core/runtime/harness/statemgr_test.go
 ##########
 @@ -258,6 +261,167 @@ func TestStateChannel(t *testing.T) {
 	}
 }
 
+// TestStateKeyReader validates ordinary Read cases
+func TestStateKeyReader(t *testing.T) {
+	const readLen = 4
+	tests := []struct {
+		name     string
+		buflens  []int // sizes of the buffers received on the state channel.
+		numReads int
+		closed   bool // tries to read from closed reader
+		noGet    bool // tries to read from nil get response reader
+	}{
+		{
+			name:     "emptyData",
+			buflens:  []int{-1},
+			numReads: 1,
+		}, {
+			name:     "singleBufferSingleRead",
+			buflens:  []int{readLen},
+			numReads: 2,
+		}, {
+			name:     "singleBufferMultipleReads",
+			buflens:  []int{2 * readLen},
+			numReads: 3,
+		}, {
+			name:     "singleBufferShortRead",
+			buflens:  []int{readLen - 1},
+			numReads: 2,
+		}, {
+			name:     "multiBuffer",
+			buflens:  []int{readLen, readLen},
+			numReads: 3,
+		}, {
+			name:     "multiBuffer-short-reads",
+			buflens:  []int{readLen - 1, readLen - 1, readLen - 2},
+			numReads: 4,
+		}, {
+			name:     "emptyDataFirst", // Shouldn't happen, but not unreasonable to handle.
+			buflens:  []int{-1, readLen, readLen},
+			numReads: 4,
+		}, {
+			name:     "emptyDataMid", // Shouldn't happen, but not unreasonable to handle.
+			buflens:  []int{readLen, readLen, -1, readLen},
+			numReads: 5,
+		}, {
+			name:     "emptyDataLast", // Shouldn't happen, but not unreasonable to handle.
+			buflens:  []int{readLen, readLen, -1},
+			numReads: 3,
+		}, {
+			name:     "emptyDataLast-short",
+			buflens:  []int{3*readLen - 2, -1},
+			numReads: 4,
+		}, {
+			name:     "closed",
+			buflens:  []int{-1, -1},
+			numReads: 1,
+			closed:   true,
+		}, {
+			name:     "noGet",
+			buflens:  []int{-1},
+			numReads: 1,
+			noGet:    true,
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			ctx, cancelFn := context.WithCancel(context.Background())
+			ch := &StateChannel{
+				id:        "test",
+				requests:  make(chan *fnpb.StateRequest),
+				responses: make(map[string]chan<- *fnpb.StateResponse),
+				cancelFn:  cancelFn,
+				DoneCh:    ctx.Done(),
+			}
+
+			// Handle the channel behavior asynchronously.
+			go func() {
+				for i := 0; i < len(test.buflens); i++ {
+					token := []byte(strconv.Itoa(i))
+					var buf []byte
+					if test.buflens[i] >= 0 {
+						buf = bytes.Repeat([]byte{42}, test.buflens[i])
+					}
+					// On the last request response pair, send no token.
+					if i+1 == len(test.buflens) {
+						token = nil
+					}
+
+					req := <-ch.requests
+
+					if test.noGet {
+						ch.responses[req.Id] <- &fnpb.StateResponse{
+							Id: req.Id,
+						}
+						return
+					}
+
+					ch.responses[req.Id] <- &fnpb.StateResponse{
+						Id: req.Id,
+						Response: &fnpb.StateResponse_Get{
+							Get: &fnpb.StateGetResponse{
+								ContinuationToken: token,
+								Data:              buf,
+							},
+						},
+					}
+				}
+			}()
+
+			r := stateKeyReader{
+				ch: ch,
+			}
+
+			if test.closed {
+				err := r.Close()
+				if err != nil {
+					t.Errorf("unexpected error on Close(), got %v", err)
+				}
+			}
+
+			var totalBytes int
+			for _, l := range test.buflens {
+				if l > 0 {
+					totalBytes += l
+				}
+			}
+			var finalerr error
+			var count, reads int
+
+			// Read all the bytes.
+			for count <= totalBytes {
+				reads++
+				b := make([]byte, readLen) // io.Read is keyed off of length, not capacity.
+				n, err := r.Read(b)
+				if err != nil {
+					finalerr = err
+					break
+				}
+				count += n
+				// Special check to avoid spurious zero elements.
+				if count == totalBytes && n == 0 {
+					t.Error("expected byte count read, last read is 0, but no EOF")
+				}
+			}
+			if got, want := reads, test.numReads; got != want {
 
 Review comment:
   I'm still not a huge fan, because it introduces another layer of indirection around what is gotten vs. wanted (e.g. in the `==`/`!=` itself and in the error message string)…


[GitHub] [beam] lostluck commented on issue #11413: [BEAM-9746] check for 0 length copies from state

Posted by GitBox <gi...@apache.org>.
lostluck commented on issue #11413: [BEAM-9746] check for 0 length copies from state
URL: https://github.com/apache/beam/pull/11413#issuecomment-613614431
 
 
   @thetorpedodog Beam policy only requires one Beam expert to be involved in a review, as either the author or the reviewer, and I fill that role for this PR. Before merging, though, I will wait for @youngoli's review so they can continue gaining expertise in the SDK internals; that way there will be two of us.


[GitHub] [beam] thetorpedodog commented on a change in pull request #11413: [BEAM-9746] check for 0 length copies from state

Posted by GitBox <gi...@apache.org>.
thetorpedodog commented on a change in pull request #11413: [BEAM-9746] check for 0 length copies from state
URL: https://github.com/apache/beam/pull/11413#discussion_r408403890
 
 

 ##########
 File path: sdks/go/pkg/beam/core/runtime/harness/statemgr_test.go
 ##########
 @@ -258,6 +261,167 @@ func TestStateChannel(t *testing.T) {
 	}
 }
 
+// TestStateKeyReader validates ordinary Read cases
+func TestStateKeyReader(t *testing.T) {
+	const readLen = 4
+	tests := []struct {
+		name     string
+		buflens  []int // sizes of the buffers received on the state channel.
+		numReads int
+		closed   bool // tries to read from closed reader
+		noGet    bool // tries to read from nil get response reader
+	}{
+		{
+			name:     "emptyData",
+			buflens:  []int{-1},
+			numReads: 1,
+		}, {
+			name:     "singleBufferSingleRead",
+			buflens:  []int{readLen},
+			numReads: 2,
+		}, {
+			name:     "singleBufferMultipleReads",
+			buflens:  []int{2 * readLen},
+			numReads: 3,
+		}, {
+			name:     "singleBufferShortRead",
+			buflens:  []int{readLen - 1},
+			numReads: 2,
+		}, {
+			name:     "multiBuffer",
+			buflens:  []int{readLen, readLen},
+			numReads: 3,
+		}, {
+			name:     "multiBuffer-short-reads",
+			buflens:  []int{readLen - 1, readLen - 1, readLen - 2},
+			numReads: 4,
+		}, {
+			name:     "emptyDataFirst", // Shouldn't happen, but not unreasonable to handle.
+			buflens:  []int{-1, readLen, readLen},
+			numReads: 4,
+		}, {
+			name:     "emptyDataMid", // Shouldn't happen, but not unreasonable to handle.
+			buflens:  []int{readLen, readLen, -1, readLen},
+			numReads: 5,
+		}, {
+			name:     "emptyDataLast", // Shouldn't happen, but not unreasonable to handle.
+			buflens:  []int{readLen, readLen, -1},
+			numReads: 3,
+		}, {
+			name:     "emptyDataLast-short",
+			buflens:  []int{3*readLen - 2, -1},
+			numReads: 4,
+		}, {
+			name:     "closed",
+			buflens:  []int{-1, -1},
+			numReads: 1,
+			closed:   true,
+		}, {
+			name:     "noGet",
+			buflens:  []int{-1},
+			numReads: 1,
+			noGet:    true,
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			ctx, cancelFn := context.WithCancel(context.Background())
+			ch := &StateChannel{
+				id:        "test",
+				requests:  make(chan *fnpb.StateRequest),
+				responses: make(map[string]chan<- *fnpb.StateResponse),
+				cancelFn:  cancelFn,
+				DoneCh:    ctx.Done(),
+			}
+
+			// Handle the channel behavior asynchronously.
+			go func() {
+				for i := 0; i < len(test.buflens); i++ {
+					token := []byte(strconv.Itoa(i))
+					var buf []byte
+					if test.buflens[i] >= 0 {
+						buf = bytes.Repeat([]byte{42}, test.buflens[i])
+					}
+					// On the last request response pair, send no token.
+					if i+1 == len(test.buflens) {
+						token = nil
+					}
 
 Review comment:
   Rearranging this so that all the initialization for this variable is together might make it easier to follow:
   
   ```go
   var buf []byte
   if ... {
     ...
   }
   
   // I can see it making sense to reverse the if-statement here to match the one above,
   // but there is a case for keeping it as-is so that the common case is outside the 'if'.
   // Up to you.
   var token []byte
   if (not last entry) {
     // Maybe use fmt.Sprint() here rather than Itoa to avoid needing the strconv package?
     token = []byte(fmt.Sprint(i))
   }
   
   ch.responses[...] <- ...
   ```


[GitHub] [beam] lostluck merged pull request #11413: [BEAM-9746] check for 0 length copies from state

Posted by GitBox <gi...@apache.org>.
lostluck merged pull request #11413: [BEAM-9746] check for 0 length copies from state
URL: https://github.com/apache/beam/pull/11413
 
 
   


[GitHub] [beam] thetorpedodog commented on a change in pull request #11413: [BEAM-9746] check for 0 length copies from state

Posted by GitBox <gi...@apache.org>.
thetorpedodog commented on a change in pull request #11413: [BEAM-9746] check for 0 length copies from state
URL: https://github.com/apache/beam/pull/11413#discussion_r408479609
 
 

 ##########
 File path: sdks/go/pkg/beam/core/runtime/harness/statemgr_test.go
 ##########
 @@ -258,6 +260,166 @@ func TestStateChannel(t *testing.T) {
 	}
 }
 
+// TestStateKeyReader validates ordinary Read cases
+func TestStateKeyReader(t *testing.T) {
+	const readLen = 4
+	tests := []struct {
+		name     string
+		buflens  []int // sizes of the buffers received on the state channel.
+		numReads int
+		closed   bool // tries to read from closed reader
+		noGet    bool // tries to read from nil get response reader
+	}{
+		{
+			name:     "emptyData",
+			buflens:  []int{-1},
+			numReads: 1,
+		}, {
+			name:     "singleBufferSingleRead",
+			buflens:  []int{readLen},
+			numReads: 2,
+		}, {
+			name:     "singleBufferMultipleReads",
+			buflens:  []int{2 * readLen},
+			numReads: 3,
+		}, {
+			name:     "singleBufferShortRead",
+			buflens:  []int{readLen - 1},
+			numReads: 2,
+		}, {
+			name:     "multiBuffer",
+			buflens:  []int{readLen, readLen},
+			numReads: 3,
+		}, {
+			name:     "multiBuffer-short-reads",
+			buflens:  []int{readLen - 1, readLen - 1, readLen - 2},
+			numReads: 4,
+		}, {
+			name:     "emptyDataFirst", // Shouldn't happen, but not unreasonable to handle.
+			buflens:  []int{-1, readLen, readLen},
+			numReads: 4,
+		}, {
+			name:     "emptyDataMid", // Shouldn't happen, but not unreasonable to handle.
+			buflens:  []int{readLen, readLen, -1, readLen},
+			numReads: 5,
+		}, {
+			name:     "emptyDataLast", // Shouldn't happen, but not unreasonable to handle.
+			buflens:  []int{readLen, readLen, -1},
+			numReads: 3,
+		}, {
+			name:     "emptyDataLast-short",
+			buflens:  []int{3*readLen - 2, -1},
+			numReads: 4,
+		}, {
+			name:     "closed",
+			buflens:  []int{-1, -1},
+			numReads: 1,
+			closed:   true,
+		}, {
+			name:     "noGet",
+			buflens:  []int{-1},
+			numReads: 1,
+			noGet:    true,
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			ctx, cancelFn := context.WithCancel(context.Background())
+			ch := &StateChannel{
+				id:        "test",
+				requests:  make(chan *fnpb.StateRequest),
+				responses: make(map[string]chan<- *fnpb.StateResponse),
+				cancelFn:  cancelFn,
+				DoneCh:    ctx.Done(),
+			}
+
+			// Handle the channel behavior asynchronously.
+			go func() {
+				if test.noGet {
+					req := <-ch.requests
+					ch.responses[req.Id] <- &fnpb.StateResponse{
+						Id: req.Id,
+					}
+					return
+				}
+				for i, buflen := range test.buflens {
+					var buf []byte
+					if buflen >= 0 {
+						buf = bytes.Repeat([]byte{42}, buflen)
+					}
+					token := []byte(fmt.Sprint(i))
+					if i+1 == len(test.buflens) {
+						// On the last request response pair, send no token.
+						token = nil
+					}
+					req := <-ch.requests
+
+					ch.responses[req.Id] <- &fnpb.StateResponse{
+						Id: req.Id,
+						Response: &fnpb.StateResponse_Get{
+							Get: &fnpb.StateGetResponse{
+								ContinuationToken: token,
+								Data:              buf,
+							},
+						},
+					}
+				}
+			}()
+
+			r := stateKeyReader{
+				ch: ch,
+			}
+
+			if test.closed {
+				err := r.Close()
+				if err != nil {
+					t.Errorf("unexpected error on Close(), got %v", err)
+				}
+			}
+
+			var totalBytes int
+			for _, l := range test.buflens {
+				if l > 0 {
+					totalBytes += l
+				}
+			}
+			var finalerr error
+			var count, reads int
+
+			// Read all the bytes.
+			for count <= totalBytes {
+				reads++
+				b := make([]byte, readLen) // io.Read is keyed off of length, not capacity.
+				n, err := r.Read(b)
+				if err != nil {
+					finalerr = err
+					break
+				}
+				count += n
+				// Special check to avoid spurious zero elements.
+				if count == totalBytes && n == 0 {
+					t.Error("expected byte count read, last read is 0, but no EOF")
+				}
+			}
+			if got, want := reads, test.numReads; got != want {
+				t.Errorf("read %d times, want %d", got, want)
+			}
+			if got, want := count, totalBytes; got != want {
+				t.Errorf("read %v bytes, want %v", got, want)
+			}
+			if test.closed {
+				if got, want := finalerr, errors.New("side input closed"); !contains(got, want) {
 
 Review comment:
   ...for instance, here, I would find
   
   ```go
   if want := errors.New("side input closed"); !contains(finalerr, want) {
     t.Errorf("got err %v; want to contain %v", finalerr, want)
   }
   ```
   
   easier to read, since the thing you Got is right there. That being said, I wouldn't block submission on this.
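   The test calls a `contains` helper whose definition is not part of the quoted diff. Here is a minimal sketch of one plausible shape, assuming it matches on error message text. This `contains` is hypothetical, not necessarily the test's actual helper, and the wrapped error in `main` is an invented example. A string match is used because errors.Is would not match two distinct errors.New values even when their text is identical.

   ```go
   package main

   import (
   	"errors"
   	"fmt"
   	"strings"
   )

   // contains is a hypothetical helper: it reports whether err's message includes
   // want's message. Two nil errors match; a nil and a non-nil error do not.
   func contains(err, want error) bool {
   	if err == nil || want == nil {
   		return err == nil && want == nil
   	}
   	return strings.Contains(err.Error(), want.Error())
   }

   func main() {
   	// Invented example error wrapping the expected message.
   	finalerr := fmt.Errorf("stateKeyReader: %w", errors.New("side input closed"))
   	// Reviewer's style: only the expectation lives in the if-statement, and
   	// the observed value (finalerr) is referenced directly.
   	if want := errors.New("side input closed"); !contains(finalerr, want) {
   		fmt.Printf("got err %v; want to contain %v\n", finalerr, want)
   	}
   	fmt.Println(contains(finalerr, errors.New("side input closed"))) // prints "true"
   }
   ```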


[GitHub] [beam] lostluck commented on a change in pull request #11413: [BEAM-9746] check for 0 length copies from state

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #11413: [BEAM-9746] check for 0 length copies from state
URL: https://github.com/apache/beam/pull/11413#discussion_r408468603
 
 

 ##########
 File path: sdks/go/pkg/beam/core/runtime/harness/statemgr_test.go
 ##########
 @@ -258,6 +261,167 @@ func TestStateChannel(t *testing.T) {
 	}
 }
 
+// TestStateKeyReader validates ordinary Read cases
+func TestStateKeyReader(t *testing.T) {
+	const readLen = 4
+	tests := []struct {
+		name     string
+		buflens  []int // sizes of the buffers received on the state channel.
+		numReads int
+		closed   bool // tries to read from closed reader
+		noGet    bool // tries to read from nil get response reader
+	}{
+		{
+			name:     "emptyData",
+			buflens:  []int{-1},
+			numReads: 1,
+		}, {
+			name:     "singleBufferSingleRead",
+			buflens:  []int{readLen},
+			numReads: 2,
+		}, {
+			name:     "singleBufferMultipleReads",
+			buflens:  []int{2 * readLen},
+			numReads: 3,
+		}, {
+			name:     "singleBufferShortRead",
+			buflens:  []int{readLen - 1},
+			numReads: 2,
+		}, {
+			name:     "multiBuffer",
+			buflens:  []int{readLen, readLen},
+			numReads: 3,
+		}, {
+			name:     "multiBuffer-short-reads",
+			buflens:  []int{readLen - 1, readLen - 1, readLen - 2},
+			numReads: 4,
+		}, {
+			name:     "emptyDataFirst", // Shouldn't happen, but not unreasonable to handle.
+			buflens:  []int{-1, readLen, readLen},
+			numReads: 4,
+		}, {
+			name:     "emptyDataMid", // Shouldn't happen, but not unreasonable to handle.
+			buflens:  []int{readLen, readLen, -1, readLen},
+			numReads: 5,
+		}, {
+			name:     "emptyDataLast", // Shouldn't happen, but not unreasonable to handle.
+			buflens:  []int{readLen, readLen, -1},
+			numReads: 3,
+		}, {
+			name:     "emptyDataLast-short",
+			buflens:  []int{3*readLen - 2, -1},
+			numReads: 4,
+		}, {
+			name:     "closed",
+			buflens:  []int{-1, -1},
+			numReads: 1,
+			closed:   true,
+		}, {
+			name:     "noGet",
+			buflens:  []int{-1},
+			numReads: 1,
+			noGet:    true,
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			ctx, cancelFn := context.WithCancel(context.Background())
+			ch := &StateChannel{
+				id:        "test",
+				requests:  make(chan *fnpb.StateRequest),
+				responses: make(map[string]chan<- *fnpb.StateResponse),
+				cancelFn:  cancelFn,
+				DoneCh:    ctx.Done(),
+			}
+
+			// Handle the channel behavior asynchronously.
+			go func() {
+				for i := 0; i < len(test.buflens); i++ {
+					token := []byte(strconv.Itoa(i))
+					var buf []byte
+					if test.buflens[i] >= 0 {
+						buf = bytes.Repeat([]byte{42}, test.buflens[i])
+					}
+					// On the last request response pair, send no token.
+					if i+1 == len(test.buflens) {
+						token = nil
+					}
+
+					req := <-ch.requests
+
+					if test.noGet {
+						ch.responses[req.Id] <- &fnpb.StateResponse{
+							Id: req.Id,
+						}
+						return
+					}
+
+					ch.responses[req.Id] <- &fnpb.StateResponse{
+						Id: req.Id,
+						Response: &fnpb.StateResponse_Get{
+							Get: &fnpb.StateGetResponse{
+								ContinuationToken: token,
+								Data:              buf,
+							},
+						},
+					}
+				}
+			}()
+
+			r := stateKeyReader{
+				ch: ch,
+			}
+
+			if test.closed {
+				err := r.Close()
+				if err != nil {
+					t.Errorf("unexpected error on Close(), got %v", err)
+				}
+			}
+
+			var totalBytes int
+			for _, l := range test.buflens {
+				if l > 0 {
+					totalBytes += l
+				}
+			}
+			var finalerr error
+			var count, reads int
+
+			// Read all the bytes.
+			for count <= totalBytes {
+				reads++
+				b := make([]byte, readLen) // io.Read is keyed off of length, not capacity.
+				n, err := r.Read(b)
+				if err != nil {
+					finalerr = err
+					break
+				}
+				count += n
+				// Special check to avoid spurious zero elements.
+				if count == totalBytes && n == 0 {
+					t.Error("expected byte count read, last read is 0, but no EOF")
+				}
+			}
+			if got, want := reads, test.numReads; got != want {
 
 Review comment:
   No, it's my own personal habit, after getting the ordering wrong one too many times when debugging new tests. Since I've been working on Beam for a while, it shows up wherever I've worked on it.
   This way I never need to think about which one is 'got' and which is 'want'; it's entirely unambiguous. It also simplifies copying the boilerplate for many sequential checks, where such leftover mistakes are common, since the variables only need to be updated in a single place: the if statement's definition scope.
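   For reference, here is a minimal standalone sketch of the check pattern described above. `checkAll` is a hypothetical helper; it collects messages instead of calling `t.Errorf` so it can run outside `go test`, and its messages are copied from the test's own checks.

```go
package main

import "fmt"

// checkAll sketches the `if got, want := ...; got != want` idiom.
// got and want exist only in each if statement's scope, so a copied check
// is updated in one place and cannot reuse a stale variable from the check
// above it.
func checkAll(reads, numReads, count, totalBytes int) []string {
	var errs []string
	if got, want := reads, numReads; got != want {
		errs = append(errs, fmt.Sprintf("read %d times, want %d", got, want))
	}
	if got, want := count, totalBytes; got != want {
		errs = append(errs, fmt.Sprintf("read %v bytes, want %v", got, want))
	}
	return errs
}
```

   Each check's first line is the only place that names the values being compared, which is the property that makes copy-and-edit of sequential checks safe.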

[GitHub] [beam] lostluck commented on a change in pull request #11413: [BEAM-9746] check for 0 length copies from state

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #11413: [BEAM-9746] check for 0 length copies from state
URL: https://github.com/apache/beam/pull/11413#discussion_r408481474
 
 

 ##########
 File path: sdks/go/pkg/beam/core/runtime/harness/statemgr_test.go
 ##########
 @@ -258,6 +260,166 @@ func TestStateChannel(t *testing.T) {
 	}
 }
 
+// TestStateKeyReader validates ordinary Read cases
+func TestStateKeyReader(t *testing.T) {
+	const readLen = 4
+	tests := []struct {
+		name     string
+		buflens  []int // sizes of the buffers received on the state channel.
+		numReads int
+		closed   bool // tries to read from closed reader
+		noGet    bool // tries to read from nil get response reader
+	}{
+		{
+			name:     "emptyData",
+			buflens:  []int{-1},
+			numReads: 1,
+		}, {
+			name:     "singleBufferSingleRead",
+			buflens:  []int{readLen},
+			numReads: 2,
+		}, {
+			name:     "singleBufferMultipleReads",
+			buflens:  []int{2 * readLen},
+			numReads: 3,
+		}, {
+			name:     "singleBufferShortRead",
+			buflens:  []int{readLen - 1},
+			numReads: 2,
+		}, {
+			name:     "multiBuffer",
+			buflens:  []int{readLen, readLen},
+			numReads: 3,
+		}, {
+			name:     "multiBuffer-short-reads",
+			buflens:  []int{readLen - 1, readLen - 1, readLen - 2},
+			numReads: 4,
+		}, {
+			name:     "emptyDataFirst", // Shouldn't happen, but not unreasonable to handle.
+			buflens:  []int{-1, readLen, readLen},
+			numReads: 4,
+		}, {
+			name:     "emptyDataMid", // Shouldn't happen, but not unreasonable to handle.
+			buflens:  []int{readLen, readLen, -1, readLen},
+			numReads: 5,
+		}, {
+			name:     "emptyDataLast", // Shouldn't happen, but not unreasonable to handle.
+			buflens:  []int{readLen, readLen, -1},
+			numReads: 3,
+		}, {
+			name:     "emptyDataLast-short",
+			buflens:  []int{3*readLen - 2, -1},
+			numReads: 4,
+		}, {
+			name:     "closed",
+			buflens:  []int{-1, -1},
+			numReads: 1,
+			closed:   true,
+		}, {
+			name:     "noGet",
+			buflens:  []int{-1},
+			numReads: 1,
+			noGet:    true,
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			ctx, cancelFn := context.WithCancel(context.Background())
+			ch := &StateChannel{
+				id:        "test",
+				requests:  make(chan *fnpb.StateRequest),
+				responses: make(map[string]chan<- *fnpb.StateResponse),
+				cancelFn:  cancelFn,
+				DoneCh:    ctx.Done(),
+			}
+
+			// Handle the channel behavior asynchronously.
+			go func() {
+				if test.noGet {
+					req := <-ch.requests
+					ch.responses[req.Id] <- &fnpb.StateResponse{
+						Id: req.Id,
+					}
+					return
+				}
+				for i, buflen := range test.buflens {
+					var buf []byte
+					if buflen >= 0 {
+						buf = bytes.Repeat([]byte{42}, buflen)
+					}
+					token := []byte(fmt.Sprint(i))
+					if i+1 == len(test.buflens) {
+						// On the last request response pair, send no token.
+						token = nil
+					}
+					req := <-ch.requests
+
+					ch.responses[req.Id] <- &fnpb.StateResponse{
+						Id: req.Id,
+						Response: &fnpb.StateResponse_Get{
+							Get: &fnpb.StateGetResponse{
+								ContinuationToken: token,
+								Data:              buf,
+							},
+						},
+					}
+				}
+			}()
+
+			r := stateKeyReader{
+				ch: ch,
+			}
+
+			if test.closed {
+				err := r.Close()
+				if err != nil {
+					t.Errorf("unexpected error on Close(), got %v", err)
+				}
+			}
+
+			var totalBytes int
+			for _, l := range test.buflens {
+				if l > 0 {
+					totalBytes += l
+				}
+			}
+			var finalerr error
+			var count, reads int
+
+			// Read all the bytes.
+			for count <= totalBytes {
+				reads++
+				b := make([]byte, readLen) // io.Read is keyed off of length, not capacity.
+				n, err := r.Read(b)
+				if err != nil {
+					finalerr = err
+					break
+				}
+				count += n
+				// Special check to avoid spurious zero elements.
+				if count == totalBytes && n == 0 {
+					t.Error("expected byte count read, last read is 0, but no EOF")
+				}
+			}
+			if got, want := reads, test.numReads; got != want {
+				t.Errorf("read %d times, want %d", got, want)
+			}
+			if got, want := count, totalBytes; got != want {
+				t.Errorf("read %v bytes, want %v", got, want)
+			}
+			if test.closed {
+				if got, want := finalerr, errors.New("side input closed"); !contains(got, want) {
 
 Review comment:
   Ack. Not an unreasonable position. I just know I stopped making those trivial but easy-to-overlook mistakes once I started using this pattern.

[GitHub] [beam] thetorpedodog commented on a change in pull request #11413: [BEAM-9746] check for 0 length copies from state

Posted by GitBox <gi...@apache.org>.
thetorpedodog commented on a change in pull request #11413: [BEAM-9746] check for 0 length copies from state
URL: https://github.com/apache/beam/pull/11413#discussion_r408401030
 
 

 ##########
 File path: sdks/go/pkg/beam/core/runtime/harness/statemgr_test.go
 ##########
 @@ -258,6 +261,167 @@ func TestStateChannel(t *testing.T) {
 	}
 }
 
+// TestStateKeyReader validates ordinary Read cases
+func TestStateKeyReader(t *testing.T) {
+	const readLen = 4
+	tests := []struct {
+		name     string
+		buflens  []int // sizes of the buffers received on the state channel.
+		numReads int
+		closed   bool // tries to read from closed reader
+		noGet    bool // tries to read from nil get response reader
+	}{
+		{
+			name:     "emptyData",
+			buflens:  []int{-1},
+			numReads: 1,
+		}, {
+			name:     "singleBufferSingleRead",
+			buflens:  []int{readLen},
+			numReads: 2,
+		}, {
+			name:     "singleBufferMultipleReads",
+			buflens:  []int{2 * readLen},
+			numReads: 3,
+		}, {
+			name:     "singleBufferShortRead",
+			buflens:  []int{readLen - 1},
+			numReads: 2,
+		}, {
+			name:     "multiBuffer",
+			buflens:  []int{readLen, readLen},
+			numReads: 3,
+		}, {
+			name:     "multiBuffer-short-reads",
+			buflens:  []int{readLen - 1, readLen - 1, readLen - 2},
+			numReads: 4,
+		}, {
+			name:     "emptyDataFirst", // Shouldn't happen, but not unreasonable to handle.
+			buflens:  []int{-1, readLen, readLen},
+			numReads: 4,
+		}, {
+			name:     "emptyDataMid", // Shouldn't happen, but not unreasonable to handle.
+			buflens:  []int{readLen, readLen, -1, readLen},
+			numReads: 5,
+		}, {
+			name:     "emptyDataLast", // Shouldn't happen, but not unreasonable to handle.
+			buflens:  []int{readLen, readLen, -1},
+			numReads: 3,
+		}, {
+			name:     "emptyDataLast-short",
+			buflens:  []int{3*readLen - 2, -1},
+			numReads: 4,
+		}, {
+			name:     "closed",
+			buflens:  []int{-1, -1},
+			numReads: 1,
+			closed:   true,
+		}, {
+			name:     "noGet",
+			buflens:  []int{-1},
+			numReads: 1,
+			noGet:    true,
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			ctx, cancelFn := context.WithCancel(context.Background())
+			ch := &StateChannel{
+				id:        "test",
+				requests:  make(chan *fnpb.StateRequest),
+				responses: make(map[string]chan<- *fnpb.StateResponse),
+				cancelFn:  cancelFn,
+				DoneCh:    ctx.Done(),
+			}
+
+			// Handle the channel behavior asynchronously.
+			go func() {
+				for i := 0; i < len(test.buflens); i++ {
+					token := []byte(strconv.Itoa(i))
+					var buf []byte
+					if test.buflens[i] >= 0 {
+						buf = bytes.Repeat([]byte{42}, test.buflens[i])
+					}
+					// On the last request response pair, send no token.
+					if i+1 == len(test.buflens) {
+						token = nil
+					}
+
+					req := <-ch.requests
+
+					if test.noGet {
+						ch.responses[req.Id] <- &fnpb.StateResponse{
+							Id: req.Id,
+						}
+						return
+					}
 
 Review comment:
   Since this doesn't depend on the rest of what is going on in the loop, pulling this up to the top might be a good idea.

[GitHub] [beam] lostluck commented on a change in pull request #11413: [BEAM-9746] check for 0 length copies from state

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #11413: [BEAM-9746] check for 0 length copies from state
URL: https://github.com/apache/beam/pull/11413#discussion_r408403428
 
 

 ##########
 File path: sdks/go/pkg/beam/core/runtime/harness/statemgr_test.go
 ##########
 @@ -258,6 +261,167 @@ func TestStateChannel(t *testing.T) {
 	}
 }
 
+// TestStateKeyReader validates ordinary Read cases
+func TestStateKeyReader(t *testing.T) {
+	const readLen = 4
+	tests := []struct {
+		name     string
+		buflens  []int // sizes of the buffers received on the state channel.
+		numReads int
+		closed   bool // tries to read from closed reader
+		noGet    bool // tries to read from nil get response reader
+	}{
+		{
+			name:     "emptyData",
+			buflens:  []int{-1},
+			numReads: 1,
+		}, {
+			name:     "singleBufferSingleRead",
+			buflens:  []int{readLen},
+			numReads: 2,
+		}, {
+			name:     "singleBufferMultipleReads",
+			buflens:  []int{2 * readLen},
+			numReads: 3,
+		}, {
+			name:     "singleBufferShortRead",
+			buflens:  []int{readLen - 1},
+			numReads: 2,
+		}, {
+			name:     "multiBuffer",
+			buflens:  []int{readLen, readLen},
+			numReads: 3,
+		}, {
+			name:     "multiBuffer-short-reads",
+			buflens:  []int{readLen - 1, readLen - 1, readLen - 2},
+			numReads: 4,
+		}, {
+			name:     "emptyDataFirst", // Shouldn't happen, but not unreasonable to handle.
+			buflens:  []int{-1, readLen, readLen},
+			numReads: 4,
+		}, {
+			name:     "emptyDataMid", // Shouldn't happen, but not unreasonable to handle.
+			buflens:  []int{readLen, readLen, -1, readLen},
+			numReads: 5,
+		}, {
+			name:     "emptyDataLast", // Shouldn't happen, but not unreasonable to handle.
+			buflens:  []int{readLen, readLen, -1},
+			numReads: 3,
+		}, {
+			name:     "emptyDataLast-short",
+			buflens:  []int{3*readLen - 2, -1},
+			numReads: 4,
+		}, {
+			name:     "closed",
+			buflens:  []int{-1, -1},
+			numReads: 1,
+			closed:   true,
+		}, {
+			name:     "noGet",
+			buflens:  []int{-1},
+			numReads: 1,
+			noGet:    true,
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			ctx, cancelFn := context.WithCancel(context.Background())
+			ch := &StateChannel{
+				id:        "test",
+				requests:  make(chan *fnpb.StateRequest),
+				responses: make(map[string]chan<- *fnpb.StateResponse),
+				cancelFn:  cancelFn,
+				DoneCh:    ctx.Done(),
+			}
+
+			// Handle the channel behavior asynchronously.
+			go func() {
+				for i := 0; i < len(test.buflens); i++ {
 
 Review comment:
   Good catch!

[GitHub] [beam] lostluck commented on a change in pull request #11413: [BEAM-9746] check for 0 length copies from state

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #11413: [BEAM-9746] check for 0 length copies from state
URL: https://github.com/apache/beam/pull/11413#discussion_r408507507
 
 

 ##########
 File path: sdks/go/pkg/beam/core/runtime/harness/statemgr_test.go
 ##########
 @@ -258,6 +261,167 @@ func TestStateChannel(t *testing.T) {
 	}
 }
 
+// TestStateKeyReader validates ordinary Read cases
+func TestStateKeyReader(t *testing.T) {
+	const readLen = 4
+	tests := []struct {
+		name     string
+		buflens  []int // sizes of the buffers received on the state channel.
+		numReads int
 
 Review comment:
   Consider: What kind of change to the stateKeyReader implementation would change the expected number of reads, based on how stateKeyReader gets its data?
   
   There are two kinds of unit test. What you've described is black box testing, where the test code has no knowledge of the implementation's internal details. This is valuable when accessible API calls can set the otherwise unknown internal state of a struct, and doing so yields checkable expectations about how other API calls will behave. As a rule, this is preferable, since it's resilient to implementation changes, but with a broad enough API surface it can be very expensive, since it ends up checking the same code paths over and over again in different tests. Black box testing by necessity ends up having a lot of redundancy. We do have black box testing in general for Beam Go, since we use the direct runner to validate large portions of the exec package in a "pipeline" context. You can find those tests in any of the test files whose package name has the _test suffix.
   
   There's also white box testing, where the internal details are known to the test implementer. These tests are an example of white box testing.
   
   There is no user-side API to configure the internal state of stateKeyReader.Read(), nor in general for the io.Reader interface, which is how users (AKA other parts of the Beam framework) will be using stateKeyReader.
   Since we can't configure things with a user-side API, we must manipulate some other internal state to set the initial conditions, and then, from those conditions, check that we understand the code under test.
   
   Further, white box testing is the only reasonable option for an API as general as Read(). It gets passed a buffer, is expected to write up to len(buffer) bytes to it, and tells you what it did. It doesn't say anything about the bytes themselves. Implementations vary dramatically depending on their purpose, and only tests aware of the implementation can check that. The API in this case doesn't dictate the tests; otherwise there could be an "io.ReaderTester" that we could run instead of these.
   
   So, while numReads in general is inscrutable to the user, it's not inscrutable to us, the implementers of this test case and the code. We have our own expectations for the code.
   
   In fact, numReads is a deterministic number based on the lengths of the backing buffers vs. the lengths of the reads. Since it's something we can check, and can know from the initial conditions and our understanding of the code, it's fine. numReads is a second-order expectation based on the initial conditions, but it's also one that's very easy to check.
   
   To be very blunt, numReads is checking that the code behaves the way we think it behaves.
   
   In this case, had we been testing that the "nil" data buffer case required 1 read to return EOF, we wouldn't have had the bug in question.
   
   Your underlying concern about a "change state" test is valid, though. Tests that require a specific implementation in order to pass can be a nuisance. But that applies less to second-order expectations like numReads. If the number of reads changes when the implementation changes, I certainly want to know about it, because that could be an *unexpected* side effect of the change being made. Conversely, if a refactorer believes they can change the implementation to reduce the number of Read calls the test expects, they can simply adjust the number and, in Test Driven Development style, use it to validate that their changes accomplish the intended goal.
   
   I'd rather have tests fail if a dramatic change in behavior occurs, and have the change author adjust them, since that verifies that they ran the tests and that the tests actually work. Having tests that still pass after you make a change is a double-edged sword: either your code still satisfies the old expectations, or it's breaking in ways you aren't testing yet.
   
   To answer the question I posed at the top of this comment: I can think of one change I'd make to the implementation (I mention it in the top post for this PR), but to make it we'd need to validate every use of the io.Reader interface in the Go SDK, and it would only affect the numReads value for one or two of the test cases, not all of them.

[GitHub] [beam] youngoli commented on a change in pull request #11413: [BEAM-9746] check for 0 length copies from state

Posted by GitBox <gi...@apache.org>.
youngoli commented on a change in pull request #11413: [BEAM-9746] check for 0 length copies from state
URL: https://github.com/apache/beam/pull/11413#discussion_r408477833
 
 

 ##########
 File path: sdks/go/pkg/beam/core/runtime/harness/statemgr_test.go
 ##########
 @@ -258,6 +261,167 @@ func TestStateChannel(t *testing.T) {
 	}
 }
 
+// TestStateKeyReader validates ordinary Read cases
+func TestStateKeyReader(t *testing.T) {
+	const readLen = 4
+	tests := []struct {
+		name     string
+		buflens  []int // sizes of the buffers received on the state channel.
+		numReads int
+		closed   bool // tries to read from closed reader
+		noGet    bool // tries to read from nil get response reader
+	}{
+		{
+			name:     "emptyData",
+			buflens:  []int{-1},
+			numReads: 1,
+		}, {
+			name:     "singleBufferSingleRead",
+			buflens:  []int{readLen},
+			numReads: 2,
+		}, {
+			name:     "singleBufferMultipleReads",
+			buflens:  []int{2 * readLen},
+			numReads: 3,
+		}, {
+			name:     "singleBufferShortRead",
+			buflens:  []int{readLen - 1},
+			numReads: 2,
+		}, {
+			name:     "multiBuffer",
+			buflens:  []int{readLen, readLen},
+			numReads: 3,
+		}, {
+			name:     "multiBuffer-short-reads",
+			buflens:  []int{readLen - 1, readLen - 1, readLen - 2},
+			numReads: 4,
+		}, {
+			name:     "emptyDataFirst", // Shouldn't happen, but not unreasonable to handle.
+			buflens:  []int{-1, readLen, readLen},
+			numReads: 4,
+		}, {
+			name:     "emptyDataMid", // Shouldn't happen, but not unreasonable to handle.
+			buflens:  []int{readLen, readLen, -1, readLen},
+			numReads: 5,
+		}, {
+			name:     "emptyDataLast", // Shouldn't happen, but not unreasonable to handle.
+			buflens:  []int{readLen, readLen, -1},
+			numReads: 3,
+		}, {
+			name:     "emptyDataLast-short",
+			buflens:  []int{3*readLen - 2, -1},
+			numReads: 4,
+		}, {
+			name:     "closed",
+			buflens:  []int{-1, -1},
+			numReads: 1,
+			closed:   true,
+		}, {
+			name:     "noGet",
+			buflens:  []int{-1},
+			numReads: 1,
+			noGet:    true,
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			ctx, cancelFn := context.WithCancel(context.Background())
+			ch := &StateChannel{
+				id:        "test",
+				requests:  make(chan *fnpb.StateRequest),
+				responses: make(map[string]chan<- *fnpb.StateResponse),
+				cancelFn:  cancelFn,
+				DoneCh:    ctx.Done(),
+			}
+
+			// Handle the channel behavior asynchronously.
+			go func() {
+				for i, buflen := range test.buflens {
+					token := []byte(strconv.Itoa(i))
+					var buf []byte
+					if buflen >= 0 {
+						buf = bytes.Repeat([]byte{42}, buflen)
+					}
+					// On the last request response pair, send no token.
+					if i+1 == len(test.buflens) {
+						token = nil
+					}
+
+					req := <-ch.requests
+
+					if test.noGet {
+						ch.responses[req.Id] <- &fnpb.StateResponse{
+							Id: req.Id,
+						}
+						return
+					}
+
+					ch.responses[req.Id] <- &fnpb.StateResponse{
+						Id: req.Id,
+						Response: &fnpb.StateResponse_Get{
+							Get: &fnpb.StateGetResponse{
+								ContinuationToken: token,
+								Data:              buf,
+							},
+						},
+					}
+				}
+			}()
+
+			r := stateKeyReader{
+				ch: ch,
+			}
+
+			if test.closed {
+				err := r.Close()
+				if err != nil {
+					t.Errorf("unexpected error on Close(), got %v", err)
+				}
+			}
+
+			var totalBytes int
+			for _, l := range test.buflens {
+				if l > 0 {
+					totalBytes += l
+				}
+			}
+			var finalerr error
+			var count, reads int
+
+			// Read all the bytes.
+			for count <= totalBytes {
+				reads++
+				b := make([]byte, readLen) // io.Read is keyed off of length, not capacity.
+				n, err := r.Read(b)
+				if err != nil {
+					finalerr = err
+					break
+				}
+				count += n
+				// Special check to avoid spurious zero elements.
+				if count == totalBytes && n == 0 {
+					t.Error("expected byte count read, last read is 0, but no EOF")
+				}
+			}
+			if got, want := reads, test.numReads; got != want {
+				t.Errorf("read %d times, want %d", got, want)
+			}
+			if got, want := count, totalBytes; got != want {
+				t.Errorf("read %v bytes, want %v", got, want)
+			}
+			if test.closed {
+				if got, want := finalerr, errors.New("side input closed"); !contains(got, want) {
+					t.Errorf("got err %v, want to contain %v", got, want)
+				}
+				return
+			}
+			if got, want := finalerr, io.EOF; got != want {
+				t.Errorf("got err %v, want %v", got, want)
 
 Review comment:
   Nit: Same as above. Could use some way to make it more explicit where the quoted error begins and ends.

[GitHub] [beam] lostluck commented on a change in pull request #11413: [BEAM-9746] check for 0 length copies from state

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #11413: [BEAM-9746] check for 0 length copies from state
URL: https://github.com/apache/beam/pull/11413#discussion_r408354342
 
 

 ##########
 File path: sdks/go/pkg/beam/core/runtime/harness/datamgr.go
 ##########
 @@ -369,11 +371,14 @@ func (r *dataReader) Read(buf []byte) (int, error) {
 		r.cur = b
 	}
 
+	// We don't need to check for a 0 length copy from r.cur here, since that's
+	// checked before buffers are handed to the r.buf channel.
 	n := copy(buf, r.cur)
 
-	if len(r.cur) == n {
+	switch {
 
 Review comment:
   It could, but I like the consistency in handling between the statemgr and datamgr code, hence the no-op, comment-only change.

[GitHub] [beam] lostluck commented on a change in pull request #11413: [BEAM-9746] check for 0 length copies from state

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #11413: [BEAM-9746] check for 0 length copies from state
URL: https://github.com/apache/beam/pull/11413#discussion_r408509078
 
 

 ##########
 File path: sdks/go/pkg/beam/core/runtime/harness/statemgr_test.go
 ##########
 @@ -258,6 +261,167 @@ func TestStateChannel(t *testing.T) {
 	}
 }
 
+// TestStateKeyReader validates ordinary Read cases
+func TestStateKeyReader(t *testing.T) {
+	const readLen = 4
+	tests := []struct {
+		name     string
+		buflens  []int // sizes of the buffers received on the state channel.
+		numReads int
+		closed   bool // tries to read from closed reader
+		noGet    bool // tries to read from nil get response reader
+	}{
+		{
+			name:     "emptyData",
+			buflens:  []int{-1},
+			numReads: 1,
+		}, {
+			name:     "singleBufferSingleRead",
+			buflens:  []int{readLen},
+			numReads: 2,
+		}, {
+			name:     "singleBufferMultipleReads",
+			buflens:  []int{2 * readLen},
+			numReads: 3,
+		}, {
+			name:     "singleBufferShortRead",
+			buflens:  []int{readLen - 1},
+			numReads: 2,
+		}, {
+			name:     "multiBuffer",
+			buflens:  []int{readLen, readLen},
+			numReads: 3,
+		}, {
+			name:     "multiBuffer-short-reads",
+			buflens:  []int{readLen - 1, readLen - 1, readLen - 2},
+			numReads: 4,
+		}, {
+			name:     "emptyDataFirst", // Shouldn't happen, but not unreasonable to handle.
+			buflens:  []int{-1, readLen, readLen},
+			numReads: 4,
+		}, {
+			name:     "emptyDataMid", // Shouldn't happen, but not unreasonable to handle.
+			buflens:  []int{readLen, readLen, -1, readLen},
+			numReads: 5,
+		}, {
+			name:     "emptyDataLast", // Shouldn't happen, but not unreasonable to handle.
+			buflens:  []int{readLen, readLen, -1},
+			numReads: 3,
+		}, {
+			name:     "emptyDataLast-short",
+			buflens:  []int{3*readLen - 2, -1},
+			numReads: 4,
+		}, {
+			name:     "closed",
+			buflens:  []int{-1, -1},
+			numReads: 1,
+			closed:   true,
+		}, {
+			name:     "noGet",
+			buflens:  []int{-1},
+			numReads: 1,
+			noGet:    true,
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			ctx, cancelFn := context.WithCancel(context.Background())
+			ch := &StateChannel{
+				id:        "test",
+				requests:  make(chan *fnpb.StateRequest),
+				responses: make(map[string]chan<- *fnpb.StateResponse),
+				cancelFn:  cancelFn,
+				DoneCh:    ctx.Done(),
+			}
+
+			// Handle the channel behavior asynchronously.
+			go func() {
+				for i, buflen := range test.buflens {
+					token := []byte(strconv.Itoa(i))
+					var buf []byte
+					if buflen >= 0 {
+						buf = bytes.Repeat([]byte{42}, buflen)
+					}
+					// On the last request response pair, send no token.
+					if i+1 == len(test.buflens) {
+						token = nil
+					}
+
+					req := <-ch.requests
+
+					if test.noGet {
+						ch.responses[req.Id] <- &fnpb.StateResponse{
+							Id: req.Id,
+						}
+						return
+					}
+
+					ch.responses[req.Id] <- &fnpb.StateResponse{
+						Id: req.Id,
+						Response: &fnpb.StateResponse_Get{
+							Get: &fnpb.StateGetResponse{
+								ContinuationToken: token,
+								Data:              buf,
+							},
+						},
+					}
+				}
+			}()
+
+			r := stateKeyReader{
+				ch: ch,
+			}
+
+			if test.closed {
+				err := r.Close()
+				if err != nil {
+					t.Errorf("unexpected error on Close(), got %v", err)
+				}
+			}
+
+			var totalBytes int
+			for _, l := range test.buflens {
+				if l > 0 {
+					totalBytes += l
+				}
+			}
+			var finalerr error
+			var count, reads int
+
+			// Read all the bytes.
+			for count <= totalBytes {
+				reads++
+				b := make([]byte, readLen) // io.Read is keyed off of length, not capacity.
+				n, err := r.Read(b)
+				if err != nil {
+					finalerr = err
+					break
+				}
+				count += n
+				// Special check to avoid spurious zero elements.
+				if count == totalBytes && n == 0 {
+					t.Error("expected byte count read, last read is 0, but no EOF")
+				}
+			}
+			if got, want := reads, test.numReads; got != want {
+				t.Errorf("read %d times, want %d", got, want)
+			}
+			if got, want := count, totalBytes; got != want {
+				t.Errorf("read %v bytes, want %v", got, want)
+			}
+			if test.closed {
+				if got, want := finalerr, errors.New("side input closed"); !contains(got, want) {
+					t.Errorf("got err %v, want to contain %v", got, want)
 
 Review comment:
   I don't like adding newlines to the formatted test output, since they mess with the indentation of the results and can make it harder to read. I have used %q, though, which quotes and escapes the Error() strings.

[GitHub] [beam] youngoli commented on a change in pull request #11413: [BEAM-9746] check for 0 length copies from state

Posted by GitBox <gi...@apache.org>.
youngoli commented on a change in pull request #11413: [BEAM-9746] check for 0 length copies from state
URL: https://github.com/apache/beam/pull/11413#discussion_r408477556
 
 

 ##########
 File path: sdks/go/pkg/beam/core/runtime/harness/statemgr_test.go
 ##########
 @@ -258,6 +261,167 @@ func TestStateChannel(t *testing.T) {
 	}
 }
 
+// TestStateKeyReader validates ordinary Read cases
+func TestStateKeyReader(t *testing.T) {
+	const readLen = 4
+	tests := []struct {
+		name     string
+		buflens  []int // sizes of the buffers received on the state channel.
+		numReads int
+		closed   bool // tries to read from closed reader
+		noGet    bool // tries to read from nil get response reader
+	}{
+		{
+			name:     "emptyData",
+			buflens:  []int{-1},
+			numReads: 1,
+		}, {
+			name:     "singleBufferSingleRead",
+			buflens:  []int{readLen},
+			numReads: 2,
+		}, {
+			name:     "singleBufferMultipleReads",
+			buflens:  []int{2 * readLen},
+			numReads: 3,
+		}, {
+			name:     "singleBufferShortRead",
+			buflens:  []int{readLen - 1},
+			numReads: 2,
+		}, {
+			name:     "multiBuffer",
+			buflens:  []int{readLen, readLen},
+			numReads: 3,
+		}, {
+			name:     "multiBuffer-short-reads",
+			buflens:  []int{readLen - 1, readLen - 1, readLen - 2},
+			numReads: 4,
+		}, {
+			name:     "emptyDataFirst", // Shouldn't happen, but not unreasonable to handle.
+			buflens:  []int{-1, readLen, readLen},
+			numReads: 4,
+		}, {
+			name:     "emptyDataMid", // Shouldn't happen, but not unreasonable to handle.
+			buflens:  []int{readLen, readLen, -1, readLen},
+			numReads: 5,
+		}, {
+			name:     "emptyDataLast", // Shouldn't happen, but not unreasonable to handle.
+			buflens:  []int{readLen, readLen, -1},
+			numReads: 3,
+		}, {
+			name:     "emptyDataLast-short",
+			buflens:  []int{3*readLen - 2, -1},
+			numReads: 4,
+		}, {
+			name:     "closed",
+			buflens:  []int{-1, -1},
+			numReads: 1,
+			closed:   true,
+		}, {
+			name:     "noGet",
+			buflens:  []int{-1},
+			numReads: 1,
+			noGet:    true,
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			ctx, cancelFn := context.WithCancel(context.Background())
+			ch := &StateChannel{
+				id:        "test",
+				requests:  make(chan *fnpb.StateRequest),
+				responses: make(map[string]chan<- *fnpb.StateResponse),
+				cancelFn:  cancelFn,
+				DoneCh:    ctx.Done(),
+			}
+
+			// Handle the channel behavior asynchronously.
+			go func() {
+				for i, buflen := range test.buflens {
+					token := []byte(strconv.Itoa(i))
+					var buf []byte
+					if buflen >= 0 {
+						buf = bytes.Repeat([]byte{42}, buflen)
+					}
+					// On the last request response pair, send no token.
+					if i+1 == len(test.buflens) {
+						token = nil
+					}
+
+					req := <-ch.requests
+
+					if test.noGet {
+						ch.responses[req.Id] <- &fnpb.StateResponse{
+							Id: req.Id,
+						}
+						return
+					}
+
+					ch.responses[req.Id] <- &fnpb.StateResponse{
+						Id: req.Id,
+						Response: &fnpb.StateResponse_Get{
+							Get: &fnpb.StateGetResponse{
+								ContinuationToken: token,
+								Data:              buf,
+							},
+						},
+					}
+				}
+			}()
+
+			r := stateKeyReader{
+				ch: ch,
+			}
+
+			if test.closed {
+				err := r.Close()
+				if err != nil {
+					t.Errorf("unexpected error on Close(), got %v", err)
+				}
+			}
+
+			var totalBytes int
+			for _, l := range test.buflens {
+				if l > 0 {
+					totalBytes += l
+				}
+			}
+			var finalerr error
+			var count, reads int
+
+			// Read all the bytes.
+			for count <= totalBytes {
+				reads++
+				b := make([]byte, readLen) // io.Read is keyed off of length, not capacity.
+				n, err := r.Read(b)
+				if err != nil {
+					finalerr = err
+					break
+				}
+				count += n
+				// Special check to avoid spurious zero elements.
+				if count == totalBytes && n == 0 {
+					t.Error("expected byte count read, last read is 0, but no EOF")
+				}
+			}
+			if got, want := reads, test.numReads; got != want {
+				t.Errorf("read %d times, want %d", got, want)
+			}
+			if got, want := count, totalBytes; got != want {
+				t.Errorf("read %v bytes, want %v", got, want)
+			}
+			if test.closed {
+				if got, want := finalerr, errors.New("side input closed"); !contains(got, want) {
+					t.Errorf("got err %v, want to contain %v", got, want)
 
 Review comment:
   Nit: This error message could benefit from escaped quotation marks around each %v, to make it explicit where each quoted error begins and ends. Alternatively, putting each error on its own tab-indented line might work well too, like so:
   
   ```
   got err:
       <error message>
   want to contain:
       <error message>
   ```

[GitHub] [beam] youngoli commented on a change in pull request #11413: [BEAM-9746] check for 0 length copies from state

Posted by GitBox <gi...@apache.org>.
youngoli commented on a change in pull request #11413: [BEAM-9746] check for 0 length copies from state
URL: https://github.com/apache/beam/pull/11413#discussion_r408481881
 
 

 ##########
 File path: sdks/go/pkg/beam/core/runtime/harness/statemgr_test.go
 ##########
 @@ -258,6 +261,167 @@ func TestStateChannel(t *testing.T) {
 	}
 }
 
+// TestStateKeyReader validates ordinary Read cases
+func TestStateKeyReader(t *testing.T) {
+	const readLen = 4
+	tests := []struct {
+		name     string
+		buflens  []int // sizes of the buffers received on the state channel.
+		numReads int
 
 Review comment:
   Is the number of reads something that needs to be predictable to whatever code uses the reader? To me it seems like one of those implementation details that doesn't need to be unit tested because it's invisible to the caller.
   
   The reason this stood out to me is that the `numReads` values in the test cases below seem a bit inscrutable, and they would cause these tests to fail if the `stateKeyReader` implementation changed even slightly, which is just an annoyance unless the change would actually break code.
   
   On the other hand, if the number of reads _is_ a detail that users should know, and correctness does rely on it staying consistent, that seems like it should be explicitly documented on `stateKeyReader.Read`.

[GitHub] [beam] lostluck commented on a change in pull request #11413: [BEAM-9746] check for 0 length copies from state

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #11413: [BEAM-9746] check for 0 length copies from state
URL: https://github.com/apache/beam/pull/11413#discussion_r408509101
 
 

 ##########
 File path: sdks/go/pkg/beam/core/runtime/harness/statemgr_test.go
 ##########
 @@ -258,6 +261,167 @@ func TestStateChannel(t *testing.T) {
 	}
 }
 
+// TestStateKeyReader validates ordinary Read cases
+func TestStateKeyReader(t *testing.T) {
+	const readLen = 4
+	tests := []struct {
+		name     string
+		buflens  []int // sizes of the buffers received on the state channel.
+		numReads int
+		closed   bool // tries to read from closed reader
+		noGet    bool // tries to read from nil get response reader
+	}{
+		{
+			name:     "emptyData",
+			buflens:  []int{-1},
+			numReads: 1,
+		}, {
+			name:     "singleBufferSingleRead",
+			buflens:  []int{readLen},
+			numReads: 2,
+		}, {
+			name:     "singleBufferMultipleReads",
+			buflens:  []int{2 * readLen},
+			numReads: 3,
+		}, {
+			name:     "singleBufferShortRead",
+			buflens:  []int{readLen - 1},
+			numReads: 2,
+		}, {
+			name:     "multiBuffer",
+			buflens:  []int{readLen, readLen},
+			numReads: 3,
+		}, {
+			name:     "multiBuffer-short-reads",
+			buflens:  []int{readLen - 1, readLen - 1, readLen - 2},
+			numReads: 4,
+		}, {
+			name:     "emptyDataFirst", // Shouldn't happen, but not unreasonable to handle.
+			buflens:  []int{-1, readLen, readLen},
+			numReads: 4,
+		}, {
+			name:     "emptyDataMid", // Shouldn't happen, but not unreasonable to handle.
+			buflens:  []int{readLen, readLen, -1, readLen},
+			numReads: 5,
+		}, {
+			name:     "emptyDataLast", // Shouldn't happen, but not unreasonable to handle.
+			buflens:  []int{readLen, readLen, -1},
+			numReads: 3,
+		}, {
+			name:     "emptyDataLast-short",
+			buflens:  []int{3*readLen - 2, -1},
+			numReads: 4,
+		}, {
+			name:     "closed",
+			buflens:  []int{-1, -1},
+			numReads: 1,
+			closed:   true,
+		}, {
+			name:     "noGet",
+			buflens:  []int{-1},
+			numReads: 1,
+			noGet:    true,
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			ctx, cancelFn := context.WithCancel(context.Background())
+			ch := &StateChannel{
+				id:        "test",
+				requests:  make(chan *fnpb.StateRequest),
+				responses: make(map[string]chan<- *fnpb.StateResponse),
+				cancelFn:  cancelFn,
+				DoneCh:    ctx.Done(),
+			}
+
+			// Handle the channel behavior asynchronously.
+			go func() {
+				for i, buflen := range test.buflens {
+					token := []byte(strconv.Itoa(i))
+					var buf []byte
+					if buflen >= 0 {
+						buf = bytes.Repeat([]byte{42}, buflen)
+					}
+					// On the last request response pair, send no token.
+					if i+1 == len(test.buflens) {
+						token = nil
+					}
+
+					req := <-ch.requests
+
+					if test.noGet {
+						ch.responses[req.Id] <- &fnpb.StateResponse{
+							Id: req.Id,
+						}
+						return
+					}
+
+					ch.responses[req.Id] <- &fnpb.StateResponse{
+						Id: req.Id,
+						Response: &fnpb.StateResponse_Get{
+							Get: &fnpb.StateGetResponse{
+								ContinuationToken: token,
+								Data:              buf,
+							},
+						},
+					}
+				}
+			}()
+
+			r := stateKeyReader{
+				ch: ch,
+			}
+
+			if test.closed {
+				err := r.Close()
+				if err != nil {
+					t.Errorf("unexpected error on Close(), got %v", err)
+				}
+			}
+
+			var totalBytes int
+			for _, l := range test.buflens {
+				if l > 0 {
+					totalBytes += l
+				}
+			}
+			var finalerr error
+			var count, reads int
+
+			// Read all the bytes.
+			for count <= totalBytes {
+				reads++
+				b := make([]byte, readLen) // io.Read is keyed off of length, not capacity.
+				n, err := r.Read(b)
+				if err != nil {
+					finalerr = err
+					break
+				}
+				count += n
+				// Special check to avoid spurious zero elements.
+				if count == totalBytes && n == 0 {
+					t.Error("expected byte count read, last read is 0, but no EOF")
+				}
+			}
+			if got, want := reads, test.numReads; got != want {
+				t.Errorf("read %d times, want %d", got, want)
+			}
+			if got, want := count, totalBytes; got != want {
+				t.Errorf("read %v bytes, want %v", got, want)
+			}
+			if test.closed {
+				if got, want := finalerr, errors.New("side input closed"); !contains(got, want) {
+					t.Errorf("got err %v, want to contain %v", got, want)
+				}
+				return
+			}
+			if got, want := finalerr, io.EOF; got != want {
+				t.Errorf("got err %v, want %v", got, want)
 
 Review comment:
   Ack.

[GitHub] [beam] lostluck commented on a change in pull request #11413: [BEAM-9746] check for 0 length copies from state

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #11413: [BEAM-9746] check for 0 length copies from state
URL: https://github.com/apache/beam/pull/11413#discussion_r408468636
 
 

 ##########
 File path: sdks/go/pkg/beam/core/runtime/harness/statemgr_test.go
 ##########
 @@ -258,6 +261,167 @@ func TestStateChannel(t *testing.T) {
 	}
 }
 
+// TestStateKeyReader validates ordinary Read cases
+func TestStateKeyReader(t *testing.T) {
+	const readLen = 4
+	tests := []struct {
+		name     string
+		buflens  []int // sizes of the buffers received on the state channel.
+		numReads int
+		closed   bool // tries to read from closed reader
+		noGet    bool // tries to read from nil get response reader
+	}{
+		{
+			name:     "emptyData",
+			buflens:  []int{-1},
+			numReads: 1,
+		}, {
+			name:     "singleBufferSingleRead",
+			buflens:  []int{readLen},
+			numReads: 2,
+		}, {
+			name:     "singleBufferMultipleReads",
+			buflens:  []int{2 * readLen},
+			numReads: 3,
+		}, {
+			name:     "singleBufferShortRead",
+			buflens:  []int{readLen - 1},
+			numReads: 2,
+		}, {
+			name:     "multiBuffer",
+			buflens:  []int{readLen, readLen},
+			numReads: 3,
+		}, {
+			name:     "multiBuffer-short-reads",
+			buflens:  []int{readLen - 1, readLen - 1, readLen - 2},
+			numReads: 4,
+		}, {
+			name:     "emptyDataFirst", // Shouldn't happen, but not unreasonable to handle.
+			buflens:  []int{-1, readLen, readLen},
+			numReads: 4,
+		}, {
+			name:     "emptyDataMid", // Shouldn't happen, but not unreasonable to handle.
+			buflens:  []int{readLen, readLen, -1, readLen},
+			numReads: 5,
+		}, {
+			name:     "emptyDataLast", // Shouldn't happen, but not unreasonable to handle.
+			buflens:  []int{readLen, readLen, -1},
+			numReads: 3,
+		}, {
+			name:     "emptyDataLast-short",
+			buflens:  []int{3*readLen - 2, -1},
+			numReads: 4,
+		}, {
+			name:     "closed",
+			buflens:  []int{-1, -1},
+			numReads: 1,
+			closed:   true,
+		}, {
+			name:     "noGet",
+			buflens:  []int{-1},
+			numReads: 1,
+			noGet:    true,
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			ctx, cancelFn := context.WithCancel(context.Background())
+			ch := &StateChannel{
+				id:        "test",
+				requests:  make(chan *fnpb.StateRequest),
+				responses: make(map[string]chan<- *fnpb.StateResponse),
+				cancelFn:  cancelFn,
+				DoneCh:    ctx.Done(),
+			}
+
+			// Handle the channel behavior asynchronously.
+			go func() {
+				for i := 0; i < len(test.buflens); i++ {
+					token := []byte(strconv.Itoa(i))
+					var buf []byte
+					if test.buflens[i] >= 0 {
+						buf = bytes.Repeat([]byte{42}, test.buflens[i])
+					}
+					// On the last request response pair, send no token.
+					if i+1 == len(test.buflens) {
+						token = nil
+					}
+
+					req := <-ch.requests
+
+					if test.noGet {
+						ch.responses[req.Id] <- &fnpb.StateResponse{
+							Id: req.Id,
+						}
+						return
+					}
 
 Review comment:
   Done

[GitHub] [beam] thetorpedodog commented on a change in pull request #11413: [BEAM-9746] check for 0 length copies from state

Posted by GitBox <gi...@apache.org>.
thetorpedodog commented on a change in pull request #11413: [BEAM-9746] check for 0 length copies from state
URL: https://github.com/apache/beam/pull/11413#discussion_r408412639
 
 

 ##########
 File path: sdks/go/pkg/beam/core/runtime/harness/statemgr_test.go
 ##########
 @@ -258,6 +261,167 @@ func TestStateChannel(t *testing.T) {
 	}
 }
 
+// TestStateKeyReader validates ordinary Read cases
+func TestStateKeyReader(t *testing.T) {
+	const readLen = 4
+	tests := []struct {
+		name     string
+		buflens  []int // sizes of the buffers received on the state channel.
+		numReads int
+		closed   bool // tries to read from closed reader
+		noGet    bool // tries to read from nil get response reader
+	}{
+		{
+			name:     "emptyData",
+			buflens:  []int{-1},
+			numReads: 1,
+		}, {
+			name:     "singleBufferSingleRead",
+			buflens:  []int{readLen},
+			numReads: 2,
+		}, {
+			name:     "singleBufferMultipleReads",
+			buflens:  []int{2 * readLen},
+			numReads: 3,
+		}, {
+			name:     "singleBufferShortRead",
+			buflens:  []int{readLen - 1},
+			numReads: 2,
+		}, {
+			name:     "multiBuffer",
+			buflens:  []int{readLen, readLen},
+			numReads: 3,
+		}, {
+			name:     "multiBuffer-short-reads",
+			buflens:  []int{readLen - 1, readLen - 1, readLen - 2},
+			numReads: 4,
+		}, {
+			name:     "emptyDataFirst", // Shouldn't happen, but not unreasonable to handle.
+			buflens:  []int{-1, readLen, readLen},
+			numReads: 4,
+		}, {
+			name:     "emptyDataMid", // Shouldn't happen, but not unreasonable to handle.
+			buflens:  []int{readLen, readLen, -1, readLen},
+			numReads: 5,
+		}, {
+			name:     "emptyDataLast", // Shouldn't happen, but not unreasonable to handle.
+			buflens:  []int{readLen, readLen, -1},
+			numReads: 3,
+		}, {
+			name:     "emptyDataLast-short",
+			buflens:  []int{3*readLen - 2, -1},
+			numReads: 4,
+		}, {
+			name:     "closed",
+			buflens:  []int{-1, -1},
+			numReads: 1,
+			closed:   true,
+		}, {
+			name:     "noGet",
+			buflens:  []int{-1},
+			numReads: 1,
+			noGet:    true,
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			ctx, cancelFn := context.WithCancel(context.Background())
+			ch := &StateChannel{
+				id:        "test",
+				requests:  make(chan *fnpb.StateRequest),
+				responses: make(map[string]chan<- *fnpb.StateResponse),
+				cancelFn:  cancelFn,
+				DoneCh:    ctx.Done(),
+			}
+
+			// Handle the channel behavior asynchronously.
+			go func() {
+				for i := 0; i < len(test.buflens); i++ {
+					token := []byte(strconv.Itoa(i))
+					var buf []byte
+					if test.buflens[i] >= 0 {
+						buf = bytes.Repeat([]byte{42}, test.buflens[i])
+					}
+					// On the last request response pair, send no token.
+					if i+1 == len(test.buflens) {
+						token = nil
+					}
+
+					req := <-ch.requests
+
+					if test.noGet {
+						ch.responses[req.Id] <- &fnpb.StateResponse{
+							Id: req.Id,
+						}
+						return
+					}
+
+					ch.responses[req.Id] <- &fnpb.StateResponse{
+						Id: req.Id,
+						Response: &fnpb.StateResponse_Get{
+							Get: &fnpb.StateGetResponse{
+								ContinuationToken: token,
+								Data:              buf,
+							},
+						},
+					}
+				}
+			}()
+
+			r := stateKeyReader{
+				ch: ch,
+			}
+
+			if test.closed {
+				err := r.Close()
+				if err != nil {
+					t.Errorf("unexpected error on Close(), got %v", err)
+				}
+			}
+
+			var totalBytes int
+			for _, l := range test.buflens {
+				if l > 0 {
+					totalBytes += l
+				}
+			}
+			var finalerr error
+			var count, reads int
+
+			// Read all the bytes.
+			for count <= totalBytes {
+				reads++
+				b := make([]byte, readLen) // io.Read is keyed off of length, not capacity.
+				n, err := r.Read(b)
+				if err != nil {
+					finalerr = err
+					break
+				}
+				count += n
+				// Special check to avoid spurious zero elements.
+				if count == totalBytes && n == 0 {
+					t.Error("expected byte count read, last read is 0, but no EOF")
+				}
+			}
+			if got, want := reads, test.numReads; got != want {
 
 Review comment:
   Is this a common idiom in Beam testing? Inside Google, I don't think I've seen got/want declared for values that are already stored in their own variables. I would expect something like:
   
   ```go
   if reads != test.numReads {
     t.Errorf("read %d times, want %d", reads, test.numReads)
   }
   ```
   
   For simpler tests, I have seen things like:
   
   ```go
   if got := sut.CallSomeFn(params); got != tc.want {
     // ...
   }
   ```

[GitHub] [beam] thetorpedodog commented on a change in pull request #11413: [BEAM-9746] check for 0 length copies from state

Posted by GitBox <gi...@apache.org>.
thetorpedodog commented on a change in pull request #11413: [BEAM-9746] check for 0 length copies from state
URL: https://github.com/apache/beam/pull/11413#discussion_r408397803
 
 

 ##########
 File path: sdks/go/pkg/beam/core/runtime/harness/statemgr_test.go
 ##########
 @@ -258,6 +261,167 @@ func TestStateChannel(t *testing.T) {
 	}
 }
 
+// TestStateKeyReader validates ordinary Read cases
+func TestStateKeyReader(t *testing.T) {
+	const readLen = 4
+	tests := []struct {
+		name     string
+		buflens  []int // sizes of the buffers received on the state channel.
+		numReads int
+		closed   bool // tries to read from closed reader
+		noGet    bool // tries to read from nil get response reader
+	}{
+		{
+			name:     "emptyData",
+			buflens:  []int{-1},
+			numReads: 1,
+		}, {
+			name:     "singleBufferSingleRead",
+			buflens:  []int{readLen},
+			numReads: 2,
+		}, {
+			name:     "singleBufferMultipleReads",
+			buflens:  []int{2 * readLen},
+			numReads: 3,
+		}, {
+			name:     "singleBufferShortRead",
+			buflens:  []int{readLen - 1},
+			numReads: 2,
+		}, {
+			name:     "multiBuffer",
+			buflens:  []int{readLen, readLen},
+			numReads: 3,
+		}, {
+			name:     "multiBuffer-short-reads",
+			buflens:  []int{readLen - 1, readLen - 1, readLen - 2},
+			numReads: 4,
+		}, {
+			name:     "emptyDataFirst", // Shouldn't happen, but not unreasonable to handle.
+			buflens:  []int{-1, readLen, readLen},
+			numReads: 4,
+		}, {
+			name:     "emptyDataMid", // Shouldn't happen, but not unreasonable to handle.
+			buflens:  []int{readLen, readLen, -1, readLen},
+			numReads: 5,
+		}, {
+			name:     "emptyDataLast", // Shouldn't happen, but not unreasonable to handle.
+			buflens:  []int{readLen, readLen, -1},
+			numReads: 3,
+		}, {
+			name:     "emptyDataLast-short",
+			buflens:  []int{3*readLen - 2, -1},
+			numReads: 4,
+		}, {
+			name:     "closed",
+			buflens:  []int{-1, -1},
+			numReads: 1,
+			closed:   true,
+		}, {
+			name:     "noGet",
+			buflens:  []int{-1},
+			numReads: 1,
+			noGet:    true,
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			ctx, cancelFn := context.WithCancel(context.Background())
+			ch := &StateChannel{
+				id:        "test",
+				requests:  make(chan *fnpb.StateRequest),
+				responses: make(map[string]chan<- *fnpb.StateResponse),
+				cancelFn:  cancelFn,
+				DoneCh:    ctx.Done(),
+			}
+
+			// Handle the channel behavior asynchronously.
+			go func() {
+				for i := 0; i < len(test.buflens); i++ {
 
 Review comment:
   Could this be `for i, buflen := range test.buflens`?
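
For reference, the suggested `range` form visits the same index and element as the indexed loop. A minimal sketch, using a hypothetical `fakeResponse` type and `responsesFor` helper that pull the loop body out of the goroutine so it can be checked without channels:

```go
package main

import (
	"bytes"
	"fmt"
	"strconv"
)

// fakeResponse is a hypothetical, simplified stand-in for the
// StateGetResponse fields that the test sets.
type fakeResponse struct {
	ContinuationToken []byte
	Data              []byte
}

// responsesFor builds one response per entry in buflens using the range
// form: a negative length yields nil Data, and the last response carries
// no continuation token.
func responsesFor(buflens []int) []fakeResponse {
	var out []fakeResponse
	for i, buflen := range buflens {
		token := []byte(strconv.Itoa(i))
		var buf []byte
		if buflen >= 0 {
			buf = bytes.Repeat([]byte{42}, buflen)
		}
		if i+1 == len(buflens) {
			token = nil
		}
		out = append(out, fakeResponse{ContinuationToken: token, Data: buf})
	}
	return out
}

func main() {
	for _, r := range responsesFor([]int{4, -1}) {
		fmt.Printf("token=%q len=%d nilData=%t\n", r.ContinuationToken, len(r.Data), r.Data == nil)
	}
}
```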

[GitHub] [beam] thetorpedodog commented on a change in pull request #11413: [BEAM-9746] check for 0 length copies from state

Posted by GitBox <gi...@apache.org>.
thetorpedodog commented on a change in pull request #11413: [BEAM-9746] check for 0 length copies from state
URL: https://github.com/apache/beam/pull/11413#discussion_r408284011
 
 

 ##########
 File path: sdks/go/pkg/beam/core/runtime/harness/datamgr.go
 ##########
 @@ -369,11 +371,14 @@ func (r *dataReader) Read(buf []byte) (int, error) {
 		r.cur = b
 	}
 
+	// We don't need to check for a 0 length copy from r.cur here, since that's
+	// checked before buffers are handed to the r.buf channel.
 	n := copy(buf, r.cur)
 
-	if len(r.cur) == n {
+	switch {
 
 Review comment:
   It seems like this could still be an if-else?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services