You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by re...@apache.org on 2021/04/29 04:14:44 UTC

[beam] branch master updated: Merge pull request #14639: [BEAM-10277][BEAM-12198] Populate encoding positions to all matching schemas

This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 50fcf55  Merge pull request #14639: [BEAM-10277][BEAM-12198] Populate encoding positions to all matching schemas
50fcf55 is described below

commit 50fcf55d60e8fa7a8399d63e030c930a2d45402a
Author: reuvenlax <re...@google.com>
AuthorDate: Wed Apr 28 21:13:57 2021 -0700

    Merge pull request #14639: [BEAM-10277][BEAM-12198] Populate encoding positions to all matching schemas
---
 model/pipeline/src/main/proto/schema.proto         |  4 ++
 .../util/RowCoderCloudObjectTranslator.java        |  6 +++
 .../util/SchemaCoderCloudObjectTranslator.java     |  6 +++
 .../java/org/apache/beam/sdk/coders/RowCoder.java  |  7 +++
 .../apache/beam/sdk/coders/RowCoderGenerator.java  |  9 +++-
 .../java/org/apache/beam/sdk/schemas/Schema.java   |  8 ++++
 .../org/apache/beam/sdk/schemas/SchemaCoder.java   |  6 +++
 .../apache/beam/sdk/schemas/SchemaTranslation.java |  2 +-
 .../main/java/org/apache/beam/sdk/values/Row.java  | 56 +++++++++++++++++++++-
 .../impl/transform/BeamSqlOutputToConsoleFn.java   |  2 +-
 .../apache/beam/sdk/extensions/sql/TestUtils.java  |  6 +--
 11 files changed, 104 insertions(+), 8 deletions(-)

