You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2020/11/02 19:06:42 UTC

[beam] branch master updated: [BEAM-11154] Check coder proto to avoid registering same coder under different name in step translation phase (#13225)

This is an automated email from the ASF dual-hosted git repository.

kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e6b571e  [BEAM-11154] Check coder proto to avoid registering same coder under different name in step translation phase (#13225)
e6b571e is described below

commit e6b571e6bc04fb6ecc6e75f5064e09337e6341e2
Author: Yichi Zhang <zy...@google.com>
AuthorDate: Mon Nov 2 11:06:08 2020 -0800

    [BEAM-11154] Check coder proto to avoid registering same coder under different name in step translation phase (#13225)
---
 .../apache/beam/runners/core/construction/SdkComponents.java  | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
index 123a95e..fe4bc0b 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
@@ -54,6 +54,7 @@ public class SdkComponents {
   private final BiMap<WindowingStrategy<?, ?>, String> windowingStrategyIds = HashBiMap.create();
   private final BiMap<Coder<?>, String> coderIds = HashBiMap.create();
   private final BiMap<Environment, String> environmentIds = HashBiMap.create();
+  private final BiMap<RunnerApi.Coder, String> coderProtoToId = HashBiMap.create();
   private final Set<String> requirements;
 
   private final Set<String> reservedIds = new HashSet<>();
@@ -127,6 +128,7 @@ public class SdkComponents {
     reservedIds.addAll(components.getEnvironmentsMap().keySet());
 
     components.getEnvironmentsMap().forEach(environmentIds.inverse()::forcePut);
+    components.getCodersMap().forEach(coderProtoToId.inverse()::forcePut);
 
     if (requirements != null) {
       this.requirements.addAll(requirements);
@@ -264,10 +266,17 @@ public class SdkComponents {
     if (existing != null) {
       return existing;
     }
+    // Unlike StructuredCoder, custom coders may not implement hashCode() and equals() properly,
+    // which leads to unnecessary duplicates. To avoid this, we look up the already registered
+    // coders for a matching proto and, if one is found, treat the two as the same coder.
+    RunnerApi.Coder coderProto = CoderTranslation.toProto(coder, this);
+    if (coderProtoToId.containsKey(coderProto)) {
+      return coderProtoToId.get(coderProto);
+    }
     String baseName = NameUtils.approximateSimpleName(coder);
     String name = uniqify(baseName, coderIds.values());
     coderIds.put(coder, name);
-    RunnerApi.Coder coderProto = CoderTranslation.toProto(coder, this);
+    coderProtoToId.put(coderProto, name);
     componentsBuilder.putCoders(name, coderProto);
     return name;
   }