You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by "lostluck (via GitHub)" <gi...@apache.org> on 2023/03/28 18:03:30 UTC

[GitHub] [beam] lostluck commented on a diff in pull request #25982: [#22737] Re-write Go SDK data plane to support timers.

lostluck commented on code in PR #25982:
URL: https://github.com/apache/beam/pull/25982#discussion_r1150984329


##########
sdks/go/pkg/beam/core/runtime/harness/datamgr.go:
##########
@@ -574,3 +620,99 @@ func (w *dataWriter) Write(p []byte) (n int, err error) {
 	w.buf = append(w.buf, p...)
 	return len(p), nil
 }
+
+func (c *DataChannel) makeTimerWriter(ctx context.Context, id clientID, family string) *timerWriter {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	var m map[timerKey]*timerWriter
+	var ok bool
+	if m, ok = c.timerWriters[id.instID]; !ok {
+		m = make(map[timerKey]*timerWriter)
+		c.timerWriters[id.instID] = m
+	}
+	tk := timerKey{ptransformID: id.ptransformID, family: family}
+	if w, ok := m[tk]; ok {
+		return w
+	}
+
+	// We don't check for finished instructions for writers, as writers
+	// can only be created if an instruction is in scope, and aren't
+	// runner or user directed.
+
+	w := &timerWriter{ch: c, id: id, timerFamilyID: family}
+	m[tk] = w
+	return w
+}
+
+type timerWriter struct {
+	id            clientID
+	timerFamilyID string
+	ch            *DataChannel
+}
+
+// send requires the ch.mu lock to be held.
+func (w *timerWriter) send(msg *fnpb.Elements) error {
+	recordStreamSend(msg)
+	if err := w.ch.client.Send(msg); err != nil {
+		if err == io.EOF {
+			log.Warnf(context.TODO(), "dataWriter[%v;%v] EOF on send; fetching real error", w.id, w.ch.id)

Review Comment:
   Done!
   
   Basically, if a setting is functionally "permanent" and decided on construction, it's better to have a separate type instead of an if-statement. Fewer ifs == faster execution. See also my long overdue performance work on the DoFn execution hotpath, which is largely blocked until timers are done, so they can be hotpathed if appropriate.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org