Posted to github@beam.apache.org by "lostluck (via GitHub)" <gi...@apache.org> on 2023/01/27 22:26:13 UTC

[GitHub] [beam] lostluck commented on a diff in pull request #25160: [Go SDK]: Implement mongodbio.Read with an SDF

lostluck commented on code in PR #25160:
URL: https://github.com/apache/beam/pull/25160#discussion_r1089492429


##########
sdks/go/pkg/beam/io/mongodbio/read.go:
##########
@@ -442,53 +266,53 @@ func (fn *readFn) ProcessElement(
 	}()
 
 	for cursor.Next(ctx) {
-		value, err := decodeDocument(cursor, fn.Type.T)
+		id, value, err := decodeDocument(cursor, fn.Type.T)
 		if err != nil {
 			return err
 		}
 
-		emit(value)
-	}
-
-	return cursor.Err()
-}
+		result := cursorResult{nextID: id}
+		if !rt.TryClaim(result) {
+			return cursor.Err()
+		}
 
-func mergeFilters(idFilter bson.M, customFilter bson.M) bson.M {
-	if len(idFilter) == 0 {
-		return customFilter
+		emit(value)
 	}
 
-	if len(customFilter) == 0 {
-		return idFilter
-	}
+	result := cursorResult{isExhausted: true}
+	rt.TryClaim(result)

Review Comment:
   I think you're right in this case. 
   
   Formally, the promise around TryClaim is "if TryClaim returns successfully, I will process the claimed position". So if the underlying data/job mapping changes in a way that invalidates the restriction, it's reasonable to simply discard previously unknown input, since it wasn't there when the restriction was created.
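   
   To make that contract concrete, here's a rough sketch of the claim side against a hypothetical ID-range tracker (the type, fields, and bounds check are illustrative, not this PR's actual tracker, and only TryClaim is shown; the rest of the `sdf.RTracker` interface is omitted):
   
   ```go
   package example
   
   import "fmt"
   
   // Hypothetical tracker over a half-open ID range [min, max); this is
   // illustrative, not the PR's implementation.
   type idRangeTracker struct {
       min, max    int64
       lastClaimed int64
       claimedAll  bool
       err         error
   }
   
   type cursorResult struct {
       nextID      int64
       isExhausted bool
   }
   
   // TryClaim reports whether pos lies within this restriction. Returning
   // true is a promise that the caller will process the claimed position;
   // returning false tells the caller to end its claim loop.
   func (t *idRangeTracker) TryClaim(pos interface{}) bool {
       res, ok := pos.(cursorResult)
       if !ok {
           t.err = fmt.Errorf("expected cursorResult, got %T", pos)
           return false
       }
       if res.isExhausted {
           t.claimedAll = true // cursor drained: the restriction is done
           return false
       }
       // IDs outside [min, max) were not part of this restriction when it
       // was created, so declining to claim them quietly drops them.
       if res.nextID < t.min || res.nextID >= t.max {
           return false
       }
       t.lastClaimed = res.nextID
       return true
   }
   ```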
   
   As for process continuations: while they *technically* can be used for batch WRT the global window, you're right that they exist largely to support streaming, which is how Dataflow interprets transforms that can return process continuations. It's possible that's a bug in the Go SDK's interpretation, however, and that it should be the combination of a process continuation and an unbounded restriction that leads to a streaming interpretation; that would need to be vetted against existing runners, though. Unfortunately, the documentation around that part of the model for SDK developers is spotty at best.
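   
   For what it's worth, here's roughly what the streaming-flavored shape looks like in the Go SDK. `watchFn`, `record`, and `pollSource` are made-up stand-ins for a real unbounded source, but `sdf.ProcessContinuation`, `sdf.StopProcessing`, and `sdf.ResumeProcessingIn` are the actual hooks:
   
   ```go
   package example
   
   import (
       "time"
   
       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
   )
   
   type watchFn struct{}
   
   // record is a placeholder for whatever an unbounded source yields.
   type record struct {
       pos   int64
       value string
   }
   
   // pollSource is a made-up stand-in for fetching the next batch from an
   // unbounded source, e.g. a change stream.
   func pollSource(query string) []record { return nil }
   
   // ProcessElement returns an sdf.ProcessContinuation: StopProcessing
   // ends work on the restriction, while ResumeProcessingIn asks the
   // runner to re-invoke ProcessElement on the residual later, which is
   // what gives the transform its streaming interpretation.
   func (fn *watchFn) ProcessElement(rt *sdf.LockRTracker, query string, emit func(string)) sdf.ProcessContinuation {
       for _, rec := range pollSource(query) {
           if !rt.TryClaim(rec.pos) {
               return sdf.StopProcessing()
           }
           emit(rec.value)
       }
       // Nothing available right now; ask to be called again shortly.
       return sdf.ResumeProcessingIn(10 * time.Second)
   }
   ```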
   
   -------
   
   WRT actual splitting behavior, I know Dataflow is currently the only runner that does dynamic splitting. I believe both Flink and Spark do the initial splitting stage, which amounts to each element getting its initial restriction split immediately after the restriction is created.
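   
   For contrast, initial splitting is just the SDF's `SplitRestriction` method being invoked once per element before processing. A minimal sketch using the `offsetrange` tracker's `EvenSplits` helper (the element type, `sourceFn` name, and split count are arbitrary here):
   
   ```go
   package example
   
   import (
       "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
   )
   
   type sourceFn struct{}
   
   // SplitRestriction is the initial (static) split that Flink and Spark
   // perform: it runs once per element, before any ProcessElement call,
   // unlike dynamic splits, which the runner initiates mid-processing.
   func (fn *sourceFn) SplitRestriction(_ string, rest offsetrange.Restriction) []offsetrange.Restriction {
       // EvenSplits is provided by the offsetrange package; the count of
       // 16 is arbitrary for illustration.
       return rest.EvenSplits(16)
   }
   ```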
   
   This is typically fine. Dynamic splitting (a.k.a. Liquid Sharding) shows most of its benefit in avoiding stragglers, so that all the work finishes closer together, giving better worker utilization and faster job completion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org