You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by "riteshghorse (via GitHub)" <gi...@apache.org> on 2023/03/28 16:19:53 UTC

[GitHub] [beam] riteshghorse commented on a diff in pull request #25982: [#22737] Re-write Go SDK data plane to support timers.

riteshghorse commented on code in PR #25982:
URL: https://github.com/apache/beam/pull/25982#discussion_r1150751691


##########
sdks/go/pkg/beam/core/runtime/harness/datamgr.go:
##########
@@ -214,51 +290,88 @@ func (c *DataChannel) terminateStreamOnError(err error) {
 	}
 }
 
-// OpenRead returns an io.ReadCloser of the data elements for the given instruction and ptransform.
-func (c *DataChannel) OpenRead(ctx context.Context, ptransformID string, instID instructionID) io.ReadCloser {
+// OpenWrite returns an io.WriteCloser of the data elements for the given instruction and ptransform.
+func (c *DataChannel) OpenWrite(ctx context.Context, ptransformID string, instID instructionID) io.WriteCloser {
+	return c.makeWriter(ctx, clientID{ptransformID: ptransformID, instID: instID})
+}
+
+// OpenElementChan returns a channel of typex.Elements for the given instruction and ptransform.
+func (c *DataChannel) OpenElementChan(ctx context.Context, ptransformID string, instID instructionID, expectedTimerTransforms []string) (<-chan exec.Elements, error) {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 	cid := clientID{ptransformID: ptransformID, instID: instID}
 	if c.readErr != nil {
-		log.Errorf(ctx, "opening a reader %v on a closed channel", cid)
-		return &errReader{c.readErr}
+		return nil, fmt.Errorf("opening a reader %v on a closed channel. Original error: %w", cid, c.readErr)
 	}
-	return c.makeReader(ctx, cid)
+	return c.makeChannel(true, cid, expectedTimerTransforms...).ch, nil
 }
 
-// OpenWrite returns an io.WriteCloser of the data elements for the given instruction and ptransform.
-func (c *DataChannel) OpenWrite(ctx context.Context, ptransformID string, instID instructionID) io.WriteCloser {
-	return c.makeWriter(ctx, clientID{ptransformID: ptransformID, instID: instID})
+// makeChannel creates a channel of exec.Elements. It expects to be called while c.mu is held.
+func (c *DataChannel) makeChannel(fromSource bool, id clientID, additionalTransforms ...string) *elementsChan {
+	if ec, ok := c.channels[id.instID]; ok {
+		ec.mu.Lock()
+		defer ec.mu.Unlock()
+		if fromSource {
+			ec.want = (1 + int32(len(additionalTransforms)))

Review Comment:
   Is the `+1` here to account for the source transform itself, in addition to the expected timer transforms?



##########
sdks/go/pkg/beam/core/runtime/harness/datamgr.go:
##########
@@ -574,3 +620,99 @@ func (w *dataWriter) Write(p []byte) (n int, err error) {
 	w.buf = append(w.buf, p...)
 	return len(p), nil
 }
+
+func (c *DataChannel) makeTimerWriter(ctx context.Context, id clientID, family string) *timerWriter {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	var m map[timerKey]*timerWriter
+	var ok bool
+	if m, ok = c.timerWriters[id.instID]; !ok {
+		m = make(map[timerKey]*timerWriter)
+		c.timerWriters[id.instID] = m
+	}
+	tk := timerKey{ptransformID: id.ptransformID, family: family}
+	if w, ok := m[tk]; ok {
+		return w
+	}
+
+	// We don't check for finished instructions for writers, as writers
+	// can only be created if an instruction is in scope, and aren't
+	// runner or user directed.
+
+	w := &timerWriter{ch: c, id: id, timerFamilyID: family}
+	m[tk] = w
+	return w
+}
+
+type timerWriter struct {
+	id            clientID
+	timerFamilyID string
+	ch            *DataChannel
+}
+
+// send requires the ch.mu lock to be held.
+func (w *timerWriter) send(msg *fnpb.Elements) error {
+	recordStreamSend(msg)
+	if err := w.ch.client.Send(msg); err != nil {
+		if err == io.EOF {
+			log.Warnf(context.TODO(), "dataWriter[%v;%v] EOF on send; fetching real error", w.id, w.ch.id)
+			err = nil
+			for err == nil {
+				// Per GRPC stream documentation, if there's an EOF, we must call Recv
+				// until a non-nil error is returned, to ensure resources are cleaned up.
+				// https://pkg.go.dev/google.golang.org/grpc#ClientConn.NewStream
+				_, err = w.ch.client.Recv()
+			}
+		}
+		log.Warnf(context.TODO(), "dataWriter[%v;%v] error on send: %v", w.id, w.ch.id, err)

Review Comment:
   ```suggestion
   		log.Warnf(context.TODO(), "timerWriter[%v;%v] error on send: %v", w.id, w.ch.id, err)
   ```



##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -94,20 +98,79 @@ func (n *DataSource) Up(ctx context.Context) error {
 // StartBundle initializes this datasource for the bundle.
 func (n *DataSource) StartBundle(ctx context.Context, id string, data DataContext) error {
 	n.mu.Lock()
+	n.curInst = id
 	n.source = data.Data
 	n.state = data.State
 	n.start = time.Now()
-	n.index = -1
+	n.index = 0
 	n.splitIdx = math.MaxInt64
 	n.mu.Unlock()
 	return n.Out.StartBundle(ctx, id, data)
 }
 
+// splitSuccess is a marker error to indicate we've reached the split index.
+// Akin to io.EOF.
+var splitSuccess = errors.New("split index reached")
+
+// process handles converting elements from the data source to timers.
+//
+// The data and timer callback functions must return an io.EOF if the reader terminates to signal that an additional
+// buffer is desired. On successful splits, [splitSuccess] must be returned to indicate that the
+// PTransform is done processing data for this instruction.
+func (n *DataSource) process(ctx context.Context, data func(bcr *byteCountReader, ptransformID string) error, timer func(bcr *byteCountReader, ptransformID, timerFamilyID string) error) error {

Review Comment:
   Nice, I like this approach of passing in the data and timer handling functions as callbacks.



##########
sdks/go/pkg/beam/core/runtime/harness/datamgr.go:
##########
@@ -574,3 +620,99 @@ func (w *dataWriter) Write(p []byte) (n int, err error) {
 	w.buf = append(w.buf, p...)
 	return len(p), nil
 }
+
+func (c *DataChannel) makeTimerWriter(ctx context.Context, id clientID, family string) *timerWriter {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	var m map[timerKey]*timerWriter
+	var ok bool
+	if m, ok = c.timerWriters[id.instID]; !ok {
+		m = make(map[timerKey]*timerWriter)
+		c.timerWriters[id.instID] = m
+	}
+	tk := timerKey{ptransformID: id.ptransformID, family: family}
+	if w, ok := m[tk]; ok {
+		return w
+	}
+
+	// We don't check for finished instructions for writers, as writers
+	// can only be created if an instruction is in scope, and aren't
+	// runner or user directed.
+
+	w := &timerWriter{ch: c, id: id, timerFamilyID: family}
+	m[tk] = w
+	return w
+}
+
+type timerWriter struct {
+	id            clientID
+	timerFamilyID string
+	ch            *DataChannel
+}
+
+// send requires the ch.mu lock to be held.
+func (w *timerWriter) send(msg *fnpb.Elements) error {
+	recordStreamSend(msg)
+	if err := w.ch.client.Send(msg); err != nil {
+		if err == io.EOF {
+			log.Warnf(context.TODO(), "dataWriter[%v;%v] EOF on send; fetching real error", w.id, w.ch.id)

Review Comment:
   ```suggestion
   			log.Warnf(context.TODO(), "timerWriter[%v;%v] EOF on send; fetching real error", w.id, w.ch.id)
   ```



##########
sdks/go/pkg/beam/core/runtime/harness/datamgr.go:
##########
@@ -124,28 +137,37 @@ func (m *DataChannelManager) Open(ctx context.Context, port exec.Port) (*DataCha
 	return ch, nil
 }
 
-func (m *DataChannelManager) closeInstruction(instID instructionID) {
+func (m *DataChannelManager) closeInstruction(instID instructionID, ports []exec.Port) error {
 	m.mu.Lock()
 	defer m.mu.Unlock()
-	for _, ch := range m.ports {
-		ch.removeInstruction(instID)
+	var firstNonNilError error
+	for _, port := range ports {
+		ch, ok := m.ports[port.URL]
+		if !ok {
+			continue
+		}
+		err := ch.removeInstruction(instID)
+		if err != nil && firstNonNilError == nil {

Review Comment:
   So we remove the instruction from every port first and only then return the first error we caught. Nice.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org