You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by re...@apache.org on 2018/12/05 07:27:42 UTC

[beam] branch master updated: Merge pull request #7204: [BEAM-5884] Fix FieldType comparison in BeamSQL

This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 681b5cd  Merge pull request #7204:  [BEAM-5884] Fix FieldType comparison in BeamSQL
681b5cd is described below

commit 681b5cd8d749eb8f889ef46984fcf58c1c3c392a
Author: reuvenlax <re...@google.com>
AuthorDate: Tue Dec 4 23:27:36 2018 -0800

    Merge pull request #7204:  [BEAM-5884] Fix FieldType comparison in BeamSQL
---
 .../core/src/main/java/org/apache/beam/sdk/schemas/Schema.java     | 6 ++++--
 .../sql/meta/provider/pubsub/PubsubJsonTableProvider.java          | 7 +++++--
 2 files changed, 9 insertions(+), 4 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
index a923c0e..d256b32 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
@@ -228,7 +228,8 @@ public class Schema implements Serializable {
     return true;
   }
 
-  enum EquivalenceNullablePolicy {
+  /** Control whether nullable is included in equivalence check. */
+  public enum EquivalenceNullablePolicy {
     SAME,
     WEAKEN,
     IGNORE
@@ -581,7 +582,8 @@ public class Schema implements Serializable {
       return true;
     }
 
-    private boolean equivalent(FieldType other, EquivalenceNullablePolicy nullablePolicy) {
+    /** Check whether two types are equivalent. */
+    public boolean equivalent(FieldType other, EquivalenceNullablePolicy nullablePolicy) {
       if (nullablePolicy == EquivalenceNullablePolicy.SAME
           && !other.getNullable().equals(getNullable())) {
         return false;
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonTableProvider.java
index 66a952a..9c13c8a 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonTableProvider.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonTableProvider.java
@@ -71,7 +71,8 @@ public class PubsubJsonTableProvider extends InMemoryMetaTableProvider {
 
     if (schema.getFieldCount() != 3
         || !fieldPresent(schema, TIMESTAMP_FIELD, TIMESTAMP)
-        || !fieldPresent(schema, ATTRIBUTES_FIELD, Schema.FieldType.map(VARCHAR, VARCHAR))
+        || !fieldPresent(
+            schema, ATTRIBUTES_FIELD, Schema.FieldType.map(VARCHAR.withNullable(false), VARCHAR))
         || !(schema.hasField(PAYLOAD_FIELD)
             && ROW.equals(schema.getField(PAYLOAD_FIELD).getType().getTypeName()))) {
 
@@ -85,7 +86,9 @@ public class PubsubJsonTableProvider extends InMemoryMetaTableProvider {
   }
 
   private boolean fieldPresent(Schema schema, String field, Schema.FieldType expectedType) {
-    return schema.hasField(field) && expectedType.equals(schema.getField(field).getType());
+    return schema.hasField(field)
+        && expectedType.equivalent(
+            schema.getField(field).getType(), Schema.EquivalenceNullablePolicy.IGNORE);
   }
 
   private void validateDlq(String deadLetterQueue) {