You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by "damccorm (via GitHub)" <gi...@apache.org> on 2024/04/19 12:53:55 UTC

[PR] Add ability to load multiple copies of a model across processes [beam]

damccorm opened a new pull request, #31052:
URL: https://github.com/apache/beam/pull/31052

   Sometimes loading one model per process isn't feasible, but you still want multiple models loaded (e.g. if you have a GPU that can hold 3 copies of the model). This gives users the ability to express this.
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://github.com/apache/beam/blob/master/CONTRIBUTING.md#make-the-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Go tests](https://github.com/apache/beam/workflows/Go%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Go+tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI or the [workflows README](https://github.com/apache/beam/blob/master/.github/workflows/README.md) to see a list of phrases to trigger workflows.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Add ability to load multiple copies of a model across processes [beam]

Posted by "tvalentyn (via GitHub)" <gi...@apache.org>.
tvalentyn commented on code in PR #31052:
URL: https://github.com/apache/beam/pull/31052#discussion_r1576760164


##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -1434,19 +1519,27 @@ def load():
     if isinstance(side_input_model_path, str) and side_input_model_path != '':
       model_tag = side_input_model_path
     if self._model_handler.share_model_across_processes():
-      model = multi_process_shared.MultiProcessShared(
-          load, tag=model_tag, always_proxy=True).acquire()
+      models = []
+      for i in range(self._model_handler.model_copies()):
+        models.append(
+            multi_process_shared.MultiProcessShared(

Review Comment:
   do we need to explicitly release the multi-process shared handle for it to be freed-up? Guessing we currently don't do that as per TODO on line 1516?
   
   If we are using the model update functonality via side input , are we reusing the same mutliprocess shared handle during  the update , or will side input update iteration result in creating a new multiprocess shared handle without releasing the previous one (trying to answer this myself atm)? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Add ability to load multiple copies of a model across processes [beam]

Posted by "damccorm (via GitHub)" <gi...@apache.org>.
damccorm commented on code in PR #31052:
URL: https://github.com/apache/beam/pull/31052#discussion_r1575457163


##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -795,6 +802,21 @@ def share_model_across_processes(self) -> bool:
       return self._unkeyed.share_model_across_processes()
     return True
 
+  def max_shared_model_copies(self) -> int:
+    if self._single_model:
+      return self._unkeyed.max_shared_model_copies()
+    for mh in self._id_to_mh_map.values():
+      if mh.max_shared_model_copies() != 1:
+        raise ValueError(
+            'KeyedModelHandler cannot map records to multiple '
+            'models if one or more of its ModelHandlers '
+            'require multiple model copies (set via'

Review Comment:
   Done!



##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -1434,19 +1482,28 @@ def load():
     if isinstance(side_input_model_path, str) and side_input_model_path != '':
       model_tag = side_input_model_path
     if self._model_handler.share_model_across_processes():
-      model = multi_process_shared.MultiProcessShared(
-          load, tag=model_tag, always_proxy=True).acquire()
+      # TODO - update this to populate a list of models of configurable length

Review Comment:
   Oh that was from development, I just forgot to remove it. Removed



##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -1434,19 +1518,28 @@ def load():
     if isinstance(side_input_model_path, str) and side_input_model_path != '':
       model_tag = side_input_model_path
     if self._model_handler.share_model_across_processes():
-      model = multi_process_shared.MultiProcessShared(
-          load, tag=model_tag, always_proxy=True).acquire()
+      # TODO - update this to populate a list of models of configurable length
+      models = []
+      for i in range(self._model_handler.max_shared_model_copies()):
+        models.append(
+            multi_process_shared.MultiProcessShared(
+                load, tag=f'{model_tag}{i}', always_proxy=True).acquire())
+      model_wrapper = _CrossProcessModelWrapper(models, model_tag)
     else:
       model = self._shared_model_handle.acquire(load, tag=model_tag)
+      model_wrapper = _CrossProcessModelWrapper([model], model_tag)

Review Comment:
   I thought it was cleaner to just have a single object here so that consumers don't need to worry about it. `_CrossProcessModelWrapper` should be agnostic to which loading method is used.
   
   Maybe this is just a naming issue - does `_SharedModelWrapper` sound cleaner?



##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -1434,19 +1518,28 @@ def load():
     if isinstance(side_input_model_path, str) and side_input_model_path != '':
       model_tag = side_input_model_path
     if self._model_handler.share_model_across_processes():
-      model = multi_process_shared.MultiProcessShared(
-          load, tag=model_tag, always_proxy=True).acquire()
+      # TODO - update this to populate a list of models of configurable length
+      models = []
+      for i in range(self._model_handler.max_shared_model_copies()):
+        models.append(
+            multi_process_shared.MultiProcessShared(
+                load, tag=f'{model_tag}{i}', always_proxy=True).acquire())
+      model_wrapper = _CrossProcessModelWrapper(models, model_tag)
     else:
       model = self._shared_model_handle.acquire(load, tag=model_tag)
+      model_wrapper = _CrossProcessModelWrapper([model], model_tag)

Review Comment:
   I tried this, let me know what you think



##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -1378,6 +1421,45 @@ def update(
     self._inference_request_batch_byte_size.update(examples_byte_size)
 
 
+class _ModelRoutingStrategy():
+  """A class meant to sit in a shared location for mapping incoming batches to
+  different models. Currently only supports round-robin, but can be extended
+  to support other protocols if needed.
+  """
+  def __init__(self):
+    self._cur_index = 0
+
+  def next_model_index(self, num_models):
+    self._cur_index = (self._cur_index + 1) % num_models
+    return self._cur_index
+
+
+class _CrossProcessModelWrapper():
+  """A router class to map incoming calls to the correct model.
+  
+    This allows us to round robin calls to models sitting in different
+    processes so that we can more efficiently use resources (e.g. GPUs).
+  """
+  def __init__(self, models: List[Any], model_tag: str):
+    self.models = models
+    if len(models) > 0:

Review Comment:
   No, should be `> 1`. Otherwise we don't need this.



##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -1434,19 +1518,28 @@ def load():
     if isinstance(side_input_model_path, str) and side_input_model_path != '':
       model_tag = side_input_model_path
     if self._model_handler.share_model_across_processes():
-      model = multi_process_shared.MultiProcessShared(
-          load, tag=model_tag, always_proxy=True).acquire()
+      # TODO - update this to populate a list of models of configurable length
+      models = []
+      for i in range(self._model_handler.max_shared_model_copies()):
+        models.append(
+            multi_process_shared.MultiProcessShared(
+                load, tag=f'{model_tag}{i}', always_proxy=True).acquire())
+      model_wrapper = _CrossProcessModelWrapper(models, model_tag)
     else:
       model = self._shared_model_handle.acquire(load, tag=model_tag)
+      model_wrapper = _CrossProcessModelWrapper([model], model_tag)
     # since shared_model_handle is shared across threads, the model path
     # might not get updated in the model handler
     # because we directly get cached weak ref model from shared cache, instead
     # of calling load(). For sanity check, call update_model_path again.
     if isinstance(side_input_model_path, str):
       self._model_handler.update_model_path(side_input_model_path)
     else:
-      self._model_handler.update_model_paths(self._model, side_input_model_path)
-    return model
+      if self._model is not None:
+        models = self._model.all_models()
+        for m in models:
+          self._model_handler.update_model_paths(m, side_input_model_path)
+    return model_wrapper

Review Comment:
   We actually can now since we're always returning `_CrossProcessModelWrapper` which is a nice improvement. Updated



##########
sdks/python/apache_beam/ml/inference/base_test.py:
##########
@@ -79,11 +90,15 @@ def __init__(
     self._env_vars = kwargs.get('env_vars', {})
     self._multi_process_shared = multi_process_shared
     self._state = state
+    self._incrementing = incrementing
+    self._max_copies = max_copies
     self._num_bytes_per_element = num_bytes_per_element
 
   def load_model(self):
     if self._fake_clock:
       self._fake_clock.current_time_ns += 500_000_000  # 500ms
+    if self._incrementing:
+      return FakeIncrementingModel()

Review Comment:
   Done!



##########
sdks/python/apache_beam/ml/inference/pytorch_inference.py:
##########
@@ -234,6 +235,9 @@ def __init__(
         memory pressure if you load multiple copies. Given a model that
         consumes N memory and a machine with W cores and M memory, you should
         set this to True if N*W > M.
+      model_copies: The exact number of models that you would like loaded
+        onto your machine. This can be useful if you exactly know your CPU or

Review Comment:
   > Possible wording suggestion:
   
   Updated
   
   > Maybe this should be a ValueError if a user specifies both?
   
   I don't think it should be a ValueError - if you change it to True and you set this param, that is kinda reasonable and a no-op makes sense IMO since we're still honoring your choice.



##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -1378,6 +1421,45 @@ def update(
     self._inference_request_batch_byte_size.update(examples_byte_size)
 
 
+class _ModelRoutingStrategy():
+  """A class meant to sit in a shared location for mapping incoming batches to
+  different models. Currently only supports round-robin, but can be extended
+  to support other protocols if needed.
+  """
+  def __init__(self):
+    self._cur_index = 0
+
+  def next_model_index(self, num_models):
+    self._cur_index = (self._cur_index + 1) % num_models
+    return self._cur_index
+
+
+class _CrossProcessModelWrapper():
+  """A router class to map incoming calls to the correct model.
+  
+    This allows us to round robin calls to models sitting in different
+    processes so that we can more efficiently use resources (e.g. GPUs).
+  """
+  def __init__(self, models: List[Any], model_tag: str):
+    self.models = models
+    if len(models) > 0:

Review Comment:
   Fixed



##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -1416,8 +1498,10 @@ def load():
       if isinstance(side_input_model_path, str):
         self._model_handler.update_model_path(side_input_model_path)
       else:
-        self._model_handler.update_model_paths(
-            self._model, side_input_model_path)
+        if self._model is not None:

Review Comment:
   We initially assign it to None, and IIRC there's some update paths where this can get called before it has been assigned. Right now `update_model_paths` is expected to handle (and no-op) the `None` case, so this is at least a bit cleaner



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Add ability to load multiple copies of a model across processes [beam]

Posted by "tvalentyn (via GitHub)" <gi...@apache.org>.
tvalentyn commented on code in PR #31052:
URL: https://github.com/apache/beam/pull/31052#discussion_r1575502012


##########
sdks/python/apache_beam/ml/inference/pytorch_inference.py:
##########
@@ -234,6 +235,9 @@ def __init__(
         memory pressure if you load multiple copies. Given a model that
         consumes N memory and a machine with W cores and M memory, you should
         set this to True if N*W > M.
+      model_copies: The exact number of models that you would like loaded
+        onto your machine. This can be useful if you exactly know your CPU or

Review Comment:
   ah my concern was not an incorrect configuration but cognitive burden for users: would they be thinking if they should set only one param, or both in their use case, while in the end it doesn't matter. but now it also seems that  `large_model` becomes redundant as it is equivalent to passing `model_copies = 1`, right?
   
   Possibly except the fact that   using model_copies is currently disallowed with KeyedMH, and large_model might still allow that. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Add ability to load multiple copies of a model across processes [beam]

Posted by "damccorm (via GitHub)" <gi...@apache.org>.
damccorm commented on code in PR #31052:
URL: https://github.com/apache/beam/pull/31052#discussion_r1576874610


##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -952,6 +977,12 @@ def get_preprocess_fns(self) -> Iterable[Callable[[Any], Any]]:
   def should_skip_batching(self) -> bool:
     return True
 
+  def share_model_across_processes(self) -> bool:

Review Comment:
   This is a good idea. I think we should do it, but agree deferring is a good idea to keep the PR to a single purpose



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Add ability to load multiple copies of a model across processes [beam]

Posted by "tvalentyn (via GitHub)" <gi...@apache.org>.
tvalentyn commented on code in PR #31052:
URL: https://github.com/apache/beam/pull/31052#discussion_r1576760164


##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -1434,19 +1519,27 @@ def load():
     if isinstance(side_input_model_path, str) and side_input_model_path != '':
       model_tag = side_input_model_path
     if self._model_handler.share_model_across_processes():
-      model = multi_process_shared.MultiProcessShared(
-          load, tag=model_tag, always_proxy=True).acquire()
+      models = []
+      for i in range(self._model_handler.model_copies()):
+        models.append(
+            multi_process_shared.MultiProcessShared(

Review Comment:
   do we need to explicitly release the multi-process shared handle for it to be freed-up? Guessing we currently don't do that as per TODO on line 1516?
   
   If we are using the model update functonality via side input , are we reusing the same mutliprocess shared handle during  the update , or will side input update result in allocating more memory 



##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -1434,19 +1519,27 @@ def load():
     if isinstance(side_input_model_path, str) and side_input_model_path != '':
       model_tag = side_input_model_path
     if self._model_handler.share_model_across_processes():
-      model = multi_process_shared.MultiProcessShared(
-          load, tag=model_tag, always_proxy=True).acquire()
+      models = []
+      for i in range(self._model_handler.model_copies()):
+        models.append(
+            multi_process_shared.MultiProcessShared(

Review Comment:
   do we need to explicitly release the multi-process shared handle for it to be freed-up? Guessing we currently don't do that as per TODO on line 1516?
   
   If we are using the model update functonality via side input , are we reusing the same mutliprocess shared handle during  the update , or will side input update iteratation result in allocating more memory 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Add ability to load multiple copies of a model across processes [beam]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #31052:
URL: https://github.com/apache/beam/pull/31052#issuecomment-2070807943

   Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Add ability to load multiple copies of a model across processes [beam]

Posted by "tvalentyn (via GitHub)" <gi...@apache.org>.
tvalentyn commented on code in PR #31052:
URL: https://github.com/apache/beam/pull/31052#discussion_r1575314644


##########
sdks/python/apache_beam/ml/inference/huggingface_inference.py:
##########
@@ -257,6 +258,9 @@ def __init__(
         memory pressure if you load multiple copies. Given a model that
         consumes N memory and a machine with W cores and M memory, you should
         set this to True if N*W > M.
+      model_copies: The exact number of models that you would like loaded

Review Comment:
   TODO(self) wording re: becomes a no-op



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Add ability to load multiple copies of a model across processes [beam]

Posted by "tvalentyn (via GitHub)" <gi...@apache.org>.
tvalentyn commented on code in PR #31052:
URL: https://github.com/apache/beam/pull/31052#discussion_r1576791498


##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -1434,19 +1519,27 @@ def load():
     if isinstance(side_input_model_path, str) and side_input_model_path != '':
       model_tag = side_input_model_path
     if self._model_handler.share_model_across_processes():
-      model = multi_process_shared.MultiProcessShared(
-          load, tag=model_tag, always_proxy=True).acquire()
+      models = []
+      for i in range(self._model_handler.model_copies()):
+        models.append(
+            multi_process_shared.MultiProcessShared(

Review Comment:
   > will side input update iteration result in creating a new multiprocess shared handle without releasing the previous one 
   
   As far as I can tell, we will  call  _RunInferenceDoFn.update_model()  from https://github.com/apache/beam/blob/58bea3f19e5220c08a900a3005e80473561dabaf/sdks/python/apache_beam/ml/inference/base.py#L1525 
   
   and then we will create a new ModelWrapper with a new SharedHandle(s) in: https://github.com/apache/beam/blob/58bea3f19e5220c08a900a3005e80473561dabaf/sdks/python/apache_beam/ml/inference/base.py#L1474
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Add ability to load multiple copies of a model across processes [beam]

Posted by "tvalentyn (via GitHub)" <gi...@apache.org>.
tvalentyn commented on code in PR #31052:
URL: https://github.com/apache/beam/pull/31052#discussion_r1576728785


##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -1416,8 +1498,10 @@ def load():
       if isinstance(side_input_model_path, str):
         self._model_handler.update_model_path(side_input_model_path)
       else:
-        self._model_handler.update_model_paths(
-            self._model, side_input_model_path)
+        if self._model is not None:

Review Comment:
   How about we:
   
   1)  create a _ModelWrapper base class, that is used for a single model single process scenario
   2) _CrossProcessModelWrapper that also supports multiple models that are shared across the process.
   3) Implement  `_ModelWrapper._update_model_path(mh, new_model_path_config)` in both classes inside using the appropriate logic.
   
   that should push some the complexity associated with # of models and  the difference between  update_model_path vs  update_model_paths away from load(). 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Add ability to load multiple copies of a model across processes [beam]

Posted by "tvalentyn (via GitHub)" <gi...@apache.org>.
tvalentyn commented on code in PR #31052:
URL: https://github.com/apache/beam/pull/31052#discussion_r1575524505


##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -952,6 +977,12 @@ def get_preprocess_fns(self) -> Iterable[Callable[[Any], Any]]:
   def should_skip_batching(self) -> bool:
     return True
 
+  def share_model_across_processes(self) -> bool:

Review Comment:
   (cleanup, can be deferred)
   
   we can leverage reflection here and delegate calls to base via `__getattr__`  like in 
   
   https://github.com/apache/beam/blob/37609ba70fab2216edc338121bf2f3a056a1e490/sdks/python/apache_beam/internal/gcp/auth.py
   
   
   Per https://stackoverflow.com/questions/2405590/how-do-i-override-getattr-without-breaking-the-default-behavior, explicitly defined methods should take priority.
   



##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -1434,19 +1518,28 @@ def load():
     if isinstance(side_input_model_path, str) and side_input_model_path != '':
       model_tag = side_input_model_path
     if self._model_handler.share_model_across_processes():
-      model = multi_process_shared.MultiProcessShared(
-          load, tag=model_tag, always_proxy=True).acquire()
+      # TODO - update this to populate a list of models of configurable length
+      models = []
+      for i in range(self._model_handler.max_shared_model_copies()):
+        models.append(
+            multi_process_shared.MultiProcessShared(
+                load, tag=f'{model_tag}{i}', always_proxy=True).acquire())
+      model_wrapper = _CrossProcessModelWrapper(models, model_tag)
     else:
       model = self._shared_model_handle.acquire(load, tag=model_tag)
+      model_wrapper = _CrossProcessModelWrapper([model], model_tag)

Review Comment:
   oh, i actually got confused by the `>=0` condition and thought that you intended  to use the multiprocess-shared regardless of # of models. _SharedModelWrapper is better.



##########
sdks/python/apache_beam/ml/inference/pytorch_inference.py:
##########
@@ -234,6 +235,9 @@ def __init__(
         memory pressure if you load multiple copies. Given a model that
         consumes N memory and a machine with W cores and M memory, you should
         set this to True if N*W > M.
+      model_copies: The exact number of models that you would like loaded
+        onto your machine. This can be useful if you exactly know your CPU or

Review Comment:
   oh my concern was not an incorrect configuration but cognitive burden for users: would they be thinking if they should set only one param, or both in their use case, while in the end it doesn't matter. but now it also seems that  `large_model` becomes redundant as it is equivalent to passing `model_copies = 1`, right?
   
   Possibly except the fact that   using model_copies is currently disallowed with KeyedMH, and large_model might still allow that. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Add ability to load multiple copies of a model across processes [beam]

Posted by "damccorm (via GitHub)" <gi...@apache.org>.
damccorm commented on code in PR #31052:
URL: https://github.com/apache/beam/pull/31052#discussion_r1576181499


##########
sdks/python/apache_beam/ml/inference/pytorch_inference.py:
##########
@@ -234,6 +235,9 @@ def __init__(
         memory pressure if you load multiple copies. Given a model that
         consumes N memory and a machine with W cores and M memory, you should
         set this to True if N*W > M.
+      model_copies: The exact number of models that you would like loaded
+        onto your machine. This can be useful if you exactly know your CPU or

Review Comment:
   > but now it also seems that large_model becomes redundant as it is equivalent to passing model_copies = 1, right?
   
   That's right, though I think long term I would like for us to do smart things here (e.g. `large_model` becomes "pack as many models as you can fit). There's some conversation on this general idea in the design doc



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Add ability to load multiple copies of a model across processes [beam]

Posted by "tvalentyn (via GitHub)" <gi...@apache.org>.
tvalentyn commented on code in PR #31052:
URL: https://github.com/apache/beam/pull/31052#discussion_r1576728785


##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -1416,8 +1498,10 @@ def load():
       if isinstance(side_input_model_path, str):
         self._model_handler.update_model_path(side_input_model_path)
       else:
-        self._model_handler.update_model_paths(
-            self._model, side_input_model_path)
+        if self._model is not None:

Review Comment:
   How about we:
   
   1)  create a _ModelWrapper or base class, that is used for a single model single process scenario
   2) _CrossProcessModelWrapper that also supports multiple models that are shared across the process.
   3) Implement  `_ModelWrapper._update_model_path(mh, new_model_path_config)` in both classes inside using the appropriate logic.
   
   that should push some the complexity associated with # of models and  the difference between  update_model_path vs  update_model_paths away from load(). 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Add ability to load multiple copies of a model across processes [beam]

Posted by "tvalentyn (via GitHub)" <gi...@apache.org>.
tvalentyn commented on code in PR #31052:
URL: https://github.com/apache/beam/pull/31052#discussion_r1576760164


##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -1434,19 +1519,27 @@ def load():
     if isinstance(side_input_model_path, str) and side_input_model_path != '':
       model_tag = side_input_model_path
     if self._model_handler.share_model_across_processes():
-      model = multi_process_shared.MultiProcessShared(
-          load, tag=model_tag, always_proxy=True).acquire()
+      models = []
+      for i in range(self._model_handler.model_copies()):
+        models.append(
+            multi_process_shared.MultiProcessShared(

Review Comment:
   do we need to explicitly release the multi-process shared handle for it to be freed-up? Guessing we currently don't do that as per TODO on line 1516?
   
   If we are using the update via side input mechanism, are we reusing the same shared handle, or will side input update result in allocating more memory 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Add ability to load multiple copies of a model across processes [beam]

Posted by "tvalentyn (via GitHub)" <gi...@apache.org>.
tvalentyn commented on code in PR #31052:
URL: https://github.com/apache/beam/pull/31052#discussion_r1576757948


##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -1416,8 +1498,10 @@ def load():
       if isinstance(side_input_model_path, str):
         self._model_handler.update_model_path(side_input_model_path)
       else:
-        self._model_handler.update_model_paths(
-            self._model, side_input_model_path)
+        if self._model is not None:

Review Comment:
   just noticed that updating code is wrapped inside another `load()` helper function so maybe that suggestion won't work since at that time we don't have the wrapper yet



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Add ability to load multiple copies of a model across processes [beam]

Posted by "damccorm (via GitHub)" <gi...@apache.org>.
damccorm commented on PR #31052:
URL: https://github.com/apache/beam/pull/31052#issuecomment-2070804228

   R: @tvalentyn 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Add ability to load multiple copies of a model across processes [beam]

Posted by "tvalentyn (via GitHub)" <gi...@apache.org>.
tvalentyn commented on code in PR #31052:
URL: https://github.com/apache/beam/pull/31052#discussion_r1576791498


##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -1434,19 +1519,27 @@ def load():
     if isinstance(side_input_model_path, str) and side_input_model_path != '':
       model_tag = side_input_model_path
     if self._model_handler.share_model_across_processes():
-      model = multi_process_shared.MultiProcessShared(
-          load, tag=model_tag, always_proxy=True).acquire()
+      models = []
+      for i in range(self._model_handler.model_copies()):
+        models.append(
+            multi_process_shared.MultiProcessShared(

Review Comment:
   > will side input update iteration result in creating a new multiprocess shared handle without releasing the previous one 
   
   As far as I can tell, we will recreate new handles, since we will  call  _RunInferenceDoFn.update_model()  from https://github.com/apache/beam/blob/58bea3f19e5220c08a900a3005e80473561dabaf/sdks/python/apache_beam/ml/inference/base.py#L1525 
   
   and then we will create a new ModelWrapper with a new SharedHandle(s) in: https://github.com/apache/beam/blob/58bea3f19e5220c08a900a3005e80473561dabaf/sdks/python/apache_beam/ml/inference/base.py#L1474
   
   Sounds like this would be a concern without either reusing handles or releasing them?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Add ability to load multiple copies of a model across processes [beam]

Posted by "damccorm (via GitHub)" <gi...@apache.org>.
damccorm merged PR #31052:
URL: https://github.com/apache/beam/pull/31052


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Add ability to load multiple copies of a model across processes [beam]

Posted by "tvalentyn (via GitHub)" <gi...@apache.org>.
tvalentyn commented on code in PR #31052:
URL: https://github.com/apache/beam/pull/31052#discussion_r1575378402


##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -795,6 +802,21 @@ def share_model_across_processes(self) -> bool:
       return self._unkeyed.share_model_across_processes()
     return True
 
+  def max_shared_model_copies(self) -> int:
+    if self._single_model:
+      return self._unkeyed.max_shared_model_copies()
+    for mh in self._id_to_mh_map.values():
+      if mh.max_shared_model_copies() != 1:
+        raise ValueError(
+            'KeyedModelHandler cannot map records to multiple '
+            'models if one or more of its ModelHandlers '
+            'require multiple model copies (set via'

Review Comment:
   ```suggestion
               'require multiple model copies (set via '
   ```



##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -1434,19 +1482,28 @@ def load():
     if isinstance(side_input_model_path, str) and side_input_model_path != '':
       model_tag = side_input_model_path
     if self._model_handler.share_model_across_processes():
-      model = multi_process_shared.MultiProcessShared(
-          load, tag=model_tag, always_proxy=True).acquire()
+      # TODO - update this to populate a list of models of configurable length

Review Comment:
   Not sure I follow the TODO. Should we link a GH issue and include necessary details there?



##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -1434,19 +1518,28 @@ def load():
     if isinstance(side_input_model_path, str) and side_input_model_path != '':
       model_tag = side_input_model_path
     if self._model_handler.share_model_across_processes():
-      model = multi_process_shared.MultiProcessShared(
-          load, tag=model_tag, always_proxy=True).acquire()
+      # TODO - update this to populate a list of models of configurable length
+      models = []
+      for i in range(self._model_handler.max_shared_model_copies()):
+        models.append(
+            multi_process_shared.MultiProcessShared(
+                load, tag=f'{model_tag}{i}', always_proxy=True).acquire())
+      model_wrapper = _CrossProcessModelWrapper(models, model_tag)
     else:
       model = self._shared_model_handle.acquire(load, tag=model_tag)
+      model_wrapper = _CrossProcessModelWrapper([model], model_tag)
     # since shared_model_handle is shared across threads, the model path
     # might not get updated in the model handler
     # because we directly get cached weak ref model from shared cache, instead
     # of calling load(). For sanity check, call update_model_path again.
     if isinstance(side_input_model_path, str):
       self._model_handler.update_model_path(side_input_model_path)
     else:
-      self._model_handler.update_model_paths(self._model, side_input_model_path)
-    return model
+      if self._model is not None:
+        models = self._model.all_models()
+        for m in models:
+          self._model_handler.update_model_paths(m, side_input_model_path)
+    return model_wrapper

Review Comment:
   Can we have a typehint for what  _load_model returns or at least describe the return type in a docstring?



##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -1378,6 +1421,45 @@ def update(
     self._inference_request_batch_byte_size.update(examples_byte_size)
 
 
+class _ModelRoutingStrategy():
+  """A class meant to sit in a shared location for mapping incoming batches to
+  different models. Currently only supports round-robin, but can be extended
+  to support other protocols if needed.
+  """
+  def __init__(self):
+    self._cur_index = 0
+
+  def next_model_index(self, num_models):
+    self._cur_index = (self._cur_index + 1) % num_models
+    return self._cur_index
+
+
+class _CrossProcessModelWrapper():
+  """A router class to map incoming calls to the correct model.
+  
+    This allows us to round robin calls to models sitting in different
+    processes so that we can more efficiently use resources (e.g. GPUs).
+  """
+  def __init__(self, models: List[Any], model_tag: str):
+    self.models = models
+    if len(models) > 0:

Review Comment:
   should this be an `assert` ?
   



##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -1416,8 +1498,10 @@ def load():
       if isinstance(side_input_model_path, str):
         self._model_handler.update_model_path(side_input_model_path)
       else:
-        self._model_handler.update_model_paths(
-            self._model, side_input_model_path)
+        if self._model is not None:

Review Comment:
   what is the typehint for self._model ? Can we specify it in the constructor where we assign it? in which case can it be None?



##########
sdks/python/apache_beam/ml/inference/base_test.py:
##########
@@ -79,11 +90,15 @@ def __init__(
     self._env_vars = kwargs.get('env_vars', {})
     self._multi_process_shared = multi_process_shared
     self._state = state
+    self._incrementing = incrementing
+    self._max_copies = max_copies
     self._num_bytes_per_element = num_bytes_per_element
 
   def load_model(self):
     if self._fake_clock:
       self._fake_clock.current_time_ns += 500_000_000  # 500ms
+    if self._incrementing:
+      return FakeIncrementingModel()

Review Comment:
   let's add an assertion that  incrementing and stateful settings are not used at the same time



##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -315,6 +315,13 @@ def share_model_across_processes(self) -> bool:
     https://beam.apache.org/releases/pydoc/current/apache_beam.utils.multi_process_shared.html"""
     return False
 
+  def max_shared_model_copies(self) -> int:

Review Comment:
   possible alternative names: model_copies, model_copies_to_load (asked a related comment  in the doc)



##########
sdks/python/apache_beam/ml/inference/huggingface_inference.py:
##########
@@ -257,6 +258,9 @@ def __init__(
         memory pressure if you load multiple copies. Given a model that
         consumes N memory and a machine with W cores and M memory, you should
         set this to True if N*W > M.
+      model_copies: The exact number of models that you would like loaded

Review Comment:
   TODO(self) wording re: becomes a no-op



##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -795,6 +802,21 @@ def share_model_across_processes(self) -> bool:
       return self._unkeyed.share_model_across_processes()
     return True
 
+  def max_shared_model_copies(self) -> int:
+    if self._single_model:
+      return self._unkeyed.max_shared_model_copies()
+    for mh in self._id_to_mh_map.values():
+      if mh.max_shared_model_copies() != 1:
+        raise ValueError(
+            'KeyedModelHandler cannot map records to multiple '
+            'models if one or more of its ModelHandlers '
+            'require multiple model copies (set via'
+            'max_shared_model_copies). To fix, verify that each '

Review Comment:
   ```suggestion
               'model_copies). To fix, verify that each '
   ```



##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -1434,19 +1518,28 @@ def load():
     if isinstance(side_input_model_path, str) and side_input_model_path != '':
       model_tag = side_input_model_path
     if self._model_handler.share_model_across_processes():
-      model = multi_process_shared.MultiProcessShared(
-          load, tag=model_tag, always_proxy=True).acquire()
+      # TODO - update this to populate a list of models of configurable length
+      models = []
+      for i in range(self._model_handler.max_shared_model_copies()):
+        models.append(
+            multi_process_shared.MultiProcessShared(
+                load, tag=f'{model_tag}{i}', always_proxy=True).acquire())
+      model_wrapper = _CrossProcessModelWrapper(models, model_tag)
     else:
       model = self._shared_model_handle.acquire(load, tag=model_tag)
+      model_wrapper = _CrossProcessModelWrapper([model], model_tag)

Review Comment:
   Why do we need  _CrossProcessModelWrapper when share_model_across_processes is not used? should we use some single-process model wrapper stub instead?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Add ability to load multiple copies of a model across processes [beam]

Posted by "tvalentyn (via GitHub)" <gi...@apache.org>.
tvalentyn commented on code in PR #31052:
URL: https://github.com/apache/beam/pull/31052#discussion_r1575452537


##########
sdks/python/apache_beam/ml/inference/pytorch_inference.py:
##########
@@ -234,6 +235,9 @@ def __init__(
         memory pressure if you load multiple copies. Given a model that
         consumes N memory and a machine with W cores and M memory, you should
         set this to True if N*W > M.
+      model_copies: The exact number of models that you would like loaded
+        onto your machine. This can be useful if you exactly know your CPU or

Review Comment:
   Possible wording suggestion:
   
   This can be useful if you exactly know your CPU or GPU capacity and want to maximize resource utilization. 
   
   > If set, large_model becomes a no-op.
   
   Maybe this should be a ValueError if a user specifies both?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Add ability to load multiple copies of a model across processes [beam]

Posted by "tvalentyn (via GitHub)" <gi...@apache.org>.
tvalentyn commented on code in PR #31052:
URL: https://github.com/apache/beam/pull/31052#discussion_r1576791498


##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -1434,19 +1519,27 @@ def load():
     if isinstance(side_input_model_path, str) and side_input_model_path != '':
       model_tag = side_input_model_path
     if self._model_handler.share_model_across_processes():
-      model = multi_process_shared.MultiProcessShared(
-          load, tag=model_tag, always_proxy=True).acquire()
+      models = []
+      for i in range(self._model_handler.model_copies()):
+        models.append(
+            multi_process_shared.MultiProcessShared(

Review Comment:
   > will side input update iteration result in creating a new multiprocess shared handle without releasing the previous one 
   
   As far as I can tell, we will  recreate a new model_wrapper after calling _RunInferenceDoFn.update_model()  from https://github.com/apache/beam/blob/58bea3f19e5220c08a900a3005e80473561dabaf/sdks/python/apache_beam/ml/inference/base.py#L1525 
   
   and then we will create a new ModelWrapper with a new SharedHandle(s) in: https://github.com/apache/beam/blob/58bea3f19e5220c08a900a3005e80473561dabaf/sdks/python/apache_beam/ml/inference/base.py#L1474
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Add ability to load multiple copies of a model across processes [beam]

Posted by "damccorm (via GitHub)" <gi...@apache.org>.
damccorm commented on code in PR #31052:
URL: https://github.com/apache/beam/pull/31052#discussion_r1576843804


##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -1434,19 +1519,27 @@ def load():
     if isinstance(side_input_model_path, str) and side_input_model_path != '':
       model_tag = side_input_model_path
     if self._model_handler.share_model_across_processes():
-      model = multi_process_shared.MultiProcessShared(
-          load, tag=model_tag, always_proxy=True).acquire()
+      models = []
+      for i in range(self._model_handler.model_copies()):
+        models.append(
+            multi_process_shared.MultiProcessShared(

Review Comment:
   > do we need to explicitly release the multi-process shared handle for it to be freed-up? Guessing we currently don't do that as per TODO on line 1516?
   
   We don't do that, I think it is fine for now though because we'd only want to unload models in batch mode when we transition to a new stage (when cleanup should happen anyways)
   
   > If we are using the model update functonality via side input , are we reusing the same mutliprocess shared handle during the update , or will side input update iteration result in creating a new multiprocess shared handle without releasing the previous one (trying to answer this myself atm)?
   
   We'll create a new one, but with the same tag. This will result in the current one being garbage collected



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org