Posted to github@beam.apache.org by "AnandInguva (via GitHub)" <gi...@apache.org> on 2023/07/20 15:19:56 UTC

[GitHub] [beam] AnandInguva commented on a diff in pull request #27544: Group transformed and non transformed elements using hash in MLTransform TFTProcessHandler

AnandInguva commented on code in PR #27544:
URL: https://github.com/apache/beam/pull/27544#discussion_r1269592648


##########
sdks/python/apache_beam/ml/transforms/handlers.py:
##########
@@ -84,14 +85,15 @@ def process(
       self,
       element,
   ):
+    hash_key, element = element

Review Comment:
   This is for internal use.  I will add a comment about backwards compatibility.



##########
sdks/python/apache_beam/ml/transforms/handlers.py:
##########
@@ -369,22 +428,35 @@ def process_data(
     # whether a scalar value or list or np array is passed as input,
     #  we will convert scalar values to list values and TFT will output
     # numpy array all the time.
-    raw_data |= beam.ParDo(ConvertScalarValuesToListValues())
+
+    keyed_raw_data = (raw_data | beam.ParDo(ComputeAndAttachHashKey()))
+
+    feature_set = [feature.name for feature in raw_data_metadata.schema.feature]
+    columns_not_in_schema_with_hash = (
+        keyed_raw_data
+        | beam.ParDo(GetMissingColumnsPColl(feature_set)))
+
+    keyed_raw_data = keyed_raw_data | beam.ParDo(
+        ConvertScalarValuesToListValues())
+
+    raw_data_list = (keyed_raw_data | beam.ParDo(MakeHashKeyAsColumn()))
 
     with tft_beam.Context(temp_dir=self.artifact_location):
-      data = (raw_data, raw_data_metadata)
+      data = (raw_data_list, raw_data_metadata)
       if self.artifact_mode == ArtifactMode.PRODUCE:
         transform_fn = (
             data
             | "AnalyzeDataset" >> tft_beam.AnalyzeDataset(self.process_data_fn))

Review Comment:
   I am not sure. I will test this.
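
   For readers following the hunk above, here is a minimal, Beam-free sketch of the idea it appears to implement: tag each row with a key, set aside the columns that are not in the schema, transform the rest, and rejoin on the key. The helper names (`attach_hash_key`, `split_by_schema`, `rejoin`) and the key derivation are hypothetical illustrations, not the PR's `ComputeAndAttachHashKey` or `GetMissingColumnsPColl` implementations, and the "transform" step is a stand-in for what TFT would do.

```python
import hashlib


def attach_hash_key(rows):
    """Pair each row dict with a key. Assumption: the key is derived from the
    row's position and contents; the PR may compute its hash differently."""
    keyed = []
    for index, row in enumerate(rows):
        payload = repr((index, sorted(row.items()))).encode("utf-8")
        keyed.append((hashlib.sha256(payload).hexdigest(), row))
    return keyed


def split_by_schema(keyed_rows, feature_set):
    """Separate columns named in the schema from those that are not,
    keeping both halves addressable by the same key."""
    in_schema, missing = {}, {}
    for key, row in keyed_rows:
        in_schema[key] = {k: v for k, v in row.items() if k in feature_set}
        missing[key] = {k: v for k, v in row.items() if k not in feature_set}
    return in_schema, missing


def rejoin(transformed, missing):
    """Merge transformed columns back with the untouched ones by key."""
    return [{**transformed[key], **missing[key]} for key in transformed]


rows = [{"x": 1, "label": "a"}, {"x": 2, "label": "b"}]
keyed = attach_hash_key(rows)
in_schema, missing = split_by_schema(keyed, feature_set={"x"})

# Stand-in transform: wrap the scalar in a list and scale it.
transformed = {key: {"x": [row["x"] * 10]} for key, row in in_schema.items()}
result = rejoin(transformed, missing)
```

   In the actual pipeline the rejoin would presumably be a keyed grouping across PCollections rather than a dictionary lookup; the sketch only shows why a stable per-element key is needed.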


