You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by re...@apache.org on 2019/05/15 01:17:23 UTC
[beam] branch master updated: Merge pull request #8425: [BEAM-7174]
Add schema modification transforms
This is an automated email from the ASF dual-hosted git repository.
reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c630217 Merge pull request #8425: [BEAM-7174] Add schema modification transforms
c630217 is described below
commit c6302174ea5a60dbc09ae318ea2c5470d4060ef7
Author: reuvenlax <re...@google.com>
AuthorDate: Tue May 14 18:17:04 2019 -0700
Merge pull request #8425: [BEAM-7174] Add schema modification transforms
---
.../beam/sdk/schemas/FieldAccessDescriptor.java | 56 ++-
.../java/org/apache/beam/sdk/schemas/Schema.java | 4 +
.../beam/sdk/schemas/transforms/AddFields.java | 434 +++++++++++++++++++++
.../beam/sdk/schemas/transforms/DropFields.java | 149 +++++++
.../beam/sdk/schemas/transforms/RenameFields.java | 187 +++++++++
.../beam/sdk/schemas/transforms/AddFieldsTest.java | 356 +++++++++++++++++
.../sdk/schemas/transforms/DropFieldsTest.java | 161 ++++++++
.../sdk/schemas/transforms/RenameFieldsTest.java | 261 +++++++++++++
8 files changed, 1607 insertions(+), 1 deletion(-)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldAccessDescriptor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldAccessDescriptor.java
index 060222c..55b9939 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldAccessDescriptor.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldAccessDescriptor.java
@@ -48,6 +48,7 @@ import org.apache.beam.sdk.schemas.parser.FieldAccessDescriptorParser;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ArrayListMultimap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Maps;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Multimap;
@@ -229,7 +230,7 @@ public abstract class FieldAccessDescriptor implements Serializable {
return builder().setFieldsAccessed(Lists.newArrayList(fields)).build();
}
- // Union a set of FieldAccessDescriptors. This function currenty only supports descriptors with
+ // Union a set of FieldAccessDescriptors. This function currently only supports descriptors with
// containing named fields, not those containing ids.
private static FieldAccessDescriptor union(
Iterable<FieldAccessDescriptor> fieldAccessDescriptors) {
@@ -328,6 +329,15 @@ public abstract class FieldAccessDescriptor implements Serializable {
}
/**
+ * Return the field names accessed. Should not be called until after {@link #resolve} is called.
+ */
+  public Set<String> fieldNamesAccessed() {
+    Set<String> names =
+        getFieldsAccessed().stream().map(fd -> fd.getFieldName()).collect(Collectors.toSet());
+    return names;
+  }
+
+ /**
* Return the nested fields keyed by field ids. Should not be called until after {@link #resolve}
* is called.
*/
@@ -337,6 +347,32 @@ public abstract class FieldAccessDescriptor implements Serializable {
}
/**
+ * Return the nested fields keyed by field name. Should not be called until after {@link #resolve}
+ * is called.
+ */
+  public Map<String, FieldAccessDescriptor> nestedFieldsByName() {
+    return getNestedFieldsAccessed().entrySet().stream()
+        .collect(Collectors.toMap(entry -> entry.getKey().getFieldName(), Map.Entry::getValue));
+  }
+
+ /** Returns true if this descriptor references only a single, non-wildcard field. */
+ public boolean referencesSingleField() {
+ if (getAllFields()) {
+ return false;
+ }
+
+ if (getFieldsAccessed().size() == 1 && getNestedFieldsAccessed().isEmpty()) {
+ return true;
+ }
+
+ if (getFieldsAccessed().isEmpty() && getNestedFieldsAccessed().size() == 1) {
+ return getNestedFieldsAccessed().values().iterator().next().referencesSingleField();
+ }
+
+ return false;
+ }
+
+ /**
* Resolve the {@link FieldAccessDescriptor} against a schema.
*
* <p>Resolve will resolve all of the field names into field ids, validating that all field names
@@ -515,4 +551,22 @@ public abstract class FieldAccessDescriptor implements Serializable {
}
}
}
+
+  @Override
+  public String toString() {
+    if (getAllFields()) {
+      return "*";
+    }
+    // Renders a comma-separated selector list, e.g. "a, b, c.d"; qualifiers are not rendered.
+    List<String> singleSelectors =
+        getFieldsAccessed().stream()
+            .map(FieldDescriptor::getFieldName)
+            .collect(Collectors.toList());
+    List<String> nestedSelectors =
+        getNestedFieldsAccessed().entrySet().stream()
+            .map(e -> e.getKey().getFieldName() + "." + e.getValue().toString())
+            .collect(Collectors.toList());
+    // Top-level selectors are listed before nested ones.
+    return String.join(", ", Iterables.concat(singleSelectors, nestedSelectors));
+  }
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
index 21fd782..d659b25 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
@@ -196,6 +196,10 @@ public class Schema implements Serializable {
return this;
}
+    public int getLastFieldId() {
+      return -1 + fields.size(); // Index of the final entry in the field list; -1 if it is empty.
+    }
+
public Schema build() {
return new Schema(fields);
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/AddFields.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/AddFields.java
new file mode 100644
index 0000000..1019d0e
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/AddFields.java
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.transforms;
+
+import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor.FieldDescriptor.Qualifier;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor.FieldDescriptor.Qualifier.Kind;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Multimap;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Multimaps;
+
+/**
+ * A transform to add new fields to a PCollection's schema. Elements are extended to have the
+ * new schema. By default new fields are nullable, and input rows will be extended to the new
+ * schema by inserting null values. However explicit default values for new fields can be set
+ * using {@link Inner#field(String, Schema.FieldType, Object)}. Nested fields can be added as well.
+ *
+ * <p>Example use:
+ *
+ * <pre>{@code PCollection<Event> events = readEvents();
+ * PCollection<Row> augmentedEvents =
+ *   events.apply(AddFields.<Event>create()
+ *       .field("userId", FieldType.STRING)
+ *       .field("location.zipcode", FieldType.INT32)
+ *       .field("userDetails.isSpecialUser", FieldType.BOOLEAN, false));
+ * }</pre>
+ */
+@Experimental(Experimental.Kind.SCHEMAS)
+public class AddFields {
+  public static <T> Inner<T> create() {
+    return new Inner<>();
+  }
+
+  /** Inner PTransform for AddFields. */
+  public static class Inner<T> extends PTransform<PCollection<T>, PCollection<Row>> {
+    /** Internal object representing a new field added. */
+    @AutoValue
+    abstract static class NewField implements Serializable {
+      abstract String getName();
+
+      abstract FieldAccessDescriptor getDescriptor();
+
+      abstract Schema.FieldType getFieldType();
+
+      @Nullable
+      abstract Object getDefaultValue();
+
+      @AutoValue.Builder
+      abstract static class Builder {
+        abstract Builder setName(String name);
+
+        abstract Builder setDescriptor(FieldAccessDescriptor descriptor);
+
+        abstract Builder setFieldType(Schema.FieldType fieldType);
+
+        abstract Builder setDefaultValue(@Nullable Object defaultValue);
+
+        abstract NewField build();
+      }
+
+      abstract Builder toBuilder();
+
+      static NewField of(
+          FieldAccessDescriptor fieldAccessDescriptor,
+          Schema.FieldType fieldType,
+          Object defaultValue) {
+        return new AutoValue_AddFields_Inner_NewField.Builder()
+            .setName(getName(fieldAccessDescriptor))
+            .setDescriptor(fieldAccessDescriptor)
+            .setFieldType(fieldType)
+            .setDefaultValue(defaultValue)
+            .build();
+      }
+
+      // If this field represents a nested value, pop the FieldAccessDescriptor one level down.
+      NewField descend() {
+        FieldAccessDescriptor descriptor =
+            Iterables.getOnlyElement(getDescriptor().getNestedFieldsAccessed().values());
+        return toBuilder().setDescriptor(descriptor).setName(getName(descriptor)).build();
+      }
+
+      static String getName(FieldAccessDescriptor descriptor) {
+        if (!descriptor.getFieldsAccessed().isEmpty()) {
+          return Iterables.getOnlyElement(descriptor.fieldNamesAccessed());
+        } else {
+          return Iterables.getOnlyElement(descriptor.nestedFieldsByName().keySet());
+        }
+      }
+
+      FieldAccessDescriptor.FieldDescriptor getFieldDescriptor() {
+        if (!getDescriptor().getFieldsAccessed().isEmpty()) {
+          return Iterables.getOnlyElement(getDescriptor().getFieldsAccessed());
+        } else {
+          return Iterables.getOnlyElement(getDescriptor().getNestedFieldsAccessed().keySet());
+        }
+      }
+    }
+
+    /** This class encapsulates all data needed to add a new field to the schema. */
+    @AutoValue
+    abstract static class AddFieldsInformation implements Serializable {
+      // The new output fieldtype after adding the new field.
+      @Nullable
+      abstract Schema.FieldType getOutputFieldType();
+
+      // A list of default values corresponding to this level of the schema.
+      abstract List<Object> getDefaultValues();
+
+      // A list of nested values. This list corresponds to the output schema fields, and is
+      // populated for fields that have new nested values. For other fields, the list contains a
+      // null value.
+      abstract List<AddFieldsInformation> getNestedNewValues();
+
+      @AutoValue.Builder
+      abstract static class Builder {
+        abstract AddFieldsInformation.Builder setOutputFieldType(Schema.FieldType outputFieldType);
+
+        abstract AddFieldsInformation.Builder setDefaultValues(List<Object> defaultValues);
+
+        abstract AddFieldsInformation.Builder setNestedNewValues(
+            List<AddFieldsInformation> nestedNewValues);
+
+        abstract AddFieldsInformation build();
+      }
+
+      abstract AddFieldsInformation.Builder toBuilder();
+
+      static AddFieldsInformation of(
+          Schema.FieldType outputFieldType,
+          List<Object> defaultValues,
+          List<AddFieldsInformation> nestedNewValues) {
+        return new AutoValue_AddFields_Inner_AddFieldsInformation.Builder()
+            .setOutputFieldType(outputFieldType)
+            .setDefaultValues(defaultValues)
+            .setNestedNewValues(nestedNewValues)
+            .build();
+      }
+    }
+
+    private final List<NewField> newFields;
+
+    private Inner() {
+      this.newFields = Collections.emptyList();
+    }
+
+    private Inner(List<NewField> newFields) {
+      this.newFields = newFields;
+    }
+
+    /**
+     * Add a new field of the specified type. The new field will be nullable and will be filled in
+     * with null values.
+     */
+    public Inner<T> field(String fieldName, Schema.FieldType fieldType) {
+      return field(fieldName, fieldType.withNullable(true), null);
+    }
+
+    /**
+     * Add a new field of the specified type. The new field will be filled in with the specified
+     * value, which may only be null if {@code fieldType} is nullable.
+     */
+    public Inner<T> field(String fieldName, Schema.FieldType fieldType, Object defaultValue) {
+      checkArgument(
+          defaultValue != null || fieldType.getNullable(),
+          "Field %s is not nullable, so it needs a non-null default value", fieldName);
+
+      FieldAccessDescriptor descriptor = FieldAccessDescriptor.withFieldNames(fieldName);
+      checkArgument(descriptor.referencesSingleField(), "%s must name a single field", fieldName);
+      List<NewField> fields =
+          ImmutableList.<NewField>builder()
+              .addAll(newFields)
+              .add(NewField.of(descriptor, fieldType, defaultValue))
+              .build();
+      return new Inner<>(fields);
+    }
+
+    private static AddFieldsInformation getAddFieldsInformation(
+        Schema inputSchema, Collection<NewField> fieldsToAdd) {
+      List<NewField> newTopLevelFields =
+          fieldsToAdd.stream()
+              .filter(n -> !n.getDescriptor().getFieldsAccessed().isEmpty())
+              .collect(Collectors.toList());
+      List<NewField> newNestedFields =
+          fieldsToAdd.stream()
+              .filter(n -> !n.getDescriptor().getNestedFieldsAccessed().isEmpty())
+              .collect(Collectors.toList());
+      // Group all nested fields together by the field at the current level. For example, if
+      // adding a.b, a.c and a.d, this map will contain a -> {a.b, a.c, a.d}. Entries that match an
+      // existing input field are handled in the loop below; the rest create brand-new fields.
+      Multimap<String, NewField> newNestedFieldsMap =
+          Multimaps.index(newNestedFields, NewField::getName);
+
+      Map<Integer, AddFieldsInformation> resolvedNestedNewValues = Maps.newHashMap();
+      Schema.Builder builder = Schema.builder();
+      for (int i = 0; i < inputSchema.getFieldCount(); ++i) {
+        Schema.Field field = inputSchema.getField(i);
+        Collection<NewField> nestedFields = newNestedFieldsMap.get(field.getName());
+
+        // If this field is a nested field and new subfields are added further down the tree, add
+        // those subfields before adding to the current schema. Otherwise we just add this field as
+        // is to the new schema.
+        if (!nestedFields.isEmpty()) {
+          nestedFields = nestedFields.stream().map(NewField::descend).collect(Collectors.toList());
+
+          AddFieldsInformation nestedInformation =
+              getAddFieldsInformation(field.getType(), nestedFields);
+          field = field.withType(nestedInformation.getOutputFieldType());
+          resolvedNestedNewValues.put(i, nestedInformation);
+        }
+        builder.addField(field);
+      }
+
+      // Add any new fields at this level.
+      List<Object> newValuesThisLevel = new ArrayList<>(newTopLevelFields.size());
+      for (NewField newField : newTopLevelFields) {
+        builder.addField(newField.getName(), newField.getFieldType());
+        newValuesThisLevel.add(newField.getDefaultValue());
+      }
+
+      // If there are any nested field additions left that are not already processed, that means
+      // that the root of the nested field doesn't exist in the schema. In this case we'll walk down
+      // the new nested fields and recursively create each nested level as necessary. (Roots that
+      // do exist in the input schema were already handled by the loop above and are skipped.)
+      for (Map.Entry<String, Collection<NewField>> newNested :
+          newNestedFieldsMap.asMap().entrySet()) {
+        String fieldName = newNested.getKey();
+
+        // Reject the same root field being qualified in different ways (e.g. a[].x and a{}.x).
+        FieldAccessDescriptor.FieldDescriptor fieldDescriptor =
+            Iterables.getOnlyElement(
+                newNested.getValue().stream()
+                    .map(NewField::getFieldDescriptor)
+                    .distinct()
+                    .collect(Collectors.toList()));
+        if (!inputSchema.hasField(fieldName)) {
+          // This is a brand-new nested field with no matching field in the input schema. We will
+          // recursively create a nested schema to match it. Qualifiers are only examined here:
+          // roots that already exist (including map fields) were handled by the first loop.
+          FieldType fieldType = Schema.FieldType.row(Schema.of()).withNullable(true);
+          for (Qualifier qualifier : fieldDescriptor.getQualifiers()) {
+            // The problem with adding recursive map fields is that we don't know what the map
+            // key type should be. In a field descriptor of the form mapField{}.subField, the
+            // subField is assumed to be in the map value. Since in this code path the mapField
+            // field does not already exist this means we need to create the new map field, and we
+            // have no way of knowing what type the key should be. Alternatives would be to always
+            // create a default key type (e.g. FieldType.STRING) or extend our selector syntax to
+            // allow specifying key types.
+            checkArgument(
+                !qualifier.getKind().equals(Kind.MAP), "Map qualifiers not supported here");
+            fieldType = FieldType.array(fieldType).withNullable(true);
+          }
+
+          Collection<NewField> nestedNewFields =
+              newNested.getValue().stream().map(NewField::descend).collect(Collectors.toList());
+          AddFieldsInformation addFieldsInformation =
+              getAddFieldsInformation(fieldType, nestedNewFields);
+          builder.addField(fieldName, addFieldsInformation.getOutputFieldType());
+          resolvedNestedNewValues.put(builder.getLastFieldId(), addFieldsInformation);
+        }
+      }
+      Schema schema = builder.build();
+
+      List<AddFieldsInformation> nestedNewValueList =
+          new ArrayList<>(Collections.nCopies(schema.getFieldCount(), null));
+      for (Map.Entry<Integer, AddFieldsInformation> entry : resolvedNestedNewValues.entrySet()) {
+        nestedNewValueList.set(entry.getKey(), entry.getValue());
+      }
+      return AddFieldsInformation.of(
+          Schema.FieldType.row(schema), newValuesThisLevel, nestedNewValueList);
+    }
+
+    private static AddFieldsInformation getAddFieldsInformation(
+        Schema.FieldType inputFieldType, Collection<NewField> nestedFields) {
+      AddFieldsInformation addFieldsInformation;
+      Schema.FieldType fieldType;
+      switch (inputFieldType.getTypeName()) {
+        case ROW:
+          addFieldsInformation =
+              getAddFieldsInformation(inputFieldType.getRowSchema(), nestedFields);
+          fieldType = addFieldsInformation.getOutputFieldType();
+          break;
+
+        case ARRAY:
+          addFieldsInformation =
+              getAddFieldsInformation(inputFieldType.getCollectionElementType(), nestedFields);
+          fieldType = Schema.FieldType.array(addFieldsInformation.getOutputFieldType());
+          break;
+
+        case MAP:
+          addFieldsInformation =
+              getAddFieldsInformation(inputFieldType.getMapValueType(), nestedFields);
+          fieldType =
+              Schema.FieldType.map(
+                  inputFieldType.getMapKeyType(), addFieldsInformation.getOutputFieldType());
+          break;
+
+        default:
+          throw new RuntimeException("Cannot select a subfield of a non-composite type.");
+      }
+      fieldType = fieldType.withNullable(inputFieldType.getNullable());
+      return addFieldsInformation.toBuilder().setOutputFieldType(fieldType).build();
+    }
+
+    private static Row fillNewFields(Row row, AddFieldsInformation addFieldsInformation) {
+      Schema outputSchema = checkNotNull(addFieldsInformation.getOutputFieldType().getRowSchema());
+
+      List<Object> newValues = Lists.newArrayListWithCapacity(outputSchema.getFieldCount());
+      for (int i = 0; i < row.getFieldCount(); ++i) {
+        AddFieldsInformation nested = addFieldsInformation.getNestedNewValues().get(i);
+        if (nested != null) {
+          // New fields were added to nested subfields of this value. Recursively fill them out
+          // before adding to the new row.
+          Object newValue = fillNewFields(row.getValue(i), nested.getOutputFieldType(), nested);
+          newValues.add(newValue);
+        } else {
+          // Nothing changed. Just copy the old value into the new row.
+          newValues.add(row.getValue(i));
+        }
+      }
+      // If there are brand new simple (i.e. have no nested values) fields at this level, then add
+      // the default values for all of them.
+      newValues.addAll(addFieldsInformation.getDefaultValues());
+      // If we are creating new recursive fields, populate new values for them here.
+      for (int i = newValues.size(); i < addFieldsInformation.getNestedNewValues().size(); ++i) {
+        AddFieldsInformation newNestedField = addFieldsInformation.getNestedNewValues().get(i);
+        if (newNestedField != null) {
+          newValues.add(fillNewFields(null, newNestedField.getOutputFieldType(), newNestedField));
+        }
+      }
+
+      return Row.withSchema(outputSchema).attachValues(newValues).build();
+    }
+
+    private static Object fillNewFields(
+        Object original, Schema.FieldType fieldType, AddFieldsInformation addFieldsInformation) {
+      switch (fieldType.getTypeName()) {
+        case ROW:
+          if (original == null) {
+            original = Row.withSchema(fieldType.getRowSchema()).build();
+          }
+          return fillNewFields((Row) original, addFieldsInformation);
+
+        case ARRAY:
+          if (original == null) {
+            return Collections.emptyList();
+          }
+          List<Object> list = (List<Object>) original;
+          List<Object> filledList = new ArrayList<>(list.size());
+          Schema.FieldType elementType = fieldType.getCollectionElementType();
+          AddFieldsInformation elementAddFieldInformation =
+              addFieldsInformation.toBuilder().setOutputFieldType(elementType).build();
+          for (Object element : list) {
+            filledList.add(fillNewFields(element, elementType, elementAddFieldInformation));
+          }
+          return filledList;
+
+        case MAP:
+          if (original == null) {
+            return Collections.emptyMap();
+          }
+          Map<Object, Object> originalMap = (Map<Object, Object>) original;
+          Map<Object, Object> filledMap = Maps.newHashMapWithExpectedSize(originalMap.size());
+          Schema.FieldType mapValueType = fieldType.getMapValueType();
+          AddFieldsInformation mapValueAddFieldInformation =
+              addFieldsInformation.toBuilder().setOutputFieldType(mapValueType).build();
+          for (Map.Entry<Object, Object> entry : originalMap.entrySet()) {
+            filledMap.put(
+                entry.getKey(),
+                fillNewFields(entry.getValue(), mapValueType, mapValueAddFieldInformation));
+          }
+          return filledMap;
+
+        default:
+          throw new RuntimeException("Unexpected field type");
+      }
+    }
+
+    @Override
+    public PCollection<Row> expand(PCollection<T> input) {
+      final AddFieldsInformation addFieldsInformation =
+          getAddFieldsInformation(input.getSchema(), newFields);
+      Schema outputSchema = checkNotNull(addFieldsInformation.getOutputFieldType().getRowSchema());
+
+      return input
+          .apply(
+              ParDo.of(
+                  new DoFn<T, Row>() {
+                    @ProcessElement
+                    public void processElement(@Element Row row, OutputReceiver<Row> o) {
+                      o.output(fillNewFields(row, addFieldsInformation));
+                    }
+                  }))
+          .setRowSchema(outputSchema);
+    }
+  }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/DropFields.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/DropFields.java
new file mode 100644
index 0000000..d1c4d12
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/DropFields.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.transforms;
+
+import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets;
+
+/**
+ * A transform to drop fields from a schema.
+ *
+ * <p>This transform acts as the inverse of the {@link Select} transform. A list of fields to drop
+ * is specified, and all fields in the schema that are not specified are selected. For example:
+ *
+ * <pre>{@code @DefaultSchema(JavaFieldSchema.class)
+ * public class UserEvent {
+ *   public String userId;
+ *   public String eventId;
+ *   public int eventType;
+ *   public Location location;
+ * }}</pre>
+ *
+ * <pre>{@code @DefaultSchema(JavaFieldSchema.class)
+ * public class Location {
+ *   public double latitude;
+ *   public double longitude;
+ * }
+ *
+ * PCollection<UserEvent> events = readUserEvents();
+ * // Drop the location field.
+ * PCollection<Row> noLocation = events.apply(DropFields.fields("location"));
+ * // Drop the latitude field.
+ * PCollection<Row> noLatitude = events.apply(DropFields.fields("location.latitude"));
+ * }</pre>
+ */
+@Experimental(Kind.SCHEMAS)
+public class DropFields {
+  public static <T> Inner<T> fields(String... fields) {
+    return fields(FieldAccessDescriptor.withFieldNames(fields));
+  }
+
+  public static <T> Inner<T> fields(Integer... fieldIds) {
+    return fields(FieldAccessDescriptor.withFieldIds(fieldIds));
+  }
+
+  public static <T> Inner<T> fields(FieldAccessDescriptor fieldsToDrop) {
+    return new Inner<>(fieldsToDrop);
+  }
+
+  /** Implementation class for DropFields. */
+  public static class Inner<T> extends PTransform<PCollection<T>, PCollection<Row>> {
+    private final FieldAccessDescriptor fieldsToDrop;
+
+    private Inner(FieldAccessDescriptor fieldsToDrop) {
+      this.fieldsToDrop = fieldsToDrop;
+    }
+
+    FieldAccessDescriptor complement(Schema inputSchema, FieldAccessDescriptor input) {
+      // Create a FieldAccessDescriptor that selects all fields _not_ selected in the input
+      // descriptor. Insertion-ordered collections keep the selected fields in the order in which
+      // they appear in the schema.
+      Set<String> fieldNamesToSelect = Sets.newLinkedHashSet();
+      Map<FieldAccessDescriptor.FieldDescriptor, FieldAccessDescriptor> nestedFieldsToSelect =
+          Maps.newLinkedHashMap();
+      // Index the nested fields to drop by field id once, instead of once per schema field.
+      Map<Integer, FieldAccessDescriptor.FieldDescriptor> nestedFields =
+          input.getNestedFieldsAccessed().keySet().stream()
+              .collect(Collectors.toMap(k -> k.getFieldId(), k -> k));
+      for (int i = 0; i < inputSchema.getFieldCount(); ++i) {
+        if (input.fieldIdsAccessed().contains(i)) {
+          // This whole field is dropped, so exclude it from the complement.
+          continue;
+        }
+        Field field = inputSchema.getField(i);
+        FieldAccessDescriptor.FieldDescriptor fieldDescriptor = nestedFields.get(i);
+        if (fieldDescriptor != null) {
+          // Some subfields are dropped, so recursively calculate the complementary subfields to
+          // select.
+          FieldType fieldType = field.getType();
+          for (FieldAccessDescriptor.FieldDescriptor.Qualifier qualifier :
+              fieldDescriptor.getQualifiers()) {
+            switch (qualifier.getKind()) {
+              case LIST:
+                fieldType = fieldType.getCollectionElementType();
+                break;
+              case MAP:
+                fieldType = fieldType.getMapValueType();
+                break;
+              default:
+                throw new RuntimeException("Unexpected field descriptor type.");
+            }
+          }
+          checkArgument(fieldType.getRowSchema() != null, "%s has no subfields", field.getName());
+          FieldAccessDescriptor nestedDescriptor =
+              input.getNestedFieldsAccessed().get(fieldDescriptor);
+          nestedFieldsToSelect.put(
+              fieldDescriptor, complement(fieldType.getRowSchema(), nestedDescriptor));
+        } else {
+          // Neither the field nor any of its subfields is dropped, so select it as is.
+          fieldNamesToSelect.add(field.getName());
+        }
+      }
+
+      FieldAccessDescriptor fieldAccess = FieldAccessDescriptor.withFieldNames(fieldNamesToSelect);
+      for (Map.Entry<FieldAccessDescriptor.FieldDescriptor, FieldAccessDescriptor> entry :
+          nestedFieldsToSelect.entrySet()) {
+        fieldAccess = fieldAccess.withNestedField(entry.getKey(), entry.getValue());
+      }
+      return fieldAccess.resolve(inputSchema);
+    }
+
+    @Override
+    public PCollection<Row> expand(PCollection<T> input) {
+      Schema inputSchema = input.getSchema();
+      FieldAccessDescriptor selectDescriptor =
+          complement(inputSchema, fieldsToDrop.resolve(inputSchema));
+
+      return input.apply(Select.fieldAccess(selectDescriptor));
+    }
+  }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/RenameFields.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/RenameFields.java
new file mode 100644
index 0000000..07d8499
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/RenameFields.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.transforms;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ArrayListMultimap;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Multimap;
+
+/**
+ * A transform for renaming fields inside an existing schema. Top level or nested fields can be
+ * renamed. When renaming a nested field, the nested prefix does not need to be specified again when
+ * specifying the new name.
+ *
+ * <p>Renaming a nested field also rebuilds the nested row values (including rows held inside arrays
+ * and maps), so output rows match the output schema at every level of nesting.
+ *
+ * <p>Example use:
+ *
+ * <pre>{@code PCollection<Event> events = readEvents();
+ * PCollection<Row> renamedEvents =
+ *     events.apply(RenameFields.<Event>create()
+ *         .rename("userName", "userId")
+ *         .rename("location.country", "countryCode"));
+ * }</pre>
+ */
+@Experimental(Kind.SCHEMAS)
+public class RenameFields {
+  /** Create an instance of this transform. */
+  public static <T> Inner<T> create() {
+    return new Inner<>();
+  }
+
+  // Describes a single rename rule.
+  private static class RenamePair implements Serializable {
+    // The FieldAccessDescriptor describing the field to rename. Must reference a single field.
+    private final FieldAccessDescriptor fieldAccessDescriptor;
+    // The new name for the field (the last path component only, without any nested prefix).
+    private final String newName;
+
+    RenamePair(FieldAccessDescriptor fieldAccessDescriptor, String newName) {
+      this.fieldAccessDescriptor = fieldAccessDescriptor;
+      this.newName = newName;
+    }
+
+    // Resolves the descriptor against a schema, failing if it does not select exactly one field.
+    RenamePair resolve(Schema schema) {
+      FieldAccessDescriptor resolved = fieldAccessDescriptor.resolve(schema);
+      if (!resolved.referencesSingleField()) {
+        throw new IllegalArgumentException(resolved + " references multiple fields.");
+      }
+      return new RenamePair(resolved, newName);
+    }
+  }
+
+  // Apply the renames to a field type, recursing through arrays and maps into nested rows. The
+  // nullability of the input type is carried over: the FieldType.row/array/map factories return
+  // non-nullable types, which would otherwise make a nullable container field non-nullable.
+  private static FieldType renameFieldType(FieldType inputType, Collection<RenamePair> renames) {
+    FieldType renamedType;
+    switch (inputType.getTypeName()) {
+      case ROW:
+        renamedType = FieldType.row(renameSchema(inputType.getRowSchema(), renames));
+        break;
+      case ARRAY:
+        renamedType =
+            FieldType.array(renameFieldType(inputType.getCollectionElementType(), renames));
+        break;
+      case MAP:
+        renamedType =
+            FieldType.map(
+                renameFieldType(inputType.getMapKeyType(), renames),
+                renameFieldType(inputType.getMapValueType(), renames));
+        break;
+      default:
+        return inputType;
+    }
+    return renamedType.withNullable(inputType.getNullable());
+  }
+
+  // Apply the user-specified renames to the input schema.
+  private static Schema renameSchema(Schema inputSchema, Collection<RenamePair> renames) {
+    // The mapping of renames to apply at this level of the schema.
+    Map<Integer, String> topLevelRenames = Maps.newHashMap();
+    // For nested schemas, collect all applicable renames here, keyed by the containing field.
+    Multimap<Integer, RenamePair> nestedRenames = ArrayListMultimap.create();
+
+    for (RenamePair rename : renames) {
+      FieldAccessDescriptor access = rename.fieldAccessDescriptor;
+      if (!access.fieldIdsAccessed().isEmpty()) {
+        // This references a field at this level of the schema.
+        Integer fieldId = Iterables.getOnlyElement(access.fieldIdsAccessed());
+        topLevelRenames.put(fieldId, rename.newName);
+      } else {
+        // This references a nested field: strip one level of nesting and hand the remainder to
+        // the recursive call for the containing field.
+        Map.Entry<Integer, FieldAccessDescriptor> nestedAccess =
+            Iterables.getOnlyElement(access.nestedFieldsById().entrySet());
+        nestedRenames.put(
+            nestedAccess.getKey(), new RenamePair(nestedAccess.getValue(), rename.newName));
+      }
+    }
+
+    Schema.Builder builder = Schema.builder();
+    for (int i = 0; i < inputSchema.getFieldCount(); ++i) {
+      Field field = inputSchema.getField(i);
+      FieldType fieldType = field.getType();
+      String newName = topLevelRenames.getOrDefault(i, field.getName());
+      Collection<RenamePair> nestedFieldRenames = nestedRenames.asMap().get(i);
+      if (nestedFieldRenames != null) {
+        // There are nested field renames. Recursively rename the nested schema.
+        builder.addField(newName, renameFieldType(fieldType, nestedFieldRenames));
+      } else {
+        // No nested renames for this field. Add it back as is, potentially with a new name.
+        builder.addField(newName, fieldType);
+      }
+    }
+    return builder.build();
+  }
+
+  // Rebuild a row, and every row reachable from it, against the renamed output schema. Renaming
+  // keeps field order and value representations, so values are copied over by position.
+  private static Row renameRow(Row row, Schema outputSchema) {
+    List<Object> values = Lists.newArrayListWithCapacity(row.getFieldCount());
+    for (int i = 0; i < row.getFieldCount(); ++i) {
+      Object value = row.getValue(i);
+      values.add(renameValue(value, outputSchema.getField(i).getType()));
+    }
+    return Row.withSchema(outputSchema).attachValues(values).build();
+  }
+
+  // Rebuild a single field value so that any rows nested inside it carry the renamed schema.
+  private static Object renameValue(Object value, FieldType outputType) {
+    if (value == null) {
+      return null;
+    }
+    switch (outputType.getTypeName()) {
+      case ROW:
+        return renameRow((Row) value, outputType.getRowSchema());
+      case ARRAY:
+        List<Object> elements = Lists.newArrayList();
+        for (Object element : (Iterable<?>) value) {
+          elements.add(renameValue(element, outputType.getCollectionElementType()));
+        }
+        return elements;
+      case MAP:
+        Map<Object, Object> entries = Maps.newLinkedHashMap();
+        for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
+          entries.put(
+              renameValue(entry.getKey(), outputType.getMapKeyType()),
+              renameValue(entry.getValue(), outputType.getMapValueType()));
+        }
+        return entries;
+      default:
+        return value;
+    }
+  }
+
+  /** The class implementing the actual PTransform. */
+  public static class Inner<T> extends PTransform<PCollection<T>, PCollection<Row>> {
+    private final List<RenamePair> renames;
+
+    private Inner() {
+      this(ImmutableList.of());
+    }
+
+    private Inner(List<RenamePair> renames) {
+      this.renames = renames;
+    }
+
+    /**
+     * Rename a specific field. Returns a new transform; this one is left unchanged.
+     *
+     * <p>{@code field} may be a nested path such as {@code "location.country"}; {@code newName} is
+     * the new name of the innermost field only, without the nested prefix.
+     */
+    public Inner<T> rename(String field, String newName) {
+      return rename(FieldAccessDescriptor.withFieldNames(field), newName);
+    }
+
+    /**
+     * Rename the single field selected by {@code field}. Returns a new transform; this one is left
+     * unchanged. Expansion fails with IllegalArgumentException if {@code field} selects more.
+     */
+    public Inner<T> rename(FieldAccessDescriptor field, String newName) {
+      List<RenamePair> newList =
+          ImmutableList.<RenamePair>builder()
+              .addAll(renames)
+              .add(new RenamePair(field, newName))
+              .build();
+      return new Inner<>(newList);
+    }
+
+    @Override
+    public PCollection<Row> expand(PCollection<T> input) {
+      Schema inputSchema = input.getSchema();
+
+      List<RenamePair> pairs =
+          renames.stream().map(r -> r.resolve(inputSchema)).collect(Collectors.toList());
+      final Schema outputSchema = renameSchema(inputSchema, pairs);
+      // A resolved rename with no top-level field id is a nested rename. Those change the schemas
+      // of nested rows, which then have to be rebuilt; top-level-only renames can simply attach
+      // the existing values to the new schema.
+      final boolean hasNestedRenames =
+          pairs.stream().anyMatch(p -> p.fieldAccessDescriptor.fieldIdsAccessed().isEmpty());
+      return input
+          .apply(
+              ParDo.of(
+                  new DoFn<T, Row>() {
+                    @ProcessElement
+                    public void processElement(@Element Row row, OutputReceiver<Row> o) {
+                      Row output =
+                          hasNestedRenames
+                              ? renameRow(row, outputSchema)
+                              : Row.withSchema(outputSchema)
+                                  .attachValues(row.getValues())
+                                  .build();
+                      o.output(output);
+                    }
+                  }))
+          .setRowSchema(outputSchema);
+    }
+  }
+}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/AddFieldsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/AddFieldsTest.java
new file mode 100644
index 0000000..af166e2
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/AddFieldsTest.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.transforms;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+
+/** Tests for {@link AddFields}. */
+public class AddFieldsTest {
+  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+  @Rule public transient ExpectedException thrown = ExpectedException.none();
+
+  /** Fields added without a default value are nullable and null in every existing row. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void addSimpleFields() {
+    Schema schema = Schema.builder().addStringField("field1").build();
+    PCollection<Row> added =
+        pipeline
+            .apply(
+                Create.of(Row.withSchema(schema).addValue("value").build()).withRowSchema(schema))
+            .apply(
+                AddFields.<Row>create()
+                    .field("field2", FieldType.INT32)
+                    .field("field3", FieldType.array(FieldType.STRING)));
+
+    Schema expectedSchema =
+        Schema.builder()
+            .addStringField("field1")
+            .addNullableField("field2", FieldType.INT32)
+            .addNullableField("field3", FieldType.array(FieldType.STRING))
+            .build();
+    assertEquals(expectedSchema, added.getSchema());
+    Row expected = Row.withSchema(expectedSchema).addValues("value", null, null).build();
+    PAssert.that(added).containsInAnyOrder(expected);
+    pipeline.run();
+  }
+
+  /** A field added with a default value is non-nullable and set to that value. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void addSimpleFieldsDefaultValue() {
+    Schema schema = Schema.builder().addStringField("field1").build();
+    PCollection<Row> added =
+        pipeline
+            .apply(
+                Create.of(Row.withSchema(schema).addValue("value").build()).withRowSchema(schema))
+            .apply(AddFields.<Row>create().field("field2", FieldType.INT32, 42));
+    Schema expectedSchema =
+        Schema.builder().addStringField("field1").addField("field2", FieldType.INT32).build();
+    assertEquals(expectedSchema, added.getSchema());
+    Row expected = Row.withSchema(expectedSchema).addValues("value", 42).build();
+    PAssert.that(added).containsInAnyOrder(expected);
+    pipeline.run();
+  }
+
+  /** Fields can be added inside an existing nested row. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void addNestedField() {
+    Schema nested = Schema.builder().addStringField("field1").build();
+    Schema schema = Schema.builder().addRowField("nested", nested).build();
+
+    Row subRow = Row.withSchema(nested).addValue("value").build();
+    Row row = Row.withSchema(schema).addValue(subRow).build();
+    PCollection<Row> added =
+        pipeline
+            .apply(Create.of(row).withRowSchema(schema))
+            .apply(
+                AddFields.<Row>create()
+                    .field("nested.field2", FieldType.INT32)
+                    .field("nested.field3", FieldType.array(FieldType.STRING)));
+
+    Schema expectedNestedSchema =
+        Schema.builder()
+            .addStringField("field1")
+            .addNullableField("field2", FieldType.INT32)
+            .addNullableField("field3", FieldType.array(FieldType.STRING))
+            .build();
+    Schema expectedSchema = Schema.builder().addRowField("nested", expectedNestedSchema).build();
+    assertEquals(expectedSchema, added.getSchema());
+
+    Row expectedNested =
+        Row.withSchema(expectedNestedSchema).addValues("value", null, null).build();
+    Row expected = Row.withSchema(expectedSchema).addValue(expectedNested).build();
+
+    PAssert.that(added).containsInAnyOrder(expected);
+    pipeline.run();
+  }
+
+  /** Nested fields added with default values are non-nullable and set to those values. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void addNestedFieldDefaultValue() {
+    Schema nested = Schema.builder().addStringField("field1").build();
+    Schema schema = Schema.builder().addRowField("nested", nested).build();
+
+    Row subRow = Row.withSchema(nested).addValue("value").build();
+    Row row = Row.withSchema(schema).addValue(subRow).build();
+    List<String> list = ImmutableList.of("one", "two", "three");
+    PCollection<Row> added =
+        pipeline
+            .apply(Create.of(row).withRowSchema(schema))
+            .apply(
+                AddFields.<Row>create()
+                    .field("nested.field2", FieldType.INT32, 42)
+                    .field("nested.field3", FieldType.array(FieldType.STRING), list));
+
+    Schema expectedNestedSchema =
+        Schema.builder()
+            .addStringField("field1")
+            .addField("field2", FieldType.INT32)
+            .addField("field3", FieldType.array(FieldType.STRING))
+            .build();
+    Schema expectedSchema = Schema.builder().addRowField("nested", expectedNestedSchema).build();
+    assertEquals(expectedSchema, added.getSchema());
+    Row expectedNested = Row.withSchema(expectedNestedSchema).addValues("value", 42, list).build();
+    Row expected = Row.withSchema(expectedSchema).addValue(expectedNested).build();
+
+    PAssert.that(added).containsInAnyOrder(expected);
+    pipeline.run();
+  }
+
+  /** Top-level and nested fields can be added in a single transform. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void addSimpleAndNestedField() {
+    Schema nested = Schema.builder().addStringField("field1").build();
+    Schema schema = Schema.builder().addRowField("nested", nested).build();
+
+    Row subRow = Row.withSchema(nested).addValue("value").build();
+    Row row = Row.withSchema(schema).addValue(subRow).build();
+    PCollection<Row> added =
+        pipeline
+            .apply(Create.of(row).withRowSchema(schema))
+            .apply(
+                AddFields.<Row>create()
+                    .field("field2", FieldType.INT32)
+                    .field("nested.field2", FieldType.INT32)
+                    .field("nested.field3", FieldType.array(FieldType.STRING)));
+
+    Schema expectedNestedSchema =
+        Schema.builder()
+            .addStringField("field1")
+            .addNullableField("field2", FieldType.INT32)
+            .addNullableField("field3", FieldType.array(FieldType.STRING))
+            .build();
+    Schema expectedSchema =
+        Schema.builder()
+            .addRowField("nested", expectedNestedSchema)
+            .addNullableField("field2", FieldType.INT32)
+            .build();
+    assertEquals(expectedSchema, added.getSchema());
+
+    Row expectedNested =
+        Row.withSchema(expectedNestedSchema).addValues("value", null, null).build();
+    Row expected = Row.withSchema(expectedSchema).addValues(expectedNested, null).build();
+
+    PAssert.that(added).containsInAnyOrder(expected);
+    pipeline.run();
+  }
+
+  /** Adding fields under a row that does not exist yet creates it as a nullable row field. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void recursivelyAddNestedFields() {
+    Schema schema = Schema.of();
+
+    Row row = Row.withSchema(schema).build();
+    PCollection<Row> added =
+        pipeline
+            .apply(Create.of(row).withRowSchema(schema))
+            .apply(
+                AddFields.<Row>create()
+                    .field("nested.field1", FieldType.STRING, "value")
+                    .field("nested.field2", FieldType.INT32)
+                    .field("nested.field3", FieldType.array(FieldType.STRING)));
+
+    Schema expectedNestedSchema =
+        Schema.builder()
+            .addStringField("field1")
+            .addNullableField("field2", FieldType.INT32)
+            .addNullableField("field3", FieldType.array(FieldType.STRING))
+            .build();
+    Schema expectedSchema =
+        Schema.builder().addNullableField("nested", FieldType.row(expectedNestedSchema)).build();
+    assertEquals(expectedSchema, added.getSchema());
+
+    Row expectedNested =
+        Row.withSchema(expectedNestedSchema).addValues("value", null, null).build();
+    Row expected = Row.withSchema(expectedSchema).addValue(expectedNested).build();
+
+    PAssert.that(added).containsInAnyOrder(expected);
+    pipeline.run();
+  }
+
+  /** Fields can be added to every row inside an existing array of rows. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void addNestedArrayField() {
+    Schema nested = Schema.builder().addStringField("field1").build();
+    Schema schema = Schema.builder().addArrayField("array", FieldType.row(nested)).build();
+
+    Row subRow = Row.withSchema(nested).addValue("value").build();
+    Row row = Row.withSchema(schema).addArray(subRow, subRow).build();
+    PCollection<Row> added =
+        pipeline
+            .apply(Create.of(row).withRowSchema(schema))
+            .apply(
+                AddFields.<Row>create()
+                    .field("array.field2", FieldType.INT32)
+                    .field("array.field3", FieldType.array(FieldType.STRING)));
+
+    Schema expectedNestedSchema =
+        Schema.builder()
+            .addStringField("field1")
+            .addNullableField("field2", FieldType.INT32)
+            .addNullableField("field3", FieldType.array(FieldType.STRING))
+            .build();
+    Schema expectedSchema =
+        Schema.builder().addArrayField("array", FieldType.row(expectedNestedSchema)).build();
+    assertEquals(expectedSchema, added.getSchema());
+
+    Row expectedNested =
+        Row.withSchema(expectedNestedSchema).addValues("value", null, null).build();
+    Row expected = Row.withSchema(expectedSchema).addArray(expectedNested, expectedNested).build();
+
+    PAssert.that(added).containsInAnyOrder(expected);
+    pipeline.run();
+  }
+
+  /**
+   * Adding fields under an array that does not exist yet creates a nullable array of nullable rows,
+   * which is empty in existing rows.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void recursivelyAddNestedArrayField() {
+    Schema schema = Schema.builder().build();
+    Row row = Row.withSchema(schema).build();
+    PCollection<Row> added =
+        pipeline
+            .apply(Create.of(row).withRowSchema(schema))
+            .apply(
+                AddFields.<Row>create()
+                    .field("array[].field1", FieldType.STRING)
+                    .field("array[].field2", FieldType.INT32)
+                    .field("array[].field3", FieldType.array(FieldType.STRING)));
+
+    Schema expectedNestedSchema =
+        Schema.builder()
+            .addNullableField("field1", FieldType.STRING)
+            .addNullableField("field2", FieldType.INT32)
+            .addNullableField("field3", FieldType.array(FieldType.STRING))
+            .build();
+    Schema expectedSchema =
+        Schema.builder()
+            .addNullableField(
+                "array", FieldType.array(FieldType.row(expectedNestedSchema).withNullable(true)))
+            .build();
+    assertEquals(expectedSchema, added.getSchema());
+
+    Row expected = Row.withSchema(expectedSchema).addValue(Collections.emptyList()).build();
+    PAssert.that(added).containsInAnyOrder(expected);
+    pipeline.run();
+  }
+
+  /** Fields can be added to the row values of an existing map. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void addNestedMapField() {
+    Schema nested = Schema.builder().addStringField("field1").build();
+    Schema schema =
+        Schema.builder().addMapField("map", FieldType.STRING, FieldType.row(nested)).build();
+
+    Row subRow = Row.withSchema(nested).addValue("value").build();
+    Row row = Row.withSchema(schema).addValue(ImmutableMap.of("key", subRow)).build();
+    PCollection<Row> added =
+        pipeline
+            .apply(Create.of(row).withRowSchema(schema))
+            .apply(
+                AddFields.<Row>create()
+                    .field("map.field2", FieldType.INT32)
+                    .field("map.field3", FieldType.array(FieldType.STRING)));
+
+    Schema expectedNestedSchema =
+        Schema.builder()
+            .addStringField("field1")
+            .addNullableField("field2", FieldType.INT32)
+            .addNullableField("field3", FieldType.array(FieldType.STRING))
+            .build();
+    Schema expectedSchema =
+        Schema.builder()
+            .addMapField("map", FieldType.STRING, FieldType.row(expectedNestedSchema))
+            .build();
+    assertEquals(expectedSchema, added.getSchema());
+
+    Row expectedNested =
+        Row.withSchema(expectedNestedSchema).addValues("value", null, null).build();
+    Row expected =
+        Row.withSchema(expectedSchema).addValue(ImmutableMap.of("key", expectedNested)).build();
+
+    PAssert.that(added).containsInAnyOrder(expected);
+    pipeline.run();
+  }
+
+  /** Adding a field whose name already exists in the schema is rejected. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void addDuplicateField() {
+    Schema schema = Schema.builder().addStringField("field1").build();
+    thrown.expect(IllegalArgumentException.class);
+    pipeline
+        .apply(Create.of(Row.withSchema(schema).addValue("value").build()).withRowSchema(schema))
+        .apply(AddFields.<Row>create().field("field1", FieldType.INT32));
+    pipeline.run();
+  }
+
+  /** A null default value for a non-nullable field type is rejected. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void addNonNullableField() {
+    Schema schema = Schema.builder().addStringField("field1").build();
+    thrown.expect(IllegalArgumentException.class);
+    pipeline
+        .apply(Create.of(Row.withSchema(schema).addValue("value").build()).withRowSchema(schema))
+        .apply(AddFields.<Row>create().field("field2", FieldType.INT32, null));
+    pipeline.run();
+  }
+}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/DropFieldsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/DropFieldsTest.java
new file mode 100644
index 0000000..52c67a4
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/DropFieldsTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.transforms;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/** Tests for {@link DropFields}. */
+public class DropFieldsTest {
+  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+
+  // {field1: INT32, field2: STRING}
+  private static final Schema SIMPLE_SCHEMA =
+      Schema.builder().addInt32Field("field1").addStringField("field2").build();
+
+  // {nested: ROW<SIMPLE_SCHEMA>, string: STRING}
+  private static final Schema NESTED_SCHEMA =
+      Schema.builder().addRowField("nested", SIMPLE_SCHEMA).addStringField("string").build();
+
+  // {array: ARRAY<ROW<SIMPLE_SCHEMA>>}
+  private static final Schema NESTED_ARRAY_SCHEMA =
+      Schema.builder().addArrayField("array", FieldType.row(SIMPLE_SCHEMA)).build();
+
+  /** Builds a {@link #SIMPLE_SCHEMA} row. */
+  private static Row simple(int intValue, String stringValue) {
+    return Row.withSchema(SIMPLE_SCHEMA).addValues(intValue, stringValue).build();
+  }
+
+  /** Wraps {@code inner} in a {@link #NESTED_SCHEMA} row whose "string" field is "foo". */
+  private static Row wrapped(Row inner) {
+    return Row.withSchema(NESTED_SCHEMA).addValues(inner, "foo").build();
+  }
+
+  /** Builds a {@link #NESTED_ARRAY_SCHEMA} row holding {@code elements}. */
+  private static Row arrayOf(Row... elements) {
+    return Row.withSchema(NESTED_ARRAY_SCHEMA).addArray((Object[]) elements).build();
+  }
+
+  /** Builds one single-field row of {@code schema} per entry of {@code values}. */
+  private static List<Row> singleValueRows(Schema schema, Object... values) {
+    List<Row> rows = Lists.newArrayList();
+    for (Object value : values) {
+      rows.add(Row.withSchema(schema).addValue(value).build());
+    }
+    return rows;
+  }
+
+  /** Creates the three-row {@link #NESTED_SCHEMA} input shared by the nested-field tests. */
+  private PCollection<Row> nestedInput() {
+    return pipeline.apply(
+        Create.of(wrapped(simple(1, "one")), wrapped(simple(2, "two")), wrapped(simple(3, "three")))
+            .withRowSchema(NESTED_SCHEMA));
+  }
+
+  /** Dropping a top-level field keeps the remaining top-level field. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testDropTopLevelField() {
+    Schema expectedSchema = Schema.builder().addStringField("field2").build();
+
+    PCollection<Row> input =
+        pipeline.apply(
+            Create.of(simple(1, "one"), simple(2, "two"), simple(3, "three"))
+                .withRowSchema(SIMPLE_SCHEMA));
+    PCollection<Row> result = input.apply(DropFields.fields("field1"));
+    assertEquals(expectedSchema, result.getSchema());
+
+    PAssert.that(result)
+        .containsInAnyOrder(singleValueRows(expectedSchema, "one", "two", "three"));
+    pipeline.run();
+  }
+
+  /**
+   * Dropping one field of a nested row keeps its sibling; the expected output places it at the top
+   * level, after the existing top-level field.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testDropNestedField() {
+    Schema expectedSchema =
+        Schema.builder().addStringField("string").addStringField("field2").build();
+
+    PCollection<Row> result = nestedInput().apply(DropFields.fields("nested.field1"));
+    assertEquals(expectedSchema, result.getSchema());
+
+    List<Row> expectedRows = Lists.newArrayList();
+    for (String word : new String[] {"one", "two", "three"}) {
+      expectedRows.add(Row.withSchema(expectedSchema).addValues("foo", word).build());
+    }
+    PAssert.that(result).containsInAnyOrder(expectedRows);
+    pipeline.run();
+  }
+
+  /** Dropping the top-level field and one nested field leaves only the other nested field. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testDropNestedFieldKeepingOnlyNested() {
+    Schema expectedSchema = Schema.builder().addStringField("field2").build();
+
+    PCollection<Row> result = nestedInput().apply(DropFields.fields("string", "nested.field1"));
+    assertEquals(expectedSchema, result.getSchema());
+
+    PAssert.that(result)
+        .containsInAnyOrder(singleValueRows(expectedSchema, "one", "two", "three"));
+    pipeline.run();
+  }
+
+  /** Dropping a field of the rows inside an array keeps the other field, as an array. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testDropNestedArrayField() {
+    Schema expectedSchema = Schema.builder().addArrayField("field2", FieldType.STRING).build();
+
+    PCollection<Row> input =
+        pipeline.apply(
+            Create.of(
+                    arrayOf(simple(1, "one1"), simple(1, "one2")),
+                    arrayOf(simple(2, "two1"), simple(2, "two2")),
+                    arrayOf(simple(3, "three1"), simple(3, "three2")))
+                .withRowSchema(NESTED_ARRAY_SCHEMA));
+    PCollection<Row> result = input.apply(DropFields.fields("array[].field1"));
+    assertEquals(expectedSchema, result.getSchema());
+
+    List<Row> expectedRows =
+        Lists.newArrayList(
+            Row.withSchema(expectedSchema).addArray("one1", "one2").build(),
+            Row.withSchema(expectedSchema).addArray("two1", "two2").build(),
+            Row.withSchema(expectedSchema).addArray("three1", "three2").build());
+    PAssert.that(result).containsInAnyOrder(expectedRows);
+    pipeline.run();
+  }
+}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/RenameFieldsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/RenameFieldsTest.java
new file mode 100644
index 0000000..b940e78
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/RenameFieldsTest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.transforms;
+
+import static junit.framework.TestCase.assertEquals;
+
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/** Tests for {@link RenameFields}. */
+public class RenameFieldsTest {
+ @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void renameTopLevelFields() {
+ Schema schema = Schema.builder().addStringField("field1").addInt32Field("field2").build();
+ PCollection<Row> renamed =
+ pipeline
+ .apply(
+ Create.of(
+ Row.withSchema(schema).addValues("one", 1).build(),
+ Row.withSchema(schema).addValues("two", 2).build())
+ .withRowSchema(schema))
+ .apply(RenameFields.<Row>create().rename("field1", "new1").rename("field2", "new2"));
+ Schema expectedSchema = Schema.builder().addStringField("new1").addInt32Field("new2").build();
+ assertEquals(expectedSchema, renamed.getSchema());
+ List<Row> expectedRows =
+ ImmutableList.of(
+ Row.withSchema(expectedSchema).addValues("one", 1).build(),
+ Row.withSchema(expectedSchema).addValues("two", 2).build());
+ PAssert.that(renamed).containsInAnyOrder(expectedRows);
+ pipeline.run();
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void renameNestedFields() {
+ Schema nestedSchema = Schema.builder().addStringField("field1").addInt32Field("field2").build();
+ Schema schema =
+ Schema.builder().addStringField("field1").addRowField("nested", nestedSchema).build();
+
+ PCollection<Row> renamed =
+ pipeline
+ .apply(
+ Create.of(
+ Row.withSchema(schema)
+ .addValues(
+ "one", Row.withSchema(nestedSchema).addValues("one", 1).build())
+ .build(),
+ Row.withSchema(schema)
+ .addValues(
+ "two", Row.withSchema(nestedSchema).addValues("two", 1).build())
+ .build())
+ .withRowSchema(schema))
+ .apply(
+ RenameFields.<Row>create()
+ .rename("nested.field1", "new1")
+ .rename("nested.field2", "new2"));
+
+ Schema expectedNestedSchema =
+ Schema.builder().addStringField("new1").addInt32Field("new2").build();
+ Schema expectedSchema =
+ Schema.builder()
+ .addStringField("field1")
+ .addRowField("nested", expectedNestedSchema)
+ .build();
+ assertEquals(expectedSchema, renamed.getSchema());
+
+ List<Row> expectedRows =
+ ImmutableList.of(
+ Row.withSchema(expectedSchema)
+ .addValues("one", Row.withSchema(expectedNestedSchema).addValues("one", 1).build())
+ .build(),
+ Row.withSchema(expectedSchema)
+ .addValues("two", Row.withSchema(expectedNestedSchema).addValues("two", 1).build())
+ .build());
+
+ PAssert.that(renamed).containsInAnyOrder(expectedRows);
+ pipeline.run();
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void renameTopLevelAndNestedFields() {
+ Schema nestedSchema = Schema.builder().addStringField("field1").addInt32Field("field2").build();
+ Schema schema =
+ Schema.builder().addStringField("field1").addRowField("nested", nestedSchema).build();
+
+ PCollection<Row> renamed =
+ pipeline
+ .apply(
+ Create.of(
+ Row.withSchema(schema)
+ .addValues(
+ "one", Row.withSchema(nestedSchema).addValues("one", 1).build())
+ .build(),
+ Row.withSchema(schema)
+ .addValues(
+ "two", Row.withSchema(nestedSchema).addValues("two", 1).build())
+ .build())
+ .withRowSchema(schema))
+ .apply(
+ RenameFields.<Row>create()
+ .rename("field1", "top1")
+ .rename("nested", "newnested")
+ .rename("nested.field1", "new1")
+ .rename("nested.field2", "new2"));
+
+ Schema expectedNestedSchema =
+ Schema.builder().addStringField("new1").addInt32Field("new2").build();
+ Schema expectedSchema =
+ Schema.builder()
+ .addStringField("top1")
+ .addRowField("newnested", expectedNestedSchema)
+ .build();
+ assertEquals(expectedSchema, renamed.getSchema());
+
+ List<Row> expectedRows =
+ ImmutableList.of(
+ Row.withSchema(expectedSchema)
+ .addValues("one", Row.withSchema(expectedNestedSchema).addValues("one", 1).build())
+ .build(),
+ Row.withSchema(expectedSchema)
+ .addValues("two", Row.withSchema(expectedNestedSchema).addValues("two", 1).build())
+ .build());
+
+ PAssert.that(renamed).containsInAnyOrder(expectedRows);
+ pipeline.run();
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void renameNestedInArrayFields() {
+ Schema nestedSchema = Schema.builder().addStringField("field1").addInt32Field("field2").build();
+ Schema schema =
+ Schema.builder().addArrayField("array", Schema.FieldType.row(nestedSchema)).build();
+
+ PCollection<Row> renamed =
+ pipeline
+ .apply(
+ Create.of(
+ Row.withSchema(schema)
+ .addValue(
+ ImmutableList.of(
+ Row.withSchema(nestedSchema).addValues("one", 1).build()))
+ .build(),
+ Row.withSchema(schema)
+ .addValue(
+ ImmutableList.of(
+ Row.withSchema(nestedSchema).addValues("two", 1).build()))
+ .build())
+ .withRowSchema(schema))
+ .apply(
+ RenameFields.<Row>create()
+ .rename("array.field1", "new1")
+ .rename("array.field2", "new2"));
+
+ Schema expectedNestedSchema =
+ Schema.builder().addStringField("new1").addInt32Field("new2").build();
+ Schema expectedSchema =
+ Schema.builder().addArrayField("array", Schema.FieldType.row(expectedNestedSchema)).build();
+ assertEquals(expectedSchema, renamed.getSchema());
+
+ List<Row> expectedRows =
+ ImmutableList.of(
+ Row.withSchema(expectedSchema)
+ .addValue(
+ ImmutableList.of(
+ Row.withSchema(expectedNestedSchema).addValues("one", 1).build()))
+ .build(),
+ Row.withSchema(expectedSchema)
+ .addValue(
+ ImmutableList.of(
+ Row.withSchema(expectedNestedSchema).addValues("two", 1).build()))
+ .build());
+
+ PAssert.that(renamed).containsInAnyOrder(expectedRows);
+ pipeline.run();
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void renameNestedInMapFields() {
+ Schema nestedSchema = Schema.builder().addStringField("field1").addInt32Field("field2").build();
+ Schema schema =
+ Schema.builder()
+ .addMapField("map", Schema.FieldType.STRING, Schema.FieldType.row(nestedSchema))
+ .build();
+
+ PCollection<Row> renamed =
+ pipeline
+ .apply(
+ Create.of(
+ Row.withSchema(schema)
+ .addValue(
+ ImmutableMap.of(
+ "k1", Row.withSchema(nestedSchema).addValues("one", 1).build()))
+ .build(),
+ Row.withSchema(schema)
+ .addValue(
+ ImmutableMap.of(
+ "k2", Row.withSchema(nestedSchema).addValues("two", 1).build()))
+ .build())
+ .withRowSchema(schema))
+ .apply(
+ RenameFields.<Row>create()
+ .rename("map.field1", "new1")
+ .rename("map.field2", "new2"));
+
+ Schema expectedNestedSchema =
+ Schema.builder().addStringField("new1").addInt32Field("new2").build();
+ Schema expectedSchema =
+ Schema.builder()
+ .addMapField("map", Schema.FieldType.STRING, Schema.FieldType.row(expectedNestedSchema))
+ .build();
+ assertEquals(expectedSchema, renamed.getSchema());
+
+ List<Row> expectedRows =
+ ImmutableList.of(
+ Row.withSchema(expectedSchema)
+ .addValue(
+ ImmutableMap.of(
+ "k1", Row.withSchema(expectedNestedSchema).addValues("one", 1).build()))
+ .build(),
+ Row.withSchema(expectedSchema)
+ .addValue(
+ ImmutableMap.of(
+ "k2", Row.withSchema(expectedNestedSchema).addValues("two", 1).build()))
+ .build());
+
+ PAssert.that(renamed).containsInAnyOrder(expectedRows);
+ pipeline.run();
+ }
+}