You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by re...@apache.org on 2021/04/29 04:14:44 UTC
[beam] branch master updated: Merge pull request #14639:
[BEAM-10277][BEAM-12198] Populate encoding positions to all matching
schemas
This is an automated email from the ASF dual-hosted git repository.
reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 50fcf55 Merge pull request #14639: [BEAM-10277][BEAM-12198] Populate encoding positions to all matching schemas
50fcf55 is described below
commit 50fcf55d60e8fa7a8399d63e030c930a2d45402a
Author: reuvenlax <re...@google.com>
AuthorDate: Wed Apr 28 21:13:57 2021 -0700
Merge pull request #14639: [BEAM-10277][BEAM-12198] Populate encoding positions to all matching schemas
---
model/pipeline/src/main/proto/schema.proto | 4 ++
.../util/RowCoderCloudObjectTranslator.java | 6 +++
.../util/SchemaCoderCloudObjectTranslator.java | 6 +++
.../java/org/apache/beam/sdk/coders/RowCoder.java | 7 +++
.../apache/beam/sdk/coders/RowCoderGenerator.java | 9 +++-
.../java/org/apache/beam/sdk/schemas/Schema.java | 8 ++++
.../org/apache/beam/sdk/schemas/SchemaCoder.java | 6 +++
.../apache/beam/sdk/schemas/SchemaTranslation.java | 2 +-
.../main/java/org/apache/beam/sdk/values/Row.java | 56 +++++++++++++++++++++-
.../impl/transform/BeamSqlOutputToConsoleFn.java | 2 +-
.../apache/beam/sdk/extensions/sql/TestUtils.java | 6 +--
11 files changed, 104 insertions(+), 8 deletions(-)
diff --git a/model/pipeline/src/main/proto/schema.proto b/model/pipeline/src/main/proto/schema.proto
index a40087c..837689f 100644
--- a/model/pipeline/src/main/proto/schema.proto
+++ b/model/pipeline/src/main/proto/schema.proto
@@ -37,6 +37,8 @@ message Schema {
// REQUIRED. An RFC 4122 UUID.
string id = 2;
repeated Option options = 3;
+ // Indicates that encoding positions have been overridden.
+ bool encoding_positions_set = 4;
}
message Field {
@@ -52,6 +54,8 @@ message Field {
// or all of them are. Used to support backwards compatibility with schema
// changes.
// If no fields have encoding position populated the order of encoding is the same as the order in the Schema.
+ // If this Field is part of a Schema where encoding_positions_set is true, then encoding_position
+ // must be defined; otherwise encoding_position is ignored.
int32 encoding_position = 5;
repeated Option options = 6;
}
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RowCoderCloudObjectTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RowCoderCloudObjectTranslator.java
index 6de8321..e84d89b 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RowCoderCloudObjectTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RowCoderCloudObjectTranslator.java
@@ -18,6 +18,8 @@
package org.apache.beam.runners.dataflow.util;
import java.io.IOException;
+import java.util.UUID;
+import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.SchemaApi;
import org.apache.beam.runners.core.construction.SdkComponents;
import org.apache.beam.sdk.annotations.Experimental;
@@ -59,6 +61,10 @@ public class RowCoderCloudObjectTranslator implements CloudObjectTranslator<RowC
SchemaApi.Schema.Builder schemaBuilder = SchemaApi.Schema.newBuilder();
JsonFormat.parser().merge(Structs.getString(cloudObject, SCHEMA), schemaBuilder);
Schema schema = SchemaTranslation.schemaFromProto(schemaBuilder.build());
+ @Nullable UUID uuid = schema.getUUID();
+ if (schema.isEncodingPositionsOverridden() && uuid != null) {
+ RowCoder.overrideEncodingPositions(uuid, schema.getEncodingPositions());
+ }
return RowCoder.of(schema);
} catch (IOException e) {
throw new RuntimeException(e);
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SchemaCoderCloudObjectTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SchemaCoderCloudObjectTranslator.java
index da7a7d8..efd6e3c 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SchemaCoderCloudObjectTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/SchemaCoderCloudObjectTranslator.java
@@ -18,6 +18,8 @@
package org.apache.beam.runners.dataflow.util;
import java.io.IOException;
+import java.util.UUID;
+import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.SchemaApi;
import org.apache.beam.runners.core.construction.SdkComponents;
import org.apache.beam.sdk.annotations.Experimental;
@@ -99,6 +101,10 @@ public class SchemaCoderCloudObjectTranslator implements CloudObjectTranslator<S
SchemaApi.Schema.Builder schemaBuilder = SchemaApi.Schema.newBuilder();
JsonFormat.parser().merge(Structs.getString(cloudObject, SCHEMA), schemaBuilder);
Schema schema = SchemaTranslation.schemaFromProto(schemaBuilder.build());
+ @Nullable UUID uuid = schema.getUUID();
+ if (schema.isEncodingPositionsOverridden() && uuid != null) {
+ SchemaCoder.overrideEncodingPositions(uuid, schema.getEncodingPositions());
+ }
return SchemaCoder.of(schema, typeDescriptor, toRowFunction, fromRowFunction);
} catch (IOException e) {
throw new RuntimeException(e);
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java
index 9ef658d..2d51411 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java
@@ -17,7 +17,9 @@
*/
package org.apache.beam.sdk.coders;
+import java.util.Map;
import java.util.Objects;
+import java.util.UUID;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.schemas.Schema;
@@ -34,6 +36,11 @@ public class RowCoder extends SchemaCoder<Row> {
return new RowCoder(schema);
}
+ /**
+ * Override encoding positions for the schema identified by the given UUID.
+ *
+ * <p>This is a convenience entry point that forwards both arguments unchanged to {@code
+ * SchemaCoder.overrideEncodingPositions}.
+ *
+ * @param uuid UUID of the schema whose encoding positions are being overridden
+ * @param encodingPositions mapping from field name to encoding position
+ */
+ public static void overrideEncodingPositions(UUID uuid, Map<String, Integer> encodingPositions) {
+ SchemaCoder.overrideEncodingPositions(uuid, encodingPositions);
+ }
+
private RowCoder(Schema schema) {
super(
schema,
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java
index 46c7758..a8d45b7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java
@@ -112,9 +112,15 @@ public abstract class RowCoderGenerator {
// Cache of Coders that have already been generated, keyed by UUID.
private static final Map<UUID, Coder<Row>> GENERATED_CODERS = Maps.newConcurrentMap();
+ private static final Map<UUID, Map<String, Integer>> ENCODING_POSITION_OVERRIDES =
+ Maps.newConcurrentMap();
private static final Logger LOG = LoggerFactory.getLogger(RowCoderGenerator.class);
+  /**
+   * Registers encoding positions that take precedence over the ones carried by the schema with
+   * the given UUID.
+   *
+   * <p>{@code generate} looks this registry up by {@code schema.getUUID()} and falls back to
+   * {@code schema.getEncodingPositions()} when no override has been registered.
+   *
+   * <p>A read-only snapshot of the supplied map is stored, so later changes the caller makes to
+   * its own map (for example a schema's internal position map) cannot silently alter the
+   * registered override.
+   *
+   * @param uuid UUID of the schema whose encoding positions are being overridden
+   * @param encodingPositions mapping from field name to encoding position
+   */
+  public static void overrideEncodingPositions(UUID uuid, Map<String, Integer> encodingPositions) {
+    // Fully qualified names are used because this file's import block is not being modified.
+    ENCODING_POSITION_OVERRIDES.put(
+        uuid, java.util.Collections.unmodifiableMap(new java.util.HashMap<>(encodingPositions)));
+  }
+
@SuppressWarnings("unchecked")
public static Coder<Row> generate(Schema schema) {
// Using ConcurrentHashMap::computeIfAbsent here would deadlock in case of nested
@@ -128,7 +134,8 @@ public abstract class RowCoderGenerator {
builder = implementMethods(schema, builder);
int[] encodingPosToRowIndex = new int[schema.getFieldCount()];
- Map<String, Integer> encodingPositions = schema.getEncodingPositions();
+ Map<String, Integer> encodingPositions =
+ ENCODING_POSITION_OVERRIDES.getOrDefault(schema.getUUID(), schema.getEncodingPositions());
for (int recordIndex = 0; recordIndex < schema.getFieldCount(); ++recordIndex) {
String name = schema.getField(recordIndex).getName();
int encodingPosition = encodingPositions.get(name);
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
index 4acf367..2d35ab0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
@@ -96,6 +96,7 @@ public class Schema implements Serializable {
// A mapping between field names and indices.
private final BiMap<String, Integer> fieldIndices = HashBiMap.create();
private Map<String, Integer> encodingPositions = Maps.newHashMap();
+ private boolean encodingPositionsOverridden = false;
private final List<Field> fields;
// Cache the hashCode, so it doesn't have to be recomputed. Schema objects are immutable, so this
@@ -287,9 +288,15 @@ public class Schema implements Serializable {
return encodingPositions;
}
+ /**
+ * Returns whether encoding positions have been explicitly overridden.
+ *
+ * <p>The underlying flag is initialized to {@code false} and is switched to {@code true} by
+ * {@code setEncodingPositions}.
+ *
+ * @return {@code true} if encoding positions were explicitly set on this schema
+ */
+ public boolean isEncodingPositionsOverridden() {
+ return encodingPositionsOverridden;
+ }
+
/**
 * Sets the encoding positions for this schema and records that they were explicitly overridden.
 *
 * <p>The supplied map is stored by reference; no copy is made.
 *
 * @param encodingPositions mapping from field name to encoding position
 */
public void setEncodingPositions(Map<String, Integer> encodingPositions) {
this.encodingPositions = encodingPositions;
+ this.encodingPositionsOverridden = true;
}
/** Get this schema's UUID. */
@@ -398,6 +405,7 @@ public class Schema implements Serializable {
builder.append(System.lineSeparator());
builder.append("Options:");
builder.append(options);
+ builder.append("UUID: " + uuid);
return builder.toString();
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java
index 015ff73..fe097d3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaCoder.java
@@ -22,6 +22,7 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import org.apache.beam.sdk.annotations.Experimental;
@@ -89,6 +90,11 @@ public class SchemaCoder<T> extends CustomCoder<T> {
return RowCoder.of(schema);
}
+ /**
+ * Override encoding positions for the schema identified by the given UUID.
+ *
+ * <p>Forwards both arguments unchanged to {@code RowCoderGenerator.overrideEncodingPositions}.
+ *
+ * @param uuid UUID of the schema whose encoding positions are being overridden
+ * @param encodingPositions mapping from field name to encoding position
+ */
+ public static void overrideEncodingPositions(UUID uuid, Map<String, Integer> encodingPositions) {
+ RowCoderGenerator.overrideEncodingPositions(uuid, encodingPositions);
+ }
+
/** Returns the schema associated with this type. */
public Schema getSchema() {
return schema;
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java
index 1559732..7fe0f5c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java
@@ -225,7 +225,7 @@ public class SchemaTranslation {
// but if it does happen, we expect none to be specified - in which case they should all be
// zero.
Preconditions.checkState(dinstictEncodingPositions == 1);
- } else {
+ } else if (protoSchema.getEncodingPositionsSet()) {
schema.setEncodingPositions(encodingLocationMap);
}
if (!protoSchema.getId().isEmpty()) {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
index 93544f5..a19d9b3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
@@ -48,7 +48,6 @@ import org.apache.beam.sdk.values.RowUtils.FieldOverride;
import org.apache.beam.sdk.values.RowUtils.FieldOverrides;
import org.apache.beam.sdk.values.RowUtils.RowFieldMatcher;
import org.apache.beam.sdk.values.RowUtils.RowPosition;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.DateTime;
@@ -579,7 +578,60 @@ public abstract class Row implements Serializable {
@Override
public String toString() {
- return "Row:" + Arrays.deepToString(Iterables.toArray(getValues(), Object.class));
+ return toString(true);
+ }
+
+  /**
+   * Renders this row as multi-line text: a {@code "Row: "} header line followed by one line per
+   * field, in schema order.
+   *
+   * @param includeFieldNames when {@code true}, each field line is prefixed with the field name
+   *     and a colon
+   * @return the textual form of this row
+   */
+  public String toString(boolean includeFieldNames) {
+    Schema rowSchema = getSchema();
+    String eol = System.lineSeparator();
+    StringBuilder text = new StringBuilder().append("Row: ").append(eol);
+    for (int index = 0, count = rowSchema.getFieldCount(); index < count; index++) {
+      Schema.Field current = rowSchema.getField(index);
+      if (includeFieldNames) {
+        text.append(current.getName()).append(":");
+      }
+      text.append(toString(current.getType(), getValue(index), includeFieldNames)).append(eol);
+    }
+    return text.toString();
+  }
+
+  /**
+   * Renders a single field value according to its schema type.
+   *
+   * <p>ARRAY and ITERABLE values are rendered element by element inside square brackets, MAP
+   * values as parenthesized key/value pairs inside curly braces, BYTES values via {@link
+   * Arrays#toString(byte[])}, nested ROW values via {@link #toString(boolean)}, and every other
+   * type via {@link StringBuilder#append(Object)}. Each collection element and map entry is
+   * followed by a {@code ", "} separator, including the last one; this is kept unchanged so
+   * existing output stays stable.
+   *
+   * @param fieldType schema type describing {@code value}
+   * @param value the value to render; may be null for nullable fields, elements, or map values
+   * @param includeFieldNames forwarded to nested rows
+   * @return the rendered text, or {@code "null"} when {@code value} is null
+   */
+  private String toString(
+      Schema.FieldType fieldType, @Nullable Object value, boolean includeFieldNames) {
+    if (value == null) {
+      // A null ARRAY/ITERABLE/MAP/ROW value would otherwise throw a NullPointerException when it
+      // is iterated or dereferenced below. "null" is what the scalar and BYTES branches already
+      // produce for a null value, so their output does not change.
+      return "null";
+    }
+    StringBuilder builder = new StringBuilder();
+    switch (fieldType.getTypeName()) {
+      case ARRAY:
+      case ITERABLE:
+        builder.append("[");
+        for (Object element : (Iterable<?>) value) {
+          builder.append(
+              toString(fieldType.getCollectionElementType(), element, includeFieldNames));
+          builder.append(", ");
+        }
+        builder.append("]");
+        break;
+      case MAP:
+        builder.append("{");
+        for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
+          builder.append("(");
+          builder.append(toString(fieldType.getMapKeyType(), entry.getKey(), includeFieldNames));
+          builder.append(", ");
+          builder.append(
+              toString(fieldType.getMapValueType(), entry.getValue(), includeFieldNames));
+          builder.append("), ");
+        }
+        builder.append("}");
+        break;
+      case BYTES:
+        builder.append(Arrays.toString((byte[]) value));
+        break;
+      case ROW:
+        builder.append(((Row) value).toString(includeFieldNames));
+        break;
+      default:
+        builder.append(value);
+    }
+    return builder.toString();
+  }
/**
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlOutputToConsoleFn.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlOutputToConsoleFn.java
index 7b1b2af..d3e44db 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlOutputToConsoleFn.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlOutputToConsoleFn.java
@@ -32,6 +32,6 @@ public class BeamSqlOutputToConsoleFn extends DoFn<Row, Void> {
@ProcessElement
public void processElement(ProcessContext c) {
- System.out.println("Output: " + c.element().getValues());
+ System.out.println("Output: " + c.element().toString());
}
}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
index 3ab0ddd..6aa103b 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
@@ -39,8 +39,8 @@ public class TestUtils {
/** A {@code DoFn} to convert a {@code BeamSqlRow} to a comparable {@code String}. */
public static class BeamSqlRow2StringDoFn extends DoFn<Row, String> {
@ProcessElement
- public void processElement(ProcessContext ctx) {
- ctx.output(ctx.element().toString());
+ public void processElement(@Element Row row, OutputReceiver<String> o) {
+ o.output(row.toString(false));
}
}
@@ -48,7 +48,7 @@ public class TestUtils {
public static List<String> beamSqlRows2Strings(List<Row> rows) {
List<String> strs = new ArrayList<>();
for (Row row : rows) {
- strs.add(row.toString());
+ strs.add(row.toString(false));
}
return strs;