You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2020/11/02 19:06:42 UTC
[beam] branch master updated: [BEAM-11154] Check coder proto to avoid registering same coder under different name in step translation phase (#13225)
This is an automated email from the ASF dual-hosted git repository.
kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e6b571e [BEAM-11154] Check coder proto to avoid registering same coder under different name in step translation phase (#13225)
e6b571e is described below
commit e6b571e6bc04fb6ecc6e75f5064e09337e6341e2
Author: Yichi Zhang <zy...@google.com>
AuthorDate: Mon Nov 2 11:06:08 2020 -0800
[BEAM-11154] Check coder proto to avoid registering same coder under different name in step translation phase (#13225)
---
.../apache/beam/runners/core/construction/SdkComponents.java | 11 ++++++++++-
1 file changed, 10 insertions(+), 1 deletion(-)
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
index 123a95e..fe4bc0b 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
@@ -54,6 +54,7 @@ public class SdkComponents {
private final BiMap<WindowingStrategy<?, ?>, String> windowingStrategyIds = HashBiMap.create();
private final BiMap<Coder<?>, String> coderIds = HashBiMap.create();
private final BiMap<Environment, String> environmentIds = HashBiMap.create();
+ private final BiMap<RunnerApi.Coder, String> coderProtoToId = HashBiMap.create();
private final Set<String> requirements;
private final Set<String> reservedIds = new HashSet<>();
@@ -127,6 +128,7 @@ public class SdkComponents {
reservedIds.addAll(components.getEnvironmentsMap().keySet());
components.getEnvironmentsMap().forEach(environmentIds.inverse()::forcePut);
+ components.getCodersMap().forEach(coderProtoToId.inverse()::forcePut);
if (requirements != null) {
this.requirements.addAll(requirements);
@@ -264,10 +266,17 @@ public class SdkComponents {
if (existing != null) {
return existing;
}
+ // Unlike StructuredCoder, custom coders may not properly implement hashCode() and equals(),
+ // which leads to unnecessary duplication. To avoid this, we look for an already registered
+ // coder with a matching proto and, if one is found, treat the two as the same coder.
+ RunnerApi.Coder coderProto = CoderTranslation.toProto(coder, this);
+ if (coderProtoToId.containsKey(coderProto)) {
+ return coderProtoToId.get(coderProto);
+ }
String baseName = NameUtils.approximateSimpleName(coder);
String name = uniqify(baseName, coderIds.values());
coderIds.put(coder, name);
- RunnerApi.Coder coderProto = CoderTranslation.toProto(coder, this);
+ coderProtoToId.put(coderProto, name);
componentsBuilder.putCoders(name, coderProto);
return name;
}