You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by re...@apache.org on 2018/12/05 07:27:42 UTC
[beam] branch master updated: Merge pull request #7204: [BEAM-5884] Fix FieldType comparison in BeamSQL
This is an automated email from the ASF dual-hosted git repository.
reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 681b5cd Merge pull request #7204: [BEAM-5884] Fix FieldType comparison in BeamSQL
681b5cd is described below
commit 681b5cd8d749eb8f889ef46984fcf58c1c3c392a
Author: reuvenlax <re...@google.com>
AuthorDate: Tue Dec 4 23:27:36 2018 -0800
Merge pull request #7204: [BEAM-5884] Fix FieldType comparison in BeamSQL
---
.../core/src/main/java/org/apache/beam/sdk/schemas/Schema.java | 6 ++++--
.../sql/meta/provider/pubsub/PubsubJsonTableProvider.java | 7 +++++--
2 files changed, 9 insertions(+), 4 deletions(-)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
index a923c0e..d256b32 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
@@ -228,7 +228,8 @@ public class Schema implements Serializable {
return true;
}
- enum EquivalenceNullablePolicy {
+ /** Control whether nullable is included in equivalence check. */
+ public enum EquivalenceNullablePolicy {
SAME,
WEAKEN,
IGNORE
@@ -581,7 +582,8 @@ public class Schema implements Serializable {
return true;
}
- private boolean equivalent(FieldType other, EquivalenceNullablePolicy nullablePolicy) {
+ /** Check whether two types are equivalent. */
+ public boolean equivalent(FieldType other, EquivalenceNullablePolicy nullablePolicy) {
if (nullablePolicy == EquivalenceNullablePolicy.SAME
&& !other.getNullable().equals(getNullable())) {
return false;
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonTableProvider.java
index 66a952a..9c13c8a 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonTableProvider.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonTableProvider.java
@@ -71,7 +71,8 @@ public class PubsubJsonTableProvider extends InMemoryMetaTableProvider {
if (schema.getFieldCount() != 3
|| !fieldPresent(schema, TIMESTAMP_FIELD, TIMESTAMP)
- || !fieldPresent(schema, ATTRIBUTES_FIELD, Schema.FieldType.map(VARCHAR, VARCHAR))
+ || !fieldPresent(
+ schema, ATTRIBUTES_FIELD, Schema.FieldType.map(VARCHAR.withNullable(false), VARCHAR))
|| !(schema.hasField(PAYLOAD_FIELD)
&& ROW.equals(schema.getField(PAYLOAD_FIELD).getType().getTypeName()))) {
@@ -85,7 +86,9 @@ public class PubsubJsonTableProvider extends InMemoryMetaTableProvider {
}
private boolean fieldPresent(Schema schema, String field, Schema.FieldType expectedType) {
- return schema.hasField(field) && expectedType.equals(schema.getField(field).getType());
+ return schema.hasField(field)
+ && expectedType.equivalent(
+ schema.getField(field).getType(), Schema.EquivalenceNullablePolicy.IGNORE);
}
private void validateDlq(String deadLetterQueue) {