Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/11/19 01:40:01 UTC

[GitHub] [beam] seungjin-an opened a new pull request, #24276: Samza runner support for same stateId with multiple ParDos

seungjin-an opened a new pull request, #24276:
URL: https://github.com/apache/beam/pull/24276

   Samza runner support for same stateId with multiple ParDos
   (1) Leave the existing RocksDB store ID unchanged for any stateId that is used by only one ParDo
   (2) Introduce a new mapping from user-provided stateIds to store IDs for stateIds that are used in multiple ParDos
   
   // A stateId used in a single ParDo continues to use the stateId itself as its storeId
   .apply("First stateful ParDo", ParDo.of(fn));
   → "foo"
   
   // A stateId used in multiple ParDos is mapped to a new storeId that appends the PTransform name
   .apply("Second stateful ParDo", ParDo.of(fn2));
   → "foo-Second_stateful_ParDo"
   
   - ConfigContext/TranslationContext keeps track of every stateId and the PTransform name it was declared in as the PTransform nodes are traversed topologically.
   - Which stateIds are used in multiple ParDos is not known until all PTransform nodes have been traversed. Thus, after traversal is complete, the store-related configs are rewritten for any stateId that was first added with the original mapping (e.g. rewrite from "foo" --> "foo-First_stateful_ParDo").
   
   (3) Re-enable ParDoTest in :runners:samza build.gradle
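
The track-then-rewrite flow described in (2) can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: `StoreIdTracker`, `register`, and `rewriteShared` are hypothetical names, the config is a plain map rather than Samza's `ConfigBuilder`, and escaping is assumed to replace whitespace with `_` so that the result matches the example names above.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for the bookkeeping kept during traversal. */
final class StoreIdTracker {
  private static final String FACTORY =
      "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory";

  // stateId -> escaped name of the first ParDo that declared it
  private final Map<String, String> firstOwner = new LinkedHashMap<>();
  // stateIds seen in more than one ParDo
  private final Set<String> shared = new HashSet<>();
  // store configs, keyed as "stores.<storeId>.factory"
  private final Map<String, String> config = new LinkedHashMap<>();

  /** Records a stateId for a ParDo and returns the storeId chosen at this point. */
  String register(String stateId, String parDoName) {
    // Assumption: whitespace becomes '_' (matches "foo-Second_stateful_ParDo").
    String escaped = parDoName.trim().replaceAll("\\s+", "_");
    String previous = firstOwner.putIfAbsent(stateId, escaped);
    String storeId = stateId;
    if (previous != null) {
      shared.add(stateId);
      storeId = stateId + "-" + escaped;
    }
    config.put("stores." + storeId + ".factory", FACTORY);
    return storeId;
  }

  /** After traversal: rename the config of the first user of each shared stateId. */
  void rewriteShared() {
    for (String stateId : shared) {
      String factory = config.remove("stores." + stateId + ".factory");
      if (factory != null) {
        config.put("stores." + stateId + "-" + firstOwner.get(stateId) + ".factory", factory);
      }
    }
  }

  Map<String, String> config() {
    return config;
  }
}
```

The first ParDo to declare "foo" initially keeps the plain store ID; only once a second ParDo registers the same stateId does the first entry need renaming, which is why the rewrite has to wait until traversal has finished.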
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] xinyuiscool commented on pull request #24276: Samza runner support for same stateId with multiple ParDos

Posted by GitBox <gi...@apache.org>.
xinyuiscool commented on PR #24276:
URL: https://github.com/apache/beam/pull/24276#issuecomment-1335793383

   Thanks for fixing the problem for #1 in my previous comments. However, it seems some of the necessary changes for #2 were reverted as well. I expect a mapping {stateId -> storeId} (currently it's a set of stateIds) to be passed to `SamzaStoreStateInternals.createStateInternalsFactory()`. Then `createStateInternalsFactory()` can use that information to create the `stores` map. Somehow this logic is missing, although it was present in your previous code. Please take a look and run `./gradlew :runners:samza:build` to verify the tests pass.
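
A rough sketch of the shape requested here, under stated assumptions: the factory receives {stateId -> storeId}, looks each store up by its storeId, and still keys the resulting `stores` map by stateId. `StoresByStateId` and `buildStores` are hypothetical names, and the `Function` argument stands in for the store lookup on the task context (`context.getStore(...)` in the diff).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class StoresByStateId {
  /**
   * Builds a {stateId -> store} map from a {stateId -> storeId} mapping.
   * The storeId is used only for the lookup; callers keep addressing state by stateId.
   */
  static <S> Map<String, S> buildStores(
      Map<String, String> stateIdToStoreId, Function<String, S> getStore) {
    Map<String, S> stores = new HashMap<>();
    stateIdToStoreId.forEach(
        (stateId, storeId) -> stores.put(stateId, getStore.apply(storeId)));
    return stores;
  }
}
```

With this shape, code that resolves a store from a state address can keep using the stateId as the key, so no second lookup through the mapping is needed at access time.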




[GitHub] [beam] seungjin-an commented on a diff in pull request #24276: Samza runner support for same stateId with multiple ParDos

Posted by GitBox <gi...@apache.org>.
seungjin-an commented on code in PR #24276:
URL: https://github.com/apache/beam/pull/24276#discussion_r1037946626


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java:
##########
@@ -148,7 +149,8 @@ public DoFnOp(
       JobInfo jobInfo,
       Map<String, TupleTag<?>> idToTupleTagMap,
       DoFnSchemaInformation doFnSchemaInformation,
-      Map<?, PCollectionView<?>> sideInputMapping) {
+      Map<?, PCollectionView<?>> sideInputMapping,
+      Map<String, String> userStateIds) {

Review Comment:
   As this is no longer needed, I reverted this diff but definitely agree. Thank you





[GitHub] [beam] seungjin-an commented on a diff in pull request #24276: Samza runner support for non unique stateId across multiple ParDos

Posted by GitBox <gi...@apache.org>.
seungjin-an commented on code in PR #24276:
URL: https://github.com/apache/beam/pull/24276#discussion_r1041567079


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java:
##########
@@ -377,17 +392,15 @@ public Map<String, String> createConfig(
     if (signature.usesState()) {
       // set up user state configs
       for (DoFnSignature.StateDeclaration state : signature.stateDeclarations().values()) {
-        final String storeId = state.id();
-
-        // TODO: remove validation after we support same state id in different ParDo.
-        if (!ctx.addStateId(storeId)) {
-          throw new IllegalStateException(
-              "Duplicate StateId " + storeId + " found in multiple ParDo.");
+        final String stateId = state.id();
+        String storeId = stateId;

Review Comment:
   Awesome, changes are made.





[GitHub] [beam] dxichen commented on a diff in pull request #24276: Samza runner support for same stateId with multiple ParDos

Posted by GitBox <gi...@apache.org>.
dxichen commented on code in PR #24276:
URL: https://github.com/apache/beam/pull/24276#discussion_r1034101967


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java:
##########
@@ -105,6 +106,38 @@ public static void createConfig(
     pipeline.traverseTopologically(visitor);
   }
 
+  /** Rewrite user store configs if there exists same state ids with multiple ParDos. */
+  public static void rewriteConfigWithMultiParDoStateId(
+      SamzaPipelineOptions options,
+      Map<String, String> multiParDoStateIdMap,
+      ConfigBuilder configBuilder) {
+    multiParDoStateIdMap.forEach(
+        (stateId, value) -> {
+          // rewrite single parDo state configs into multiple parDo state
+          String multiParDoStateId = String.join("-", stateId, value);
+          // replace old single parDo store configs with new storeId mapping appended with parDo
+          // name
+          configBuilder.remove("stores." + stateId + ".factory");
+          configBuilder.remove("stores." + stateId + ".key.serde");
+          configBuilder.remove("stores." + stateId + ".msg.serde");
+          configBuilder.remove("stores." + stateId + ".rocksdb.compression");
+          // put new config with multi pardo config
+          configBuilder.put(
+              "stores." + multiParDoStateId + ".factory",
+              "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory");

Review Comment:
   could we avoid using the classname since we are already including the package?



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java:
##########
@@ -140,9 +141,21 @@ public SamzaPipelineResult run(Pipeline pipeline) {
     LOG.info("Beam pipeline JSON graph:\n{}", jsonGraph);
 
     final Map<PValue, String> idMap = PViewToIdMapper.buildIdMap(pipeline);
+    /* Map of stateId to sanitized (remove whitespace and replace '-' with '_') PTransform name, used in multiple ParDos

Review Comment:
   nit: use /**



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -149,37 +139,39 @@ static <K> Factory<K> createStateInternalsFactory(
       TaskContext context,
       SamzaPipelineOptions pipelineOptions,
       ExecutableStage executableStage) {
-
-    Set<String> stateIds =
+    // TODO: handle same stateIds in multiple ParDos for portable mode
+    Map<String, String> stateIds =
         executableStage.getUserStates().stream()
             .map(UserStateReference::localName)
-            .collect(Collectors.toSet());
+            .collect(Collectors.toMap(Function.identity(), Function.identity()));

Review Comment:
   Nit: could directly collect and remove the map step
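
For illustration, collecting straight into the map without the intermediate `map` step might look like the sketch below. `StateRef` is a hypothetical stand-in for Beam's `UserStateReference`, modeling only `localName()`.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class CollectDirectly {
  /** Hypothetical stand-in for UserStateReference; only localName() is modeled. */
  static final class StateRef {
    private final String localName;

    StateRef(String localName) {
      this.localName = localName;
    }

    String localName() {
      return localName;
    }
  }

  /** Maps each state's local name to itself, using the method reference for key and value. */
  static Map<String, String> identityMapping(List<StateRef> refs) {
    return refs.stream()
        .collect(Collectors.toMap(StateRef::localName, StateRef::localName));
  }
}
```

One behavioral difference from the earlier `Collectors.toSet()` version is worth keeping in mind: `Collectors.toMap` without a merge function throws `IllegalStateException` when two elements produce the same key.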



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java:
##########
@@ -140,9 +141,21 @@ public SamzaPipelineResult run(Pipeline pipeline) {
     LOG.info("Beam pipeline JSON graph:\n{}", jsonGraph);
 
     final Map<PValue, String> idMap = PViewToIdMapper.buildIdMap(pipeline);
+    /* Map of stateId to sanitized (remove whitespace and replace '-' with '_') PTransform name, used in multiple ParDos

Review Comment:
   Let's move this explanation to the javadoc of `rewriteConfigWithMultiParDoStateId`.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java:
##########
@@ -35,12 +34,17 @@ public class ConfigContext {
   private final Map<PValue, String> idMap;
   private AppliedPTransform<?, ?, ?> currentTransform;
   private final SamzaPipelineOptions options;
-  private final Set<String> stateIds;
+  private final Map<String, String> usedStateIdMap;
+  private final Map<String, String> multiParDoStateIdMap;

Review Comment:
   Let's rename this to `stateIdsToRewrite` for clarity. The value could also be the new state ID.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -129,18 +130,7 @@ static KeyValueStore<ByteArray, StateValue<?>> getBeamStore(TaskContext context)
    */
   static <K> Factory<K> createNonKeyedStateInternalsFactory(
       String id, TaskContext context, SamzaPipelineOptions pipelineOptions) {
-    return createStateInternalsFactory(id, null, context, pipelineOptions, Collections.emptySet());
-  }
-
-  static <K> Factory<K> createStateInternalsFactory(

Review Comment:
   Why did we remove creating for DoFnSignature?



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java:
##########
@@ -249,6 +252,19 @@ public String getTransformId() {
     return idGenerator.getId(getTransformFullName());
   }
 
+  /** Given a set of user stateIds and parDo name, return a stateId to storeId map. */
+  public Map<String, String> getStateIdToStoreIdMap(Set<String> stateIds, String escapedParDoName) {
+    final Map<String, String> storeIds = new HashMap<>();
+    stateIds.forEach(
+        stateId ->
+            storeIds.put(
+                stateId,
+                multiParDoStateIds.contains(stateId)
+                    ? String.join("-", stateId, escapedParDoName)

Review Comment:
   Let's create a helper method for this join; it looks like it happens in a lot of places in the code.
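
Such a helper could be as small as the sketch below. The class and method names (`StoreIdNames`, `toUniqueStoreId`, `storeIdFor`) are hypothetical and not taken from this PR; the join itself mirrors the `String.join("-", stateId, escapedParDoName)` call in the diff.

```java
import java.util.Set;

final class StoreIdNames {
  private StoreIdNames() {}

  /** Joins a stateId and an already-escaped ParDo name into a unique store ID. */
  static String toUniqueStoreId(String stateId, String escapedParDoName) {
    return String.join("-", stateId, escapedParDoName);
  }

  /** Returns the plain stateId unless it is shared across ParDos. */
  static String storeIdFor(
      String stateId, String escapedParDoName, Set<String> nonUniqueStateIds) {
    return nonUniqueStateIds.contains(stateId)
        ? toUniqueStoreId(stateId, escapedParDoName)
        : stateId;
  }
}
```

Keeping the separator in one place means the config-writing path and the runtime lookup path cannot drift apart on how a store ID is spelled.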





[GitHub] [beam] xinyuiscool commented on pull request #24276: Samza runner support for non unique stateId across multiple ParDos

Posted by GitBox <gi...@apache.org>.
xinyuiscool commented on PR #24276:
URL: https://github.com/apache/beam/pull/24276#issuecomment-1341325903

   Run Java PreCommit




[GitHub] [beam] seungjin-an commented on a diff in pull request #24276: Samza runner support for non unique stateId across multiple ParDos

Posted by GitBox <gi...@apache.org>.
seungjin-an commented on code in PR #24276:
URL: https://github.com/apache/beam/pull/24276#discussion_r1041567428


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java:
##########
@@ -161,6 +163,17 @@ private static <InT, OutT> void doTranslate(
     Map<String, PCollectionView<?>> sideInputMapping =
         ParDoTranslation.getSideInputMapping(ctx.getCurrentTransform());
 
+    final DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
+    final Map<String, String> stateIdToStoreMapping = new HashMap<>();
+    for (String stateId : signature.stateDeclarations().keySet()) {
+      String storeId = stateId;

Review Comment:
   Done.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java:
##########
@@ -35,12 +34,13 @@ public class ConfigContext {
   private final Map<PValue, String> idMap;
   private AppliedPTransform<?, ?, ?> currentTransform;
   private final SamzaPipelineOptions options;
-  private final Set<String> stateIds;
+  private final Set<String> nonUniqueStateIds;

Review Comment:
   Done.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/util/StoreIdUtils.java:
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.util;
+
+public class StoreIdUtils {

Review Comment:
   Done.





[GitHub] [beam] seungjin-an commented on a diff in pull request #24276: Samza runner support for same stateId with multiple ParDos

Posted by GitBox <gi...@apache.org>.
seungjin-an commented on code in PR #24276:
URL: https://github.com/apache/beam/pull/24276#discussion_r1037465724


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java:
##########
@@ -105,6 +106,38 @@ public static void createConfig(
     pipeline.traverseTopologically(visitor);
   }
 
+  /** Rewrite user store configs if there exists same state ids with multiple ParDos. */
+  public static void rewriteConfigWithMultiParDoStateId(
+      SamzaPipelineOptions options,
+      Map<String, String> multiParDoStateIdMap,
+      ConfigBuilder configBuilder) {
+    multiParDoStateIdMap.forEach(
+        (stateId, value) -> {
+          // rewrite single parDo state configs into multiple parDo state
+          String multiParDoStateId = String.join("-", stateId, value);
+          // replace old single parDo store configs with new storeId mapping appended with parDo
+          // name
+          configBuilder.remove("stores." + stateId + ".factory");
+          configBuilder.remove("stores." + stateId + ".key.serde");
+          configBuilder.remove("stores." + stateId + ".msg.serde");
+          configBuilder.remove("stores." + stateId + ".rocksdb.compression");
+          // put new config with multi pardo config
+          configBuilder.put(
+              "stores." + multiParDoStateId + ".factory",
+              "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory");

Review Comment:
   Ah Sounds good :) Made the change to all other occurrences. 





[GitHub] [beam] xinyuiscool commented on a diff in pull request #24276: Samza runner support for same stateId with multiple ParDos

Posted by GitBox <gi...@apache.org>.
xinyuiscool commented on code in PR #24276:
URL: https://github.com/apache/beam/pull/24276#discussion_r1037689857


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java:
##########
@@ -148,7 +149,8 @@ public DoFnOp(
       JobInfo jobInfo,
       Map<String, TupleTag<?>> idToTupleTagMap,
       DoFnSchemaInformation doFnSchemaInformation,
-      Map<?, PCollectionView<?>> sideInputMapping) {
+      Map<?, PCollectionView<?>> sideInputMapping,
+      Map<String, String> userStateIds) {

Review Comment:
   stateIdToStoreMapping? A meaningful name will help understand your logic.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java:
##########
@@ -140,9 +141,14 @@ public SamzaPipelineResult run(Pipeline pipeline) {
     LOG.info("Beam pipeline JSON graph:\n{}", jsonGraph);
 
     final Map<PValue, String> idMap = PViewToIdMapper.buildIdMap(pipeline);
+    final Map<String, String> multiParDoStateIdMap = new HashMap<>();

Review Comment:
   The current implementation, which invokes createConfig() to populate this map and then calls a separate rewrite() to remove some configs and add new ones, is quite awkward. A more elegant implementation would follow the existing practice used for building the viewToId map: do a pre-scan of the pipeline to create an immutable nonUniqueStateIds set. I expect the code to look like the following:
   
   // Do a pre-scan of the pipeline to build this set; it stays immutable afterwards.
   final Set<String> nonUniqueStateIds = StateIdParser.scan(pipeline);
   // Create the store configs in one shot, with no rewriting business
   SamzaPipelineTranslator.createConfig(
           pipeline, options, idMap, nonUniqueStateIds, configBuilder);
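
The core of such a pre-scan could be sketched as follows, assuming the stateIds declared by each ParDo have already been gathered into one set per ParDo. `StateIdPreScan` and `findNonUniqueStateIds` are hypothetical names; walking the actual Beam pipeline is left out.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class StateIdPreScan {
  /**
   * Returns the stateIds that appear in more than one ParDo.
   * Each element of the input is the set of stateIds declared by one ParDo.
   */
  static Set<String> findNonUniqueStateIds(List<Set<String>> stateIdsPerParDo) {
    Set<String> seen = new HashSet<>();
    Set<String> nonUnique = new HashSet<>();
    for (Set<String> stateIds : stateIdsPerParDo) {
      for (String stateId : stateIds) {
        // add() returns false if another ParDo already declared this stateId
        if (!seen.add(stateId)) {
          nonUnique.add(stateId);
        }
      }
    }
    return Collections.unmodifiableSet(nonUnique);
  }
}
```

Because the result is known before any config is written, every store config can be created with its final store ID, and nothing has to be removed and re-added afterwards.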
   



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -149,37 +139,39 @@ static <K> Factory<K> createStateInternalsFactory(
       TaskContext context,
       SamzaPipelineOptions pipelineOptions,
       ExecutableStage executableStage) {
-
-    Set<String> stateIds =
+    // TODO: handle same stateIds in multiple ParDos for portable mode
+    Map<String, String> stateIds =
         executableStage.getUserStates().stream()
-            .map(UserStateReference::localName)
-            .collect(Collectors.toSet());
+            .collect(
+                Collectors.toMap(UserStateReference::localName, UserStateReference::localName));
 
     return createStateInternalsFactory(id, keyCoder, context, pipelineOptions, stateIds);
   }
 
   @SuppressWarnings("unchecked")
-  private static <K> Factory<K> createStateInternalsFactory(
+  static <K> Factory<K> createStateInternalsFactory(
       String id,
       @Nullable Coder<K> keyCoder,
       TaskContext context,
       SamzaPipelineOptions pipelineOptions,
-      Collection<String> stateIds) {
+      Map<String, String> stateIdMap) {
     final int batchGetSize = pipelineOptions.getStoreBatchGetSize();
     final Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores = new HashMap<>();
     stores.put(BEAM_STORE, getBeamStore(context));
-
     final Coder<K> stateKeyCoder;
     if (keyCoder != null) {
-      stateIds.forEach(
-          stateId ->
-              stores.put(
-                  stateId, (KeyValueStore<ByteArray, StateValue<?>>) context.getStore(stateId)));
+      stateIdMap

Review Comment:
   We don't need this stateIdMap. We should still keep the stores map to be <stateId -> store>. There will be no duplicate ids within a single DoFn.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -99,13 +96,16 @@
 
   // the stores include both beamStore for system states as well as stores for user state
   private final Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores;
+  // Map of non-system stateIds to unique storeId
+  private final Map<String, String> stateIdMap;

Review Comment:
   Then rename it to match your comment exactly: stateIdToStoreMap.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -99,13 +96,16 @@
 
   // the stores include both beamStore for system states as well as stores for user state
   private final Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores;
+  // Map of non-system stateIds to unique storeId
+  private final Map<String, String> stateIdMap;
   private final K key;
   private final byte[] keyBytes;
   private final int batchGetSize;
   private final String stageId;
 
   private SamzaStoreStateInternals(
       Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores,
+      Map<String, String> stateIds,

Review Comment:
   We don't need this.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java:
##########
@@ -92,14 +90,14 @@ public static <InT, FnOutT> DoFnRunner<InT, FnOutT> create(
       DoFnSchemaInformation doFnSchemaInformation,
       Map<String, PCollectionView<?>> sideInputMapping,
       OpEmitter emitter,
-      FutureCollector futureCollector) {
+      FutureCollector futureCollector,
+      Map<String, String> userStateIds) {

Review Comment:
   Same as above: use a more meaningful name.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -271,16 +263,19 @@ private static ByteArrayOutputStream getThreadLocalBaos() {
   public static class Factory<K> implements StateInternalsFactory<K> {
     private final String stageId;
     private final Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores;
+    private final Map<String, String> stateIdMap;

Review Comment:
   remove.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -149,37 +139,39 @@ static <K> Factory<K> createStateInternalsFactory(
       TaskContext context,
       SamzaPipelineOptions pipelineOptions,
       ExecutableStage executableStage) {
-
-    Set<String> stateIds =
+    // TODO: handle same stateIds in multiple ParDos for portable mode
+    Map<String, String> stateIds =
         executableStage.getUserStates().stream()
-            .map(UserStateReference::localName)
-            .collect(Collectors.toSet());
+            .collect(
+                Collectors.toMap(UserStateReference::localName, UserStateReference::localName));
 
     return createStateInternalsFactory(id, keyCoder, context, pipelineOptions, stateIds);
   }
 
   @SuppressWarnings("unchecked")
-  private static <K> Factory<K> createStateInternalsFactory(
+  static <K> Factory<K> createStateInternalsFactory(
       String id,
       @Nullable Coder<K> keyCoder,
       TaskContext context,
       SamzaPipelineOptions pipelineOptions,
-      Collection<String> stateIds) {
+      Map<String, String> stateIdMap) {
     final int batchGetSize = pipelineOptions.getStoreBatchGetSize();
     final Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores = new HashMap<>();
     stores.put(BEAM_STORE, getBeamStore(context));
-
     final Coder<K> stateKeyCoder;
     if (keyCoder != null) {
-      stateIds.forEach(
-          stateId ->
-              stores.put(
-                  stateId, (KeyValueStore<ByteArray, StateValue<?>>) context.getStore(stateId)));
+      stateIdMap
+          .values()
+          .forEach(
+              storeId ->
+                  stores.put(

Review Comment:
   Wrong. Keep the stores map the same as before: {stateId -> store}.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -99,13 +96,16 @@
 
   // the stores include both beamStore for system states as well as stores for user state
   private final Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores;
+  // Map of non-system stateIds to unique storeId
+  private final Map<String, String> stateIdMap;
   private final K key;
   private final byte[] keyBytes;
   private final int batchGetSize;
   private final String stageId;
 
   private SamzaStoreStateInternals(
       Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores,

Review Comment:
   stores should still be the map from stateId to the actual store. No need to add extra indirect logic here.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -271,16 +263,19 @@ private static ByteArrayOutputStream getThreadLocalBaos() {
   public static class Factory<K> implements StateInternalsFactory<K> {
     private final String stageId;
     private final Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores;
+    private final Map<String, String> stateIdMap;
     private final Coder<K> keyCoder;
     private final int batchGetSize;
 
     public Factory(
         String stageId,
         Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores,
+        Map<String, String> stateIdMap,

Review Comment:
   remove



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -303,7 +298,8 @@ public StateInternals stateInternalsForKey(@Nullable K key) {
         throw new RuntimeException("Cannot encode key for state store", e);
       }
 
-      return new SamzaStoreStateInternals<>(stores, key, baos.toByteArray(), stageId, batchGetSize);
+      return new SamzaStoreStateInternals<>(

Review Comment:
   Revert the change.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -149,37 +139,39 @@ static <K> Factory<K> createStateInternalsFactory(
       TaskContext context,
       SamzaPipelineOptions pipelineOptions,
       ExecutableStage executableStage) {
-
-    Set<String> stateIds =
+    // TODO: handle same stateIds in multiple ParDos for portable mode
+    Map<String, String> stateIds =
         executableStage.getUserStates().stream()
-            .map(UserStateReference::localName)
-            .collect(Collectors.toSet());
+            .collect(
+                Collectors.toMap(UserStateReference::localName, UserStateReference::localName));
 
     return createStateInternalsFactory(id, keyCoder, context, pipelineOptions, stateIds);
   }
 
   @SuppressWarnings("unchecked")
-  private static <K> Factory<K> createStateInternalsFactory(
+  static <K> Factory<K> createStateInternalsFactory(
       String id,
       @Nullable Coder<K> keyCoder,
       TaskContext context,
       SamzaPipelineOptions pipelineOptions,
-      Collection<String> stateIds) {
+      Map<String, String> stateIdMap) {
     final int batchGetSize = pipelineOptions.getStoreBatchGetSize();
     final Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores = new HashMap<>();
     stores.put(BEAM_STORE, getBeamStore(context));
-
     final Coder<K> stateKeyCoder;
     if (keyCoder != null) {
-      stateIds.forEach(
-          stateId ->
-              stores.put(
-                  stateId, (KeyValueStore<ByteArray, StateValue<?>>) context.getStore(stateId)));
+      stateIdMap
+          .values()
+          .forEach(
+              storeId ->
+                  stores.put(
+                      storeId,
+                      (KeyValueStore<ByteArray, StateValue<?>>) context.getStore(storeId)));
       stateKeyCoder = keyCoder;
     } else {
       stateKeyCoder = (Coder<K>) VoidCoder.of();
     }
-    return new Factory<>(Objects.toString(id), stores, stateKeyCoder, batchGetSize);
+    return new Factory<>(Objects.toString(id), stores, stateIdMap, stateKeyCoder, batchGetSize);

Review Comment:
   get rid of stateIdMap.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -323,11 +319,11 @@ protected AbstractSamzaState(
       this.coder = coder;
       this.namespace = namespace;
       this.addressId = address.getId();
-      this.isBeamStore = !stores.containsKey(address.getId());
+      this.isBeamStore = !stores.containsKey(stateIdMap.get(address.getId()));

Review Comment:
   reverse the change.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -271,16 +263,19 @@ private static ByteArrayOutputStream getThreadLocalBaos() {
   public static class Factory<K> implements StateInternalsFactory<K> {
     private final String stageId;
     private final Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores;
+    private final Map<String, String> stateIdMap;
     private final Coder<K> keyCoder;
     private final int batchGetSize;
 
     public Factory(
         String stageId,
         Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores,
+        Map<String, String> stateIdMap,
         Coder<K> keyCoder,
         int batchGetSize) {
       this.stageId = stageId;
       this.stores = stores;
+      this.stateIdMap = stateIdMap;

Review Comment:
   remove



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java:
##########
@@ -35,12 +35,17 @@ public class ConfigContext {
   private final Map<PValue, String> idMap;
   private AppliedPTransform<?, ?, ?> currentTransform;
   private final SamzaPipelineOptions options;
-  private final Set<String> stateIds;
+  private final Map<String, String> usedStateIdMap;
+  private final Map<String, String> stateIdsToRewrite;
 
-  public ConfigContext(Map<PValue, String> idMap, SamzaPipelineOptions options) {
+  public ConfigContext(

Review Comment:
   Remove all the changes in this class. Do a pre-scan instead.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -323,11 +319,11 @@ protected AbstractSamzaState(
       this.coder = coder;
       this.namespace = namespace;
       this.addressId = address.getId();
-      this.isBeamStore = !stores.containsKey(address.getId());
+      this.isBeamStore = !stores.containsKey(stateIdMap.get(address.getId()));
       this.store =
           isBeamStore
               ? (KeyValueStore) stores.get(BEAM_STORE)
-              : (KeyValueStore) stores.get(address.getId());
+              : (KeyValueStore) stores.get(stateIdMap.get(address.getId()));

Review Comment:
   reverse the change.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java:
##########
@@ -64,8 +69,39 @@ public SamzaPipelineOptions getPipelineOptions() {
     return this.options;
   }
 
-  public boolean addStateId(String stateId) {
-    return stateIds.add(stateId);
+  /** Helper to keep track of used stateIds and return unique store id. */
+  public String getUniqueStoreId(String stateId, String parDoName) {
+    // Update a map of used state id with parDo name.
+    if (!usedStateIdMap.containsKey(stateId)) {
+      usedStateIdMap.put(stateId, parDoName);
+      return stateId;
+    } else {
+      // Same state id identified for the first time
+      if (!stateIdsToRewrite.containsKey(stateId)) {
+        final String prevParDoName = usedStateIdMap.get(stateId);
+        final String prevMultiParDoStateId =
+            StoreIdUtils.toMultiParDoStoreId(stateId, prevParDoName);
+        usedStateIdMap.put(prevMultiParDoStateId, prevParDoName);
+        // Store the stateId with previous parDo name which will be used for config rewriting
+        stateIdsToRewrite.put(stateId, prevParDoName);

Review Comment:
   Please do it in one shot. Needing a rewrite just means the code is not good enough, and most developers will not understand what is going on in these 50 lines of code.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java:
##########
@@ -105,6 +108,46 @@ public static void createConfig(
     pipeline.traverseTopologically(visitor);
   }
 
+  /**
+   * Rewrite user store configs if there exists same state ids used in multiple ParDos. For each
+   * entry of a stateId to escaped PTransform name of first occurrence in topological traversal,
+   * rewrite RocksDB configs with the new mapping enforced from stateId to storeId.
+   * (eg) @StateId("foo") used in two ParDos fn, fn2: .apply("First Stateful ParDo with same
+   * stateId", ParDo.of(fn)) .apply("Second Stateful ParDo with same stateId", ParDo.of(fn2)) Map =
+   * ("foo", "First_Stateful_ParDo_with_same_stateId") storeId =
+   * "foo-First_Stateful_ParDo_with_same_stateId"
+   */
+  public static void rewriteConfigWithMultiParDoStateId(

Review Comment:
   get rid of this!



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java:
##########
@@ -83,6 +84,10 @@ public void putAll(Map<String, String> properties) {
     config.putAll(properties);
   }
 
+  public void remove(String name) {

Review Comment:
   get rid of this. The whole config generation should be immutable.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java:
##########
@@ -377,25 +392,21 @@ public Map<String, String> createConfig(
     if (signature.usesState()) {
       // set up user state configs
       for (DoFnSignature.StateDeclaration state : signature.stateDeclarations().values()) {
-        final String storeId = state.id();
-
-        // TODO: remove validation after we support same state id in different ParDo.
-        if (!ctx.addStateId(storeId)) {
-          throw new IllegalStateException(
-              "Duplicate StateId " + storeId + " found in multiple ParDo.");
-        }
-
+        final String userStateId = state.id();
+        final String escapedParDoName =
+            SamzaPipelineTranslatorUtils.escape(node.getEnclosingNode().getFullName());
+        final String uniqueStoreId = ctx.getUniqueStoreId(userStateId, escapedParDoName);

Review Comment:
   You already have userStateIdToStoreIdMap; why not just do a get(userStateId)? This seems pretty redundant.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java:
##########
@@ -83,6 +84,7 @@ public class TranslationContext {
   private final Map<PValue, String> idMap;
   private final Map<String, MessageStream> registeredInputStreams = new HashMap<>();
   private final Map<String, Table> registeredTables = new HashMap<>();
+  private final Set<String> multiParDoStateIds;

Review Comment:
   nonUniqueStateIds? The current name doesn't convey much.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/util/StoreIdUtils.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.util;
+
+public class StoreIdUtils {
+
+  /**
+   * Join stateId and escaped PTransform name, used for RocksDB storeId of stateIds with multiple
+   * ParDos.
+   */
+  public static String toMultiParDoStoreId(String stateId, String escapedPTransformName) {

Review Comment:
   toUniqueStoreId()? The current name is not clear to me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] seungjin-an commented on a diff in pull request #24276: Samza runner support for same stateId with multiple ParDos

Posted by GitBox <gi...@apache.org>.
seungjin-an commented on code in PR #24276:
URL: https://github.com/apache/beam/pull/24276#discussion_r1036734718


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java:
##########
@@ -35,12 +34,17 @@ public class ConfigContext {
   private final Map<PValue, String> idMap;
   private AppliedPTransform<?, ?, ?> currentTransform;
   private final SamzaPipelineOptions options;
-  private final Set<String> stateIds;
+  private final Map<String, String> usedStateIdMap;
+  private final Map<String, String> multiParDoStateIdMap;

Review Comment:
   Sounds good - made the change.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java:
##########
@@ -249,6 +252,19 @@ public String getTransformId() {
     return idGenerator.getId(getTransformFullName());
   }
 
+  /** Given a set of user stateIds and parDo name, return a stateId to storeId map. */
+  public Map<String, String> getStateIdToStoreIdMap(Set<String> stateIds, String escapedParDoName) {
+    final Map<String, String> storeIds = new HashMap<>();
+    stateIds.forEach(
+        stateId ->
+            storeIds.put(
+                stateId,
+                multiParDoStateIds.contains(stateId)
+                    ? String.join("-", stateId, escapedParDoName)

Review Comment:
   Done. Thanks!





[GitHub] [beam] seungjin-an commented on a diff in pull request #24276: Samza runner support for same stateId with multiple ParDos

Posted by GitBox <gi...@apache.org>.
seungjin-an commented on code in PR #24276:
URL: https://github.com/apache/beam/pull/24276#discussion_r1036733899


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java:
##########
@@ -140,9 +141,21 @@ public SamzaPipelineResult run(Pipeline pipeline) {
     LOG.info("Beam pipeline JSON graph:\n{}", jsonGraph);
 
     final Map<PValue, String> idMap = PViewToIdMapper.buildIdMap(pipeline);
+    /* Map of stateId to sanitized (remove whitespace and replace '-' with '_') PTransform name, used in multiple ParDos

Review Comment:
   Moved the javadoc with /**. 





[GitHub] [beam] dxichen commented on a diff in pull request #24276: Samza runner support for same stateId with multiple ParDos

Posted by GitBox <gi...@apache.org>.
dxichen commented on code in PR #24276:
URL: https://github.com/apache/beam/pull/24276#discussion_r1037425664


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java:
##########
@@ -105,6 +106,38 @@ public static void createConfig(
     pipeline.traverseTopologically(visitor);
   }
 
+  /** Rewrite user store configs if there exists same state ids with multiple ParDos. */
+  public static void rewriteConfigWithMultiParDoStateId(
+      SamzaPipelineOptions options,
+      Map<String, String> multiParDoStateIdMap,
+      ConfigBuilder configBuilder) {
+    multiParDoStateIdMap.forEach(
+        (stateId, value) -> {
+          // rewrite single parDo state configs into multiple parDo state
+          String multiParDoStateId = String.join("-", stateId, value);
+          // replace old single parDo store configs with new storeId mapping appended with parDo
+          // name
+          configBuilder.remove("stores." + stateId + ".factory");
+          configBuilder.remove("stores." + stateId + ".key.serde");
+          configBuilder.remove("stores." + stateId + ".msg.serde");
+          configBuilder.remove("stores." + stateId + ".rocksdb.compression");
+          // put new config with multi pardo config
+          configBuilder.put(
+              "stores." + multiParDoStateId + ".factory",
+              "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory");

Review Comment:
   Oh I just mean something like `RocksDbKeyValueStorageEngineFactory.class.getName()`
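
   A small sketch of the idea, in case it helps: deriving the name from a class literal means a rename or typo fails at compile time instead of at job startup. `ExampleStorageEngineFactory` and `FactoryNameSketch` are hypothetical stand-ins for illustration, not Samza classes; the real class would have to be on the compile classpath for this to work.

```java
/** Hypothetical stand-in for a storage engine factory class; not the Samza class. */
class ExampleStorageEngineFactory {}

class FactoryNameSketch {
  /** Derives the factory class name from the class literal rather than a hand-typed string. */
  static String factoryClassName() {
    return ExampleStorageEngineFactory.class.getName();
  }

  /** Builds the config key shape used in the diff above: stores.<storeId>.factory. */
  static String factoryConfigKey(String storeId) {
    return "stores." + storeId + ".factory";
  }
}
```

   The value passed to configBuilder.put(...) would then be the result of factoryClassName() instead of a string literal.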





[GitHub] [beam] seungjin-an commented on a diff in pull request #24276: Samza runner support for same stateId with multiple ParDos

Posted by GitBox <gi...@apache.org>.
seungjin-an commented on code in PR #24276:
URL: https://github.com/apache/beam/pull/24276#discussion_r1037952612


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java:
##########
@@ -377,25 +392,21 @@ public Map<String, String> createConfig(
     if (signature.usesState()) {
       // set up user state configs
       for (DoFnSignature.StateDeclaration state : signature.stateDeclarations().values()) {
-        final String storeId = state.id();
-
-        // TODO: remove validation after we support same state id in different ParDo.
-        if (!ctx.addStateId(storeId)) {
-          throw new IllegalStateException(
-              "Duplicate StateId " + storeId + " found in multiple ParDo.");
-        }
-
+        final String userStateId = state.id();
+        final String escapedParDoName =
+            SamzaPipelineTranslatorUtils.escape(node.getEnclosingNode().getFullName());
+        final String uniqueStoreId = ctx.getUniqueStoreId(userStateId, escapedParDoName);

Review Comment:
   We only need the config creation part for translation, so I removed this part.





[GitHub] [beam] seungjin-an commented on a diff in pull request #24276: Samza runner support for same stateId with multiple ParDos

Posted by GitBox <gi...@apache.org>.
seungjin-an commented on code in PR #24276:
URL: https://github.com/apache/beam/pull/24276#discussion_r1038553994


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java:
##########
@@ -148,7 +149,8 @@ public DoFnOp(
       JobInfo jobInfo,
       Map<String, TupleTag<?>> idToTupleTagMap,
       DoFnSchemaInformation doFnSchemaInformation,
-      Map<?, PCollectionView<?>> sideInputMapping) {
+      Map<?, PCollectionView<?>> sideInputMapping,
+      Map<String, String> userStateIds) {

Review Comment:
   Made the change.





[GitHub] [beam] xinyuiscool merged pull request #24276: Samza runner support for non unique stateId across multiple ParDos

Posted by GitBox <gi...@apache.org>.
xinyuiscool merged PR #24276:
URL: https://github.com/apache/beam/pull/24276




[GitHub] [beam] dxichen commented on a diff in pull request #24276: Samza runner support for same stateId with multiple ParDos

Posted by GitBox <gi...@apache.org>.
dxichen commented on code in PR #24276:
URL: https://github.com/apache/beam/pull/24276#discussion_r1037427344


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -129,18 +130,7 @@ static KeyValueStore<ByteArray, StateValue<?>> getBeamStore(TaskContext context)
    */
   static <K> Factory<K> createNonKeyedStateInternalsFactory(
       String id, TaskContext context, SamzaPipelineOptions pipelineOptions) {
-    return createStateInternalsFactory(id, null, context, pipelineOptions, Collections.emptySet());
-  }
-
-  static <K> Factory<K> createStateInternalsFactory(

Review Comment:
   Got it, let's leave this out if it is no longer used.





[GitHub] [beam] seungjin-an commented on a diff in pull request #24276: Samza runner support for same stateId with multiple ParDos

Posted by GitBox <gi...@apache.org>.
seungjin-an commented on code in PR #24276:
URL: https://github.com/apache/beam/pull/24276#discussion_r1036734527


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java:
##########
@@ -105,6 +106,38 @@ public static void createConfig(
     pipeline.traverseTopologically(visitor);
   }
 
+  /** Rewrite user store configs if there exists same state ids with multiple ParDos. */
+  public static void rewriteConfigWithMultiParDoStateId(
+      SamzaPipelineOptions options,
+      Map<String, String> multiParDoStateIdMap,
+      ConfigBuilder configBuilder) {
+    multiParDoStateIdMap.forEach(
+        (stateId, value) -> {
+          // rewrite single parDo state configs into multiple parDo state
+          String multiParDoStateId = String.join("-", stateId, value);
+          // replace old single parDo store configs with new storeId mapping appended with parDo
+          // name
+          configBuilder.remove("stores." + stateId + ".factory");
+          configBuilder.remove("stores." + stateId + ".key.serde");
+          configBuilder.remove("stores." + stateId + ".msg.serde");
+          configBuilder.remove("stores." + stateId + ".rocksdb.compression");
+          // put new config with multi pardo config
+          configBuilder.put(
+              "stores." + multiParDoStateId + ".factory",
+              "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory");

Review Comment:
   Hi Daniel, could you elaborate on what you mean by "already including the package"? Thanks! We do have some occurrences of these class names in other places where we add the configs.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -149,37 +139,39 @@ static <K> Factory<K> createStateInternalsFactory(
       TaskContext context,
       SamzaPipelineOptions pipelineOptions,
       ExecutableStage executableStage) {
-
-    Set<String> stateIds =
+    // TODO: handle same stateIds in multiple ParDos for portable mode
+    Map<String, String> stateIds =
         executableStage.getUserStates().stream()
             .map(UserStateReference::localName)
-            .collect(Collectors.toSet());
+            .collect(Collectors.toMap(Function.identity(), Function.identity()));

Review Comment:
   Thanks, made the change!





[GitHub] [beam] seungjin-an commented on pull request #24276: Samza runner support for same stateId with multiple ParDos

Posted by GitBox <gi...@apache.org>.
seungjin-an commented on PR #24276:
URL: https://github.com/apache/beam/pull/24276#issuecomment-1334976976

   > Two main issues:
   > 
   > 1. no rewriting of configs, instead, do a prescan of pipeline and build the non-unique state id set
   > 2. the changes in the SamzaStateInternals are not needed if you make the stores map still follow the same definition of {stateId -> store}. The state internals is generated per PTransform so it will not have dups here.
   
   Thank you so much for taking the time to review, for the suggestions on the pre-scan and better naming, and for correcting my understanding of SamzaStoreStateInternals. I have addressed all the comments; please have another look.




[GitHub] [beam] seungjin-an commented on pull request #24276: Samza runner support for non unique stateId across multiple ParDos

Posted by GitBox <gi...@apache.org>.
seungjin-an commented on PR #24276:
URL: https://github.com/apache/beam/pull/24276#issuecomment-1338355616

   Thank you @xinyuiscool for reviewing again. The SamzaStoreStateInternals logic has been included, and I verified the changes with all tests passing under ./gradlew :runners:samza:build




[GitHub] [beam] seungjin-an commented on pull request #24276: Samza runner support for non unique stateId across multiple ParDos

Posted by GitBox <gi...@apache.org>.
seungjin-an commented on PR #24276:
URL: https://github.com/apache/beam/pull/24276#issuecomment-1340141914

   > Overall looks great! Have some minor comments about consolidating some duplicate logic into a single class.
   
   @xinyuiscool Awesome, I've addressed all the comments above. Please review when you are available. Thank you!




[GitHub] [beam] seungjin-an commented on a diff in pull request #24276: Samza runner support for same stateId with multiple ParDos

Posted by GitBox <gi...@apache.org>.
seungjin-an commented on code in PR #24276:
URL: https://github.com/apache/beam/pull/24276#discussion_r1038557202


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java:
##########
@@ -377,25 +392,21 @@ public Map<String, String> createConfig(
     if (signature.usesState()) {
       // set up user state configs
       for (DoFnSignature.StateDeclaration state : signature.stateDeclarations().values()) {
-        final String storeId = state.id();
-
-        // TODO: remove validation after we support same state id in different ParDo.
-        if (!ctx.addStateId(storeId)) {
-          throw new IllegalStateException(
-              "Duplicate StateId " + storeId + " found in multiple ParDo.");
-        }
-
+        final String userStateId = state.id();
+        final String escapedParDoName =
+            SamzaPipelineTranslatorUtils.escape(node.getEnclosingNode().getFullName());
+        final String uniqueStoreId = ctx.getUniqueStoreId(userStateId, escapedParDoName);

Review Comment:
   Now that we have removed the config-rewriting logic, we no longer have "userStateIdToStoreIdMap". Instead, we get the nonUniqueStateIds set from StateIdParser.scan(). Given this set, we create the stateIdToStoreMapping for each ParDo and initialize SamzaStoreStateInternals, while keeping the stateId -> store map in "stores". Please take a look and let me know if this looks reasonable. Thank you!
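
   To make the flow above concrete, here is a minimal sketch in plain Java, not the code from the PR. `StateStoreMappingSketch`, `toStoreIds`, and `openStores` are hypothetical names, and the `getStore` function stands in for a lookup such as context.getStore(storeId). The point it illustrates is that the stores map stays keyed by stateId, and only the lookup uses the unique storeId.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

class StateStoreMappingSketch {
  /** Builds the per-ParDo stateId -> storeId mapping from the pre-scanned non-unique set. */
  static Map<String, String> toStoreIds(
      Set<String> stateIds, Set<String> nonUniqueStateIds, String escapedParDoName) {
    final Map<String, String> storeIds = new HashMap<>();
    for (String stateId : stateIds) {
      storeIds.put(
          stateId,
          nonUniqueStateIds.contains(stateId)
              // Shared stateId: disambiguate with the escaped ParDo name.
              ? String.join("-", stateId, escapedParDoName)
              // Unique stateId: keep the existing store id untouched.
              : stateId);
    }
    return storeIds;
  }

  /** Opens one store per state; the result is keyed by stateId, not by storeId. */
  static <S> Map<String, S> openStores(
      Map<String, String> storeIds, Function<String, S> getStore) {
    final Map<String, S> stores = new HashMap<>();
    storeIds.forEach((stateId, storeId) -> stores.put(stateId, getStore.apply(storeId)));
    return stores;
  }
}
```

   Because the resulting map is still {stateId -> store}, a lookup by address.getId() in the state internals would not need to change under this arrangement.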





[GitHub] [beam] xinyuiscool commented on a diff in pull request #24276: Samza runner support for non unique stateId across multiple ParDos

Posted by GitBox <gi...@apache.org>.
xinyuiscool commented on code in PR #24276:
URL: https://github.com/apache/beam/pull/24276#discussion_r1040212756


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java:
##########
@@ -377,17 +392,15 @@ public Map<String, String> createConfig(
     if (signature.usesState()) {
       // set up user state configs
       for (DoFnSignature.StateDeclaration state : signature.stateDeclarations().values()) {
-        final String storeId = state.id();
-
-        // TODO: remove validation after we support same state id in different ParDo.
-        if (!ctx.addStateId(storeId)) {
-          throw new IllegalStateException(
-              "Duplicate StateId " + storeId + " found in multiple ParDo.");
+        final String stateId = state.id();
+        String storeId = stateId;

Review Comment:
   Same as above: add a StoreIdGenerator inside ConfigContext and use it here.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java:
##########
@@ -35,12 +34,13 @@ public class ConfigContext {
   private final Map<PValue, String> idMap;
   private AppliedPTransform<?, ?, ?> currentTransform;
   private final SamzaPipelineOptions options;
-  private final Set<String> stateIds;
+  private final Set<String> nonUniqueStateIds;

Review Comment:
   Please take a look at the comments below to wrap this in a StoreIdGenerator class.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java:
##########
@@ -83,6 +83,7 @@ public class TranslationContext {
   private final Map<PValue, String> idMap;
   private final Map<String, MessageStream> registeredInputStreams = new HashMap<>();
   private final Map<String, Table> registeredTables = new HashMap<>();
+  private final Set<String> nonUniqueStateIds;

Review Comment:
   See comments above to wrap this in a StoreIdGenerator class.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java:
##########
@@ -161,6 +163,17 @@ private static <InT, OutT> void doTranslate(
     Map<String, PCollectionView<?>> sideInputMapping =
         ParDoTranslation.getSideInputMapping(ctx.getCurrentTransform());
 
+    final DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
+    final Map<String, String> stateIdToStoreMapping = new HashMap<>();
+    for (String stateId : signature.stateDeclarations().keySet()) {
+      String storeId = stateId;

Review Comment:
   It seems lines 169-173 are duplicated below in the config part too. Could you consolidate the logic and encapsulate it in a single class, e.g. a StoreIdGenerator class that takes the nonUniqueStateIds set in its constructor and exposes a getId(stateId, transformFullName) function? Then both ConfigContext and TranslationContext can expose an instance of this helper class, e.g.:
   
   final String storeId = ctx.getStoreIdGenerator().getId(stateId, transformFullName);
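
   For illustration, such a class might look roughly like the sketch below. Only the class name and the getId(stateId, transformFullName) shape come from the suggestion above; the body is an assumption. In particular, `escape` here is a local stand-in for SamzaPipelineTranslatorUtils.escape and is assumed to replace every non-alphanumeric character with '_'.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch of the suggested helper; not the merged implementation. */
class StoreIdGenerator {
  private final Set<String> nonUniqueStateIds;

  StoreIdGenerator(Set<String> nonUniqueStateIds) {
    // Defensive copy so the generator cannot change after construction.
    this.nonUniqueStateIds = Collections.unmodifiableSet(new HashSet<>(nonUniqueStateIds));
  }

  /** Returns stateId unchanged when it is unique, else stateId joined with the escaped name. */
  String getId(String stateId, String transformFullName) {
    if (!nonUniqueStateIds.contains(stateId)) {
      return stateId;
    }
    return String.join("-", stateId, escape(transformFullName));
  }

  /** Assumed escaping rule for this sketch: non-alphanumeric characters become '_'. */
  static String escape(String name) {
    return name.replaceAll("[^A-Za-z0-9_]", "_");
  }
}
```

   With a generator built from a set containing "foo", getId("foo", "Second stateful ParDo") yields "foo-Second_stateful_ParDo" in this sketch, while a stateId outside the set is returned as-is.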
   



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/util/StoreIdUtils.java:
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.util;
+
+public class StoreIdUtils {

Review Comment:
   We can move the method in this class into the new StoreIdGenerator.





[GitHub] [beam] seungjin-an commented on a diff in pull request #24276: Samza runner support for same stateId with multiple ParDos

Posted by GitBox <gi...@apache.org>.
seungjin-an commented on code in PR #24276:
URL: https://github.com/apache/beam/pull/24276#discussion_r1036736271


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -129,18 +130,7 @@ static KeyValueStore<ByteArray, StateValue<?>> getBeamStore(TaskContext context)
    */
   static <K> Factory<K> createNonKeyedStateInternalsFactory(
       String id, TaskContext context, SamzaPipelineOptions pipelineOptions) {
-    return createStateInternalsFactory(id, null, context, pipelineOptions, Collections.emptySet());
-  }
-
-  static <K> Factory<K> createStateInternalsFactory(

Review Comment:
   Since we now fetch stateIds from DoFnSignature earlier, in the logic that handles stateIds used in multiple ParDos, I've replaced this with stateIdMap (a map of stateId to RocksDB storeId). Would you prefer that I add this back for future use?





[GitHub] [beam] xinyuiscool commented on a diff in pull request #24276: Samza runner support for same stateId with multiple ParDos

Posted by GitBox <gi...@apache.org>.
xinyuiscool commented on code in PR #24276:
URL: https://github.com/apache/beam/pull/24276#discussion_r1037674483


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java:
##########
@@ -140,9 +141,14 @@ public SamzaPipelineResult run(Pipeline pipeline) {
     LOG.info("Beam pipeline JSON graph:\n{}", jsonGraph);
 
     final Map<PValue, String> idMap = PViewToIdMapper.buildIdMap(pipeline);
+    final Map<String, String> multiParDoStateIdMap = new HashMap<>();

Review Comment:
   The current implementation, which invokes createConfig() to populate this map and then calls a separate rewrite() to remove some configs and add new ones, is quite awkward. A more elegant implementation would follow the existing practice used for building the viewToId map: do a pre-scan of the pipeline to create an immutable nonUniqueStateIds set. I expect the code to look like the following:
   
   // Do a pre-scan of the pipeline to build this set; it stays immutable afterwards.
   final Set<String> nonUniqueStateIds = StateIdParser.scan(pipeline);
   // Create the store configs in one shot, no rewriting business
   SamzaPipelineTranslator.createConfig(
           pipeline, options, idMap, nonUniqueStateIds, configBuilder);
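
   As a rough sketch of what the pre-scan computes (not the actual StateIdParser, which would walk the Beam pipeline): given the stateIds declared by each ParDo, collect those that appear in more than one ParDo. `StateIdScanSketch`, `findNonUniqueStateIds`, and the plain map input are hypothetical and used here only to keep the example self-contained.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class StateIdScanSketch {
  /** Returns the stateIds declared by more than one ParDo, as an unmodifiable set. */
  static Set<String> findNonUniqueStateIds(Map<String, List<String>> stateIdsByParDo) {
    final Set<String> seen = new HashSet<>();
    final Set<String> nonUnique = new HashSet<>();
    for (List<String> stateIds : stateIdsByParDo.values()) {
      // De-duplicate within one ParDo so only reuse across ParDos counts.
      for (String stateId : new HashSet<>(stateIds)) {
        if (!seen.add(stateId)) {
          nonUnique.add(stateId);
        }
      }
    }
    return Collections.unmodifiableSet(nonUnique);
  }
}
```

   Because the set is complete before any config is generated, each store config can be written once with its final store id, which is what removes the need for a rewrite step.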
   





[GitHub] [beam] seungjin-an commented on a diff in pull request #24276: Samza runner support for same stateId with multiple ParDos

Posted by GitBox <gi...@apache.org>.
seungjin-an commented on code in PR #24276:
URL: https://github.com/apache/beam/pull/24276#discussion_r1037946626


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java:
##########
@@ -148,7 +149,8 @@ public DoFnOp(
       JobInfo jobInfo,
       Map<String, TupleTag<?>> idToTupleTagMap,
       DoFnSchemaInformation doFnSchemaInformation,
-      Map<?, PCollectionView<?>> sideInputMapping) {
+      Map<?, PCollectionView<?>> sideInputMapping,
+      Map<String, String> userStateIds) {

Review Comment:
   This is no longer needed, so I reverted this diff, but I definitely agree. Thank you.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java:
##########
@@ -140,9 +141,14 @@ public SamzaPipelineResult run(Pipeline pipeline) {
     LOG.info("Beam pipeline JSON graph:\n{}", jsonGraph);
 
     final Map<PValue, String> idMap = PViewToIdMapper.buildIdMap(pipeline);
+    final Map<String, String> multiParDoStateIdMap = new HashMap<>();

Review Comment:
   I totally agree - this approach seems much better. Made the change, thank you!



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -99,13 +96,16 @@
 
   // the stores include both beamStore for system states as well as stores for user state
   private final Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores;
+  // Map of non-system stateIds to unique storeId
+  private final Map<String, String> stateIdMap;
   private final K key;
   private final byte[] keyBytes;
   private final int batchGetSize;
   private final String stageId;
 
   private SamzaStoreStateInternals(
       Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores,

Review Comment:
   Ack, removed the changes in this class.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -149,37 +139,39 @@ static <K> Factory<K> createStateInternalsFactory(
       TaskContext context,
       SamzaPipelineOptions pipelineOptions,
       ExecutableStage executableStage) {
-
-    Set<String> stateIds =
+    // TODO: handle same stateIds in multiple ParDos for portable mode
+    Map<String, String> stateIds =
         executableStage.getUserStates().stream()
-            .map(UserStateReference::localName)
-            .collect(Collectors.toSet());
+            .collect(
+                Collectors.toMap(UserStateReference::localName, UserStateReference::localName));
 
     return createStateInternalsFactory(id, keyCoder, context, pipelineOptions, stateIds);
   }
 
   @SuppressWarnings("unchecked")
-  private static <K> Factory<K> createStateInternalsFactory(
+  static <K> Factory<K> createStateInternalsFactory(
       String id,
       @Nullable Coder<K> keyCoder,
       TaskContext context,
       SamzaPipelineOptions pipelineOptions,
-      Collection<String> stateIds) {
+      Map<String, String> stateIdMap) {
     final int batchGetSize = pipelineOptions.getStoreBatchGetSize();
     final Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores = new HashMap<>();
     stores.put(BEAM_STORE, getBeamStore(context));
-
     final Coder<K> stateKeyCoder;
     if (keyCoder != null) {
-      stateIds.forEach(
-          stateId ->
-              stores.put(
-                  stateId, (KeyValueStore<ByteArray, StateValue<?>>) context.getStore(stateId)));
+      stateIdMap
+          .values()
+          .forEach(
+              storeId ->
+                  stores.put(

Review Comment:
   Ack, removed the changes in this class.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -271,16 +263,19 @@ private static ByteArrayOutputStream getThreadLocalBaos() {
   public static class Factory<K> implements StateInternalsFactory<K> {
     private final String stageId;
     private final Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores;
+    private final Map<String, String> stateIdMap;
     private final Coder<K> keyCoder;
     private final int batchGetSize;
 
     public Factory(
         String stageId,
         Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores,
+        Map<String, String> stateIdMap,
         Coder<K> keyCoder,
         int batchGetSize) {
       this.stageId = stageId;
       this.stores = stores;
+      this.stateIdMap = stateIdMap;

Review Comment:
   Done.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -323,11 +319,11 @@ protected AbstractSamzaState(
       this.coder = coder;
       this.namespace = namespace;
       this.addressId = address.getId();
-      this.isBeamStore = !stores.containsKey(address.getId());
+      this.isBeamStore = !stores.containsKey(stateIdMap.get(address.getId()));

Review Comment:
   Done.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java:
##########
@@ -35,12 +35,17 @@ public class ConfigContext {
   private final Map<PValue, String> idMap;
   private AppliedPTransform<?, ?, ?> currentTransform;
   private final SamzaPipelineOptions options;
-  private final Set<String> stateIds;
+  private final Map<String, String> usedStateIdMap;
+  private final Map<String, String> stateIdsToRewrite;
 
-  public ConfigContext(Map<PValue, String> idMap, SamzaPipelineOptions options) {
+  public ConfigContext(

Review Comment:
   Thank you for the suggestion here, made the change.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java:
##########
@@ -377,25 +392,21 @@ public Map<String, String> createConfig(
     if (signature.usesState()) {
       // set up user state configs
       for (DoFnSignature.StateDeclaration state : signature.stateDeclarations().values()) {
-        final String storeId = state.id();
-
-        // TODO: remove validation after we support same state id in different ParDo.
-        if (!ctx.addStateId(storeId)) {
-          throw new IllegalStateException(
-              "Duplicate StateId " + storeId + " found in multiple ParDo.");
-        }
-
+        final String userStateId = state.id();
+        final String escapedParDoName =
+            SamzaPipelineTranslatorUtils.escape(node.getEnclosingNode().getFullName());
+        final String uniqueStoreId = ctx.getUniqueStoreId(userStateId, escapedParDoName);

Review Comment:
   We only need the config creation part for translation, so I removed this part.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/util/StoreIdUtils.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.util;
+
+public class StoreIdUtils {
+
+  /**
+   * Join stateId and escaped PTransform name, used for RocksDB storeId of stateIds with multiple
+   * ParDos.
+   */
+  public static String toMultiParDoStoreId(String stateId, String escapedPTransformName) {

Review Comment:
   Done.
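
For illustration, here is a sketch of how a stateId might map to a storeId under the naming described in the PR description ("foo" stays "foo" when used by a single ParDo, and becomes "foo-Second_stateful_ParDo" when shared). Only the signature of `toMultiParDoStoreId` appears in the diff above; its body, the `escape` rule, and the `storeIdFor` helper below are assumptions, not the runner's actual code.

```java
import java.util.Collections;
import java.util.Set;

/** Hypothetical sketch of the stateId -> storeId naming; not the actual StoreIdUtils. */
final class StoreIdSketch {

  /** Assumed escaping rule: every character outside [A-Za-z0-9_-] becomes '_'. */
  static String escape(String pTransformName) {
    return pTransformName.replaceAll("[^A-Za-z0-9_-]", "_");
  }

  /** Joins stateId and escaped PTransform name with '-', as in the PR description's example. */
  static String toMultiParDoStoreId(String stateId, String escapedPTransformName) {
    return String.join("-", stateId, escapedPTransformName);
  }

  /** Keeps the plain stateId unless it is used by more than one ParDo. */
  static String storeIdFor(String stateId, String parDoName, Set<String> nonUniqueStateIds) {
    return nonUniqueStateIds.contains(stateId)
        ? toMultiParDoStoreId(stateId, escape(parDoName))
        : stateId;
  }

  public static void main(String[] args) {
    Set<String> nonUnique = Collections.singleton("foo");
    System.out.println(storeIdFor("foo", "Second stateful ParDo", nonUnique));
    System.out.println(storeIdFor("bar", "Second stateful ParDo", nonUnique));
  }
}
```

Leaving single-use stateIds untouched means existing RocksDB stores keep their names, which matches goal (1) in the PR description.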



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java:
##########
@@ -92,14 +90,14 @@ public static <InT, FnOutT> DoFnRunner<InT, FnOutT> create(
       DoFnSchemaInformation doFnSchemaInformation,
       Map<String, PCollectionView<?>> sideInputMapping,
       OpEmitter emitter,
-      FutureCollector futureCollector) {
+      FutureCollector futureCollector,
+      Map<String, String> userStateIds) {

Review Comment:
   Sounds good



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -99,13 +96,16 @@
 
   // the stores include both beamStore for system states as well as stores for user state
   private final Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores;
+  // Map of non-system stateIds to unique storeId
+  private final Map<String, String> stateIdMap;

Review Comment:
   Sounds good.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -271,16 +263,19 @@ private static ByteArrayOutputStream getThreadLocalBaos() {
   public static class Factory<K> implements StateInternalsFactory<K> {
     private final String stageId;
     private final Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores;
+    private final Map<String, String> stateIdMap;

Review Comment:
   Done.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -303,7 +298,8 @@ public StateInternals stateInternalsForKey(@Nullable K key) {
         throw new RuntimeException("Cannot encode key for state store", e);
       }
 
-      return new SamzaStoreStateInternals<>(stores, key, baos.toByteArray(), stageId, batchGetSize);
+      return new SamzaStoreStateInternals<>(

Review Comment:
   Done.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -149,37 +139,39 @@ static <K> Factory<K> createStateInternalsFactory(
       TaskContext context,
       SamzaPipelineOptions pipelineOptions,
       ExecutableStage executableStage) {
-
-    Set<String> stateIds =
+    // TODO: handle same stateIds in multiple ParDos for portable mode
+    Map<String, String> stateIds =
         executableStage.getUserStates().stream()
-            .map(UserStateReference::localName)
-            .collect(Collectors.toSet());
+            .collect(
+                Collectors.toMap(UserStateReference::localName, UserStateReference::localName));
 
     return createStateInternalsFactory(id, keyCoder, context, pipelineOptions, stateIds);
   }
 
   @SuppressWarnings("unchecked")
-  private static <K> Factory<K> createStateInternalsFactory(
+  static <K> Factory<K> createStateInternalsFactory(
       String id,
       @Nullable Coder<K> keyCoder,
       TaskContext context,
       SamzaPipelineOptions pipelineOptions,
-      Collection<String> stateIds) {
+      Map<String, String> stateIdMap) {
     final int batchGetSize = pipelineOptions.getStoreBatchGetSize();
     final Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores = new HashMap<>();
     stores.put(BEAM_STORE, getBeamStore(context));
-
     final Coder<K> stateKeyCoder;
     if (keyCoder != null) {
-      stateIds.forEach(
-          stateId ->
-              stores.put(
-                  stateId, (KeyValueStore<ByteArray, StateValue<?>>) context.getStore(stateId)));
+      stateIdMap

Review Comment:
   Got it - thank you.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -149,37 +139,39 @@ static <K> Factory<K> createStateInternalsFactory(
       TaskContext context,
       SamzaPipelineOptions pipelineOptions,
       ExecutableStage executableStage) {
-
-    Set<String> stateIds =
+    // TODO: handle same stateIds in multiple ParDos for portable mode
+    Map<String, String> stateIds =
         executableStage.getUserStates().stream()
-            .map(UserStateReference::localName)
-            .collect(Collectors.toSet());
+            .collect(
+                Collectors.toMap(UserStateReference::localName, UserStateReference::localName));
 
     return createStateInternalsFactory(id, keyCoder, context, pipelineOptions, stateIds);
   }
 
   @SuppressWarnings("unchecked")
-  private static <K> Factory<K> createStateInternalsFactory(
+  static <K> Factory<K> createStateInternalsFactory(
       String id,
       @Nullable Coder<K> keyCoder,
       TaskContext context,
       SamzaPipelineOptions pipelineOptions,
-      Collection<String> stateIds) {
+      Map<String, String> stateIdMap) {
     final int batchGetSize = pipelineOptions.getStoreBatchGetSize();
     final Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores = new HashMap<>();
     stores.put(BEAM_STORE, getBeamStore(context));
-
     final Coder<K> stateKeyCoder;
     if (keyCoder != null) {
-      stateIds.forEach(
-          stateId ->
-              stores.put(
-                  stateId, (KeyValueStore<ByteArray, StateValue<?>>) context.getStore(stateId)));
+      stateIdMap
+          .values()
+          .forEach(
+              storeId ->
+                  stores.put(
+                      storeId,
+                      (KeyValueStore<ByteArray, StateValue<?>>) context.getStore(storeId)));
       stateKeyCoder = keyCoder;
     } else {
       stateKeyCoder = (Coder<K>) VoidCoder.of();
     }
-    return new Factory<>(Objects.toString(id), stores, stateKeyCoder, batchGetSize);
+    return new Factory<>(Objects.toString(id), stores, stateIdMap, stateKeyCoder, batchGetSize);

Review Comment:
   Done.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -99,13 +96,16 @@
 
   // the stores include both beamStore for system states as well as stores for user state
   private final Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores;
+  // Map of non-system stateIds to unique storeId
+  private final Map<String, String> stateIdMap;
   private final K key;
   private final byte[] keyBytes;
   private final int batchGetSize;
   private final String stageId;
 
   private SamzaStoreStateInternals(
       Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores,
+      Map<String, String> stateIds,

Review Comment:
   Ack, removed the changes in this class.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -271,16 +263,19 @@ private static ByteArrayOutputStream getThreadLocalBaos() {
   public static class Factory<K> implements StateInternalsFactory<K> {
     private final String stageId;
     private final Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores;
+    private final Map<String, String> stateIdMap;
     private final Coder<K> keyCoder;
     private final int batchGetSize;
 
     public Factory(
         String stageId,
         Map<String, KeyValueStore<ByteArray, StateValue<?>>> stores,
+        Map<String, String> stateIdMap,

Review Comment:
   Done.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java:
##########
@@ -83,6 +84,10 @@ public void putAll(Map<String, String> properties) {
     config.putAll(properties);
   }
 
+  public void remove(String name) {

Review Comment:
   Makes sense - done.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java:
##########
@@ -323,11 +319,11 @@ protected AbstractSamzaState(
       this.coder = coder;
       this.namespace = namespace;
       this.addressId = address.getId();
-      this.isBeamStore = !stores.containsKey(address.getId());
+      this.isBeamStore = !stores.containsKey(stateIdMap.get(address.getId()));
       this.store =
           isBeamStore
               ? (KeyValueStore) stores.get(BEAM_STORE)
-              : (KeyValueStore) stores.get(address.getId());
+              : (KeyValueStore) stores.get(stateIdMap.get(address.getId()));

Review Comment:
   Done.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java:
##########
@@ -64,8 +69,39 @@ public SamzaPipelineOptions getPipelineOptions() {
     return this.options;
   }
 
-  public boolean addStateId(String stateId) {
-    return stateIds.add(stateId);
+  /** Helper to keep track of used stateIds and return unique store id. */
+  public String getUniqueStoreId(String stateId, String parDoName) {
+    // Update a map of used state id with parDo name.
+    if (!usedStateIdMap.containsKey(stateId)) {
+      usedStateIdMap.put(stateId, parDoName);
+      return stateId;
+    } else {
+      // Same state id identified for the first time
+      if (!stateIdsToRewrite.containsKey(stateId)) {
+        final String prevParDoName = usedStateIdMap.get(stateId);
+        final String prevMultiParDoStateId =
+            StoreIdUtils.toMultiParDoStoreId(stateId, prevParDoName);
+        usedStateIdMap.put(prevMultiParDoStateId, prevParDoName);
+        // Store the stateId with previous parDo name which will be used for config rewriting
+        stateIdsToRewrite.put(stateId, prevParDoName);

Review Comment:
   Changed the logic to use a pre-scan.



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java:
##########
@@ -105,6 +108,46 @@ public static void createConfig(
     pipeline.traverseTopologically(visitor);
   }
 
+  /**
+   * Rewrite user store configs if there exists same state ids used in multiple ParDos. For each
+   * entry of a stateId to escaped PTransform name of first occurrence in topological traversal,
+   * rewrite RocksDB configs with the new mapping enforced from stateId to storeId.
+   * (eg) @StateId("foo") used in two ParDos fn, fn2: .apply("First Stateful ParDo with same
+   * stateId", ParDo.of(fn)) .apply("Second Stateful ParDo with same stateId", ParDo.of(fn2)) Map =
+   * ("foo", "First_Stateful_ParDo_with_same_stateId") storeId =
+   * "foo-First_Stateful_ParDo_with_same_stateId"
+   */
+  public static void rewriteConfigWithMultiParDoStateId(

Review Comment:
   Thank you :-) 



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java:
##########
@@ -83,6 +84,7 @@ public class TranslationContext {
   private final Map<PValue, String> idMap;
   private final Map<String, MessageStream> registeredInputStreams = new HashMap<>();
   private final Map<String, Table> registeredTables = new HashMap<>();
+  private final Set<String> multiParDoStateIds;

Review Comment:
   I really like "nonUniqueStateIds"; made the change throughout the diff. Thank you!