diff --git a/model/pipeline/src/main/proto/schema.proto b/model/pipeline/src/main/proto/schema.proto
index a40087c..837689f 100644
--- a/model/pipeline/src/main/proto/schema.proto
+++ b/model/pipeline/src/main/proto/schema.proto
@@ -37,6 +37,8 @@ message Schema {
   // REQUIRED. An RFC 4122 UUID.
   string id = 2;
   repeated Option options = 3;
+  // Indicates that encoding positions have been overridden.
+  bool encoding_positions_set = 4;
 }
 
 message Field {
@@ -52,6 +54,8 @@ message Field {
    // or all of them are. Used to support backwards compatibility with schema
    // changes.
    // If no fields have encoding position populated the order of encoding is the same as the order in the Schema.
+  // If this Field is part of a Schema where encoding_positions_set is true, then encoding_position
+  // must be populated; otherwise the value of encoding_position is ignored.
   int32 encoding_position = 5;
   repeated Option options = 6;
 }
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RowCoderCloudObjectTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RowCoderCloudObjectTranslator.java
index 6de8321..e84d89b 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RowCoderCloudObjectTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RowCoderCloudObjectTranslator.java
@@ -18,6 +18,8 @@
 package org.apache.beam.runners.dataflow.util;
 
 import java.io.IOException;
+import java.util.UUID;
+import javax.annotation.Nullable;
 import org.apache.beam.model.pipeline.v1.SchemaApi;
 import org.apache.beam.runners.core.construction.SdkComponents;
 import org.apache.beam.sdk.annotations.Experimental;
@@ -59,6 +61,10 @@ public class RowCoderCloudObjectTranslator implements CloudObjectTranslator<RowC
       SchemaApi.Schema.Builder schemaBuilder = SchemaApi.Schema.newBuilder();
       JsonFormat.parser().merge(Structs.getString(cloudObject, SCHEMA), schemaBuilder);
       Schema schema = SchemaTranslation.schemaFromProto(schemaBuilder.build());
+      @Nullable UUID uuid = schema.getUUID();
+      if (schema.isEncodingPositionsOverridden() && uuid != null) {
+        RowCoder.overrideEncodingPositions(uuid, schema.getEncodingPositions());
+      }
       return RowCoder.of(schema);
     } catch (IOException e) {
       throw new RuntimeException(e);
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SchemaCoderCloudObjectTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SchemaCoderCloudObjectTranslator.java
index da7a7d8..efd6e3c 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SchemaCoderCloudObjectTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SchemaCoderCloudObjectTranslator.java
@@ -18,6 +18,8 @@
 package org.apache.beam.runners.dataflow.util;
 
 import java.io.IOException;
+import java.util.UUID;
+import javax.annotation.Nullable;
 import org.apache.beam.model.pipeline.v1.SchemaApi;
 import org.apache.beam.runners.core.construction.SdkComponents;
 import org.apache.beam.sdk.annotations.Experimental;
@@ -99,6 +101,10 @@ public class SchemaCoderCloudObjectTranslator implements CloudObjectTranslator<S
       SchemaApi.Schema.Builder schemaBuilder = SchemaApi.Schema.newBuilder();
       JsonFormat.parser().merge(Structs.getString(cloudObject, SCHEMA), schemaBuilder);
       Schema schema = SchemaTranslation.schemaFromProto(schemaBuilder.build());
+      @Nullable UUID uuid = schema.getUUID();
+      if (schema.isEncodingPositionsOverridden() && uuid != null) {
+        SchemaCoder.overrideEncodingPositions(uuid, schema.getEncodingPositions());
+      }
       return SchemaCoder.of(schema, typeDescriptor, toRowFunction, fromRowFunction);
     } catch (IOException e) {
       throw new RuntimeException(e);
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java
index 9ef658d..2d51411 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java
@@ -17,7 +17,9 @@
  */
 package org.apache.beam.sdk.coders;
 
+import java.util.Map;
 import java.util.Objects;
+import java.util.UUID;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.schemas.Schema;
@@ -34,6 +36,11 @@ public class RowCoder extends SchemaCoder<Row> {
     return new RowCoder(schema);
   }
 
+  /** Override encoding positions for the given schema. */
+  public static void overrideEncodingPositions(UUID uuid, Map<String, Integer> encodingPositions) {
+    SchemaCoder.overrideEncodingPositions(uuid, encodingPositions);
+  }
+
   private RowCoder(Schema schema) {
     super(
         schema,
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java
index 46c7758..a8d45b7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java
@@ -112,9 +112,15 @@ public abstract class RowCoderGenerator {
 
   // Cache for Coder class that are already generated.
   private static final Map<UUID, Coder<Row>> GENERATED_CODERS = Maps.newConcurrentMap();
+  private static final Map<UUID, Map<String, Integer>> ENCODING_POSITION_OVERRIDES =
+      Maps.newConcurrentMap();
 
   private static final Logger LOG = LoggerFactory.getLogger(RowCoderGenerator.class);
 
+  public static void overrideEncodingPositions(UUID uuid, Map<String, Integer> encodingPositions) {
+    ENCODING_POSITION_OVERRIDES.put(uuid, encodingPositions);
+  }
+
   @SuppressWarnings("unchecked")
   public static Coder<Row> generate(Schema schema) {
     // Using ConcurrentHashMap::computeIfAbsent here would deadlock in case of nested
@@ -128,7 +134,8 @@ public abstract class RowCoderGenerator {
       builder = implementMethods(schema, builder);
 
       int[] encodingPosToRowIndex = new int[schema.getFieldCount()];
-      Map<String, Integer> encodingPositions = schema.getEncodingPositions();
+      Map<String, Integer> encodingPositions =
+          ENCODING_POSITION_OVERRIDES.getOrDefault(schema.getUUID(), schema.getEncodingPositions());
       for (int recordIndex = 0; recordIndex < schema.getFieldCount(); ++recordIndex) {
         String name = schema.getField(recordIndex).getName();
         int encodingPosition = encodingPositions.get(name);
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
index 4acf367..2d35ab0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
@@ -96,6 +96,7 @@ public class Schema implements Serializable {
   // A mapping between field names an indices.
   private final BiMap<String, Integer> fieldIndices = HashBiMap.create();
   private Map<String, Integer> encodingPositions = Maps.newHashMap();
+  private boolean encodingPositionsOverridden = false;
 
   private final List<Field> fields;
   // Cache the hashCode, so it doesn't have to be recomputed. Schema objects are immutable, so this
@@ -287,9 +288,15 @@ public class Schema implements Serializable {
     return encodingPositions;
   }
 
+  /** Returns whether encoding positions have been explicitly overridden. */
+  public boolean isEncodingPositionsOverridden() {
+    return encodingPositionsOverridden;
+  }
+
   /** Sets the encoding positions for this schema. */
   public void setEncodingPositions(Map<String, Integer> encodingPositions) {
     this.encodingPositions = encodingPositions;
+    this.encodingPositionsOverridden = true;
   }
 
   /** Get this schema's UUID. */
@@ -398,6 +405,7 @@ public class Schema implements Serializable {
     builder.append(System.lineSeparator());
     builder.append("Options:");
     builder.append(options);
+    builder.append("UUID: " + uuid);
     return builder.toString();
   }
 
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java
index 015ff73..fe097d3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java
@@ -22,6 +22,7 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
 import org.apache.beam.sdk.annotations.Experimental;
@@ -89,6 +90,11 @@ public class SchemaCoder<T> extends CustomCoder<T> {
     return RowCoder.of(schema);
   }
 
+  /** Override encoding positions for the given schema. */
+  public static void overrideEncodingPositions(UUID uuid, Map<String, Integer> encodingPositions) {
+    RowCoderGenerator.overrideEncodingPositions(uuid, encodingPositions);
+  }
+
   /** Returns the schema associated with this type. */
   public Schema getSchema() {
     return schema;
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java
index 1559732..7fe0f5c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java
@@ -225,7 +225,7 @@ public class SchemaTranslation {
       // but if it does happen, we expect none to be specified - in which case the should all be
       // zero.
       Preconditions.checkState(dinstictEncodingPositions == 1);
-    } else {
+    } else if (protoSchema.getEncodingPositionsSet()) {
       schema.setEncodingPositions(encodingLocationMap);
     }
     if (!protoSchema.getId().isEmpty()) {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
index 93544f5..a19d9b3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
@@ -48,7 +48,6 @@ import org.apache.beam.sdk.values.RowUtils.FieldOverride;
 import org.apache.beam.sdk.values.RowUtils.FieldOverrides;
 import org.apache.beam.sdk.values.RowUtils.RowFieldMatcher;
 import org.apache.beam.sdk.values.RowUtils.RowPosition;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.DateTime;
@@ -579,7 +578,60 @@ public abstract class Row implements Serializable {
 
   @Override
   public String toString() {
-    return "Row:" + Arrays.deepToString(Iterables.toArray(getValues(), Object.class));
+    return toString(true);
+  }
+
+  /** Convert Row to String. */
+  public String toString(boolean includeFieldNames) {
+    StringBuilder builder = new StringBuilder();
+    builder.append("Row: ");
+    builder.append(System.lineSeparator());
+    for (int i = 0; i < getSchema().getFieldCount(); ++i) {
+      Schema.Field field = getSchema().getField(i);
+      if (includeFieldNames) {
+        builder.append(field.getName() + ":");
+      }
+      builder.append(toString(field.getType(), getValue(i), includeFieldNames));
+      builder.append(System.lineSeparator());
+    }
+    return builder.toString();
+  }
+
+  private String toString(Schema.FieldType fieldType, Object value, boolean includeFieldNames) {
+    StringBuilder builder = new StringBuilder();
+    switch (fieldType.getTypeName()) {
+      case ARRAY:
+      case ITERABLE:
+        builder.append("[");
+        for (Object element : (Iterable<?>) value) {
+          builder.append(
+              toString(fieldType.getCollectionElementType(), element, includeFieldNames));
+          builder.append(", ");
+        }
+        builder.append("]");
+        break;
+      case MAP:
+        builder.append("{");
+        for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
+          builder.append("(");
+          builder.append(toString(fieldType.getMapKeyType(), entry.getKey(), includeFieldNames));
+          builder.append(", ");
+          builder.append(
+              toString(fieldType.getMapValueType(), entry.getValue(), includeFieldNames));
+          builder.append("), ");
+        }
+        builder.append("}");
+        break;
+      case BYTES:
+        builder.append(Arrays.toString((byte[]) value));
+        break;
+      case ROW:
+        builder.append(((Row) value).toString(includeFieldNames));
+        break;
+      default:
+        builder.append(value);
+    }
+    return builder.toString();
   }
 
   /**
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlOutputToConsoleFn.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlOutputToConsoleFn.java
index 7b1b2af..d3e44db 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlOutputToConsoleFn.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlOutputToConsoleFn.java
@@ -32,6 +32,6 @@ public class BeamSqlOutputToConsoleFn extends DoFn<Row, Void> {
 
   @ProcessElement
   public void processElement(ProcessContext c) {
-    System.out.println("Output: " + c.element().getValues());
+    System.out.println("Output: " + c.element().toString());
   }
 }
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
index 3ab0ddd..6aa103b 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
@@ -39,8 +39,8 @@ public class TestUtils {
   /** A {@code DoFn} to convert a {@code BeamSqlRow} to a comparable {@code String}. */
   public static class BeamSqlRow2StringDoFn extends DoFn<Row, String> {
     @ProcessElement
-    public void processElement(ProcessContext ctx) {
-      ctx.output(ctx.element().toString());
+    public void processElement(@Element Row row, OutputReceiver<String> o) {
+      o.output(row.toString(false));
     }
   }
 
@@ -48,7 +48,7 @@ public class TestUtils {
   public static List<String> beamSqlRows2Strings(List<Row> rows) {
     List<String> strs = new ArrayList<>();
     for (Row row : rows) {
-      strs.add(row.toString());
+      strs.add(row.toString(false));
     }
 
     return strs;