Posted to github@beam.apache.org by "ahmedabu98 (via GitHub)" <gi...@apache.org> on 2023/03/22 12:44:34 UTC

[GitHub] [beam] ahmedabu98 opened a new pull request, #25927: FileReadSchemaTransform

ahmedabu98 opened a new pull request, #25927:
URL: https://github.com/apache/beam/pull/25927

   Implement a SchemaTransform for file reads. Will support formats: Avro, JSON, Parquet, and reading individual lines.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25927: FileReadSchemaTransform

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25927:
URL: https://github.com/apache/beam/pull/25927#discussion_r1170393253


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java:
##########
@@ -128,50 +134,151 @@ public String apply(Row input) {
     };
   }
 
+  public static String jsonSchemaStringFromBeamSchema(Schema beamSchema) {
+    return jsonSchemaFromBeamSchema(beamSchema).toString();
+  }
+
+  public static ObjectSchema jsonSchemaFromBeamSchema(Schema beamSchema) {
+    return jsonSchemaBuilderFromBeamSchema(beamSchema).build();
+  }
+
+  private static ObjectSchema.Builder jsonSchemaBuilderFromBeamSchema(Schema beamSchema) {
+    // Beam Schema is strict, so we should not accept additional properties
+    ObjectSchema.Builder jsonSchemaBuilder = ObjectSchema.builder().additionalProperties(false);
+
+    for (Field field : beamSchema.getFields()) {
+      String name = field.getName();
+      org.everit.json.schema.Schema propertySchema = jsonPropertyFromBeamType(field.getType());
+
+      // Add property and make it required
+      jsonSchemaBuilder = jsonSchemaBuilder.addPropertySchema(name, propertySchema);
+      jsonSchemaBuilder.addRequiredProperty(name);
+    }
+
+    return jsonSchemaBuilder;
+  }
+
+  private static org.everit.json.schema.Schema jsonPropertyFromBeamType(FieldType beamType) {
+    org.everit.json.schema.Schema.Builder<? extends org.everit.json.schema.Schema> propertySchema;
+
+    switch (beamType.getTypeName()) {
+      case BYTE:
+      case INT16:
+      case INT32:
+      case INT64:
+        propertySchema = NumberSchema.builder().requiresInteger(true);

Review Comment:
   Hmmm I'm not sure I understand. This library is for building json schemas, so it wouldn't be parsing any values. As far as these lines go, we're only concerned with whether the number is an integer or not.
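As a rough illustration of the point above, here is a minimal stdlib-only sketch of the mapping in the quoted switch: every Beam integer width collapses to a single JSON Schema `integer` type, so the generated schema says nothing about bit width. The `TypeName` enum and the `jsonTypeFor` helper are hypothetical stand-ins for this sketch, not Beam or everit APIs.

```java
import java.util.EnumSet;
import java.util.Set;

final class JsonTypeSketch {
  // Hypothetical stand-in for a subset of Beam's type names.
  enum TypeName { BYTE, INT16, INT32, INT64, DECIMAL, FLOAT, DOUBLE, STRING, BOOLEAN }

  private static final Set<TypeName> INTEGERS =
      EnumSet.of(TypeName.BYTE, TypeName.INT16, TypeName.INT32, TypeName.INT64);
  private static final Set<TypeName> NUMBERS =
      EnumSet.of(TypeName.DECIMAL, TypeName.FLOAT, TypeName.DOUBLE);

  // Returns the JSON Schema "type" keyword this sketch assigns to a type name.
  static String jsonTypeFor(TypeName typeName) {
    if (INTEGERS.contains(typeName)) {
      return "integer"; // the width (8/16/32/64 bits) is not expressed in the schema
    }
    if (NUMBERS.contains(typeName)) {
      return "number";
    }
    if (typeName == TypeName.STRING) {
      return "string";
    }
    if (typeName == TypeName.BOOLEAN) {
      return "boolean";
    }
    throw new IllegalArgumentException("Unsupported type: " + typeName);
  }
}
```

Under this sketch, `BYTE` and `INT64` are indistinguishable once converted, which is why range checks would have to happen wherever values are actually parsed rather than in the schema builder.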





[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25927: FileReadSchemaTransform

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25927:
URL: https://github.com/apache/beam/pull/25927#discussion_r1170339956


##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformConfiguration.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class FileReadSchemaTransformConfiguration {
+  static final Set<String> VALID_FORMATS = Sets.newHashSet("avro", "parquet", "json", "line");
+
+  public void validate() {
+    checkArgument(
+        !Strings.isNullOrEmpty(this.getFormat()) && VALID_FORMATS.contains(this.getFormat()),
+        "A valid file format must be specified. Please specify one of: "
+            + VALID_FORMATS.toString());
+
+    if (!this.getFormat().equals("line")) {
+      checkArgument(
+          !Strings.isNullOrEmpty(this.getSchema()),
+          String.format(
+              "A schema must be specified when reading files with %s format. You may provide a schema string or a path to a file containing the schema.",
+              this.getFormat()));
+    }

Review Comment:
   Thanks for the analysis, Damon! What would it look like for Python to send a Schema object? I'd prefer using a String for now to keep the cross-language use case simple.
   
   > I'm struggling with the need for someone to look into this code to determine that schema is required for all of the formats except for line
   
   Would it help to improve the error message? I can include all the formats that do require a schema.
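One possible shape for the improved error message, as a stdlib-only sketch: the formats that need a schema live in their own set, and the message prints that set so a user does not have to read the code. `SCHEMA_REQUIRED_FORMATS` and the `validate(String, String)` helper are hypothetical names for this illustration, and plain exceptions stand in for Guava's `checkArgument`.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

final class SchemaRequirementSketch {
  // Formats from the quoted VALID_FORMATS set; "line" is the only one read without a schema.
  static final Set<String> VALID_FORMATS =
      new LinkedHashSet<>(Arrays.asList("avro", "parquet", "json", "line"));
  // Hypothetical: the subset of formats that cannot be read without a schema.
  static final Set<String> SCHEMA_REQUIRED_FORMATS =
      new LinkedHashSet<>(Arrays.asList("avro", "parquet", "json"));

  // Throws if the format is unknown, or if a schema is required but missing.
  static void validate(String format, String schema) {
    if (format == null || !VALID_FORMATS.contains(format)) {
      throw new IllegalArgumentException(
          "A valid file format must be specified. Please specify one of: " + VALID_FORMATS);
    }
    boolean schemaMissing = schema == null || schema.isEmpty();
    if (SCHEMA_REQUIRED_FORMATS.contains(format) && schemaMissing) {
      throw new IllegalArgumentException(
          "A schema must be specified when reading files with one of these formats: "
              + SCHEMA_REQUIRED_FORMATS
              + ". Found format '" + format + "' with no schema.");
    }
  }
}
```

With this shape, `validate("line", null)` passes, while `validate("json", "")` fails with a message that names all three schema-requiring formats.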





[GitHub] [beam] damondouglas merged pull request #25927: FileReadSchemaTransform

Posted by "damondouglas (via GitHub)" <gi...@apache.org>.
damondouglas merged PR #25927:
URL: https://github.com/apache/beam/pull/25927




[GitHub] [beam] johnjcasey commented on pull request #25927: FileReadSchemaTransform

Posted by "johnjcasey (via GitHub)" <gi...@apache.org>.
johnjcasey commented on PR #25927:
URL: https://github.com/apache/beam/pull/25927#issuecomment-1505446891

   run java precommit




[GitHub] [beam] ahmedabu98 commented on pull request #25927: FileReadSchemaTransform

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25927:
URL: https://github.com/apache/beam/pull/25927#issuecomment-1492231132

   R: @damondouglas 




[GitHub] [beam] ahmedabu98 commented on pull request #25927: FileReadSchemaTransform

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25927:
URL: https://github.com/apache/beam/pull/25927#issuecomment-1505605928

   Run Java PreCommit




[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25927: FileReadSchemaTransform

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25927:
URL: https://github.com/apache/beam/pull/25927#discussion_r1149345996


##########
sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/SchemaAwareJavaBeans.java:
##########
@@ -270,6 +400,55 @@ public static SerializableFunction<Row, DoublyNestedDataTypes> doublyNestedDataT
     return DEFAULT_SCHEMA_PROVIDER.fromRowFunction(DOUBLY_NESTED_DATA_TYPES_TYPE_DESCRIPTOR);
   }
 
+  /**
+   * Contains all primitive Java types supported by Avro. The purpose of this class is to test
+   * schema-aware PTransforms with flat {@link Schema} {@link Row}s.
+   */
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class AvroPrimitiveDataTypes implements Serializable {

Review Comment:
   I'm thinking of proceeding by not including any form of bytes in AllPrimitiveDataTypes, since there is no common overlap. Instead, we can have two new classes, ByteType and ByteSequenceType, that will test byte and byte[] respectively. I can create new tests for these in both the file read and file write transforms.





[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25927: FileReadSchemaTransform

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25927:
URL: https://github.com/apache/beam/pull/25927#discussion_r1158922274


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java:
##########
@@ -128,50 +133,154 @@ public String apply(Row input) {
     };
   }
 
+  public static String jsonSchemaStringFromBeamSchema(Schema beamSchema) {
+    return jsonSchemaFromBeamSchema(beamSchema).toString();
+  }
+
+  public static ObjectSchema jsonSchemaFromBeamSchema(Schema beamSchema) {
+    return jsonSchemaBuilderFromBeamSchema(beamSchema).build();
+  }
+
+  private static ObjectSchema.Builder jsonSchemaBuilderFromBeamSchema(Schema beamSchema) {
+    // Beam Schema is strict, so we should not accept additional properties
+    ObjectSchema.Builder jsonSchemaBuilder = ObjectSchema.builder().additionalProperties(false);
+
+    for (Field field : beamSchema.getFields()) {
+      String name = field.getName();
+      org.everit.json.schema.Schema propertySchema = jsonPropertyFromBeamType(field.getType());
+
+      // Add property and make it required
+      jsonSchemaBuilder = jsonSchemaBuilder.addPropertySchema(name, propertySchema);
+      jsonSchemaBuilder.addRequiredProperty(name);
+    }
+
+    return jsonSchemaBuilder;
+  }
+
+  @SuppressWarnings({
+    "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+  })
+  private static org.everit.json.schema.Schema jsonPropertyFromBeamType(FieldType beamType) {
+    org.everit.json.schema.Schema.Builder<? extends org.everit.json.schema.Schema> propertySchema;
+
+    switch (beamType.getTypeName()) {
+      case BYTE:
+      case INT16:
+      case INT32:
+      case INT64:
+        propertySchema = NumberSchema.builder().requiresInteger(true);
+        break;
+      case DECIMAL:
+      case FLOAT:
+      case DOUBLE:
+        propertySchema = NumberSchema.builder();
+        break;
+      case STRING:
+        propertySchema = StringSchema.builder();
+        break;
+      case BOOLEAN:
+        propertySchema = BooleanSchema.builder();
+        break;
+      case ARRAY:
+      case ITERABLE:
+        propertySchema =
+            ArraySchema.builder()
+                .allItemSchema(jsonPropertyFromBeamType(beamType.getCollectionElementType()));
+        break;
+      case ROW:
+        propertySchema = jsonSchemaBuilderFromBeamSchema(beamType.getRowSchema());
+        break;
+
+        // add more Beam to JSON types
+      default:
+        throw new IllegalArgumentException("Unsupported Beam to JSON type: " + beamType);
+    }
+
+    if (beamType.getNullable()) {
+      propertySchema = propertySchema.nullable(true);
+    }
+
+    return propertySchema.build();
+  }
+
   public static Schema beamSchemaFromJsonSchema(String jsonSchemaStr) {
     org.everit.json.schema.ObjectSchema jsonSchema = jsonSchemaFromString(jsonSchemaStr);
     return beamSchemaFromJsonSchema(jsonSchema);
   }
 
   private static Schema beamSchemaFromJsonSchema(org.everit.json.schema.ObjectSchema jsonSchema) {
     Schema.Builder beamSchemaBuilder = Schema.builder();
-    for (Map.Entry<String, org.everit.json.schema.Schema> entry :
-        jsonSchema.getPropertySchemas().entrySet()) {
+    Map<String, org.everit.json.schema.Schema> properties =
+        new HashMap<>(jsonSchema.getPropertySchemas());
+    // Properties in a JSON Schema are stored in a Map object and unfortunately don't maintain
+    // order. However, the schema's required properties is a list of property names that is
+    // consistent and is in the same order as when the schema was first created. To create a
+    // consistent Beam Schema from the same JSON schema, we add Schema Fields following this order.
+    // We can guarantee a consistent Beam schema when all JSON properties are required.
+    for (String propertyName : jsonSchema.getRequiredProperties()) {
+      org.everit.json.schema.Schema propertySchema = properties.get(propertyName);
+      if (propertySchema == null) {
+        throw new IllegalArgumentException("Unable to parse schema " + jsonSchema);
+      }
+
+      Boolean isNullable =
+          Boolean.TRUE.equals(propertySchema.getUnprocessedProperties().get("nullable"));
+      beamSchemaBuilder =
+          addPropertySchemaToBeamSchema(
+              propertyName, propertySchema, beamSchemaBuilder, isNullable);
+      // Remove properties we already added.
+      properties.remove(propertyName, propertySchema);
+    }
+
+    // Now we are potentially left with properties that are not required. Add them too.
+    // Note: having more than one non-required properties may result in  inconsistent
+    // Beam schema field orderings.

Review Comment:
   Properties in a JSON schema do not maintain ordering. When we convert a Beam schema to a JSON schema, we add the fields to the required properties in order. So when converting from the JSON schema back to a Beam schema, the required properties list can be used to keep the ordering the same. As for the additional (non-required) properties, we can't guarantee the Beam schema order.
   
   Unlike JSON schemas, Beam schemas care about order. So one JSON schema with non-required properties can produce more than one Beam schema, and by Beam standards these would be equivalent but not equal. This is a best-effort attempt to make a given Beam schema survive a round trip of converting to a JSON schema and back.
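The ordering idea described above can be sketched with plain collections: walk the required-property list first, since its order is stable, then append whatever is left in the unordered map. `orderedFieldNames` is a hypothetical helper, and string values stand in for property schemas. Sorting the leftover names is this sketch's own choice to keep its output deterministic; the quoted code iterates the map directly, which is why its order is not guaranteed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

final class FieldOrderSketch {
  // Returns field names: required names first, in list order, then the remaining names.
  static List<String> orderedFieldNames(Map<String, String> properties, List<String> required) {
    Map<String, String> remaining = new HashMap<>(properties);
    List<String> ordered = new ArrayList<>();
    for (String name : required) {
      // A required name with no matching property means the schema is malformed.
      if (remaining.remove(name) == null) {
        throw new IllegalArgumentException("Required property has no schema: " + name);
      }
      ordered.add(name);
    }
    // Leftover (non-required) names carry no ordering information of their own.
    // Sorting them is an assumption of this sketch, not what the PR does.
    ordered.addAll(new TreeSet<>(remaining.keySet()));
    return ordered;
  }
}
```

When every property is required, the result depends only on the required list, which matches the round-trip guarantee described in the comment.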





[GitHub] [beam] damondouglas commented on a diff in pull request #25927: FileReadSchemaTransform

Posted by "damondouglas (via GitHub)" <gi...@apache.org>.
damondouglas commented on code in PR #25927:
URL: https://github.com/apache/beam/pull/25927#discussion_r1166137111


##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformConfiguration.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class FileReadSchemaTransformConfiguration {
+  static final Set<String> VALID_FORMATS = Sets.newHashSet("avro", "parquet", "json", "line");
+
+  public void validate() {
+    checkArgument(
+        !Strings.isNullOrEmpty(this.getFormat()) && VALID_FORMATS.contains(this.getFormat()),
+        "A valid file format must be specified. Please specify one of: "
+            + VALID_FORMATS.toString());

Review Comment:
   Do you like the idea of this instead? That way, if a new AutoService-annotated FileReadSchemaTransformFormatProvider implementation is added, there's no need to synchronize it with the VALID_FORMATS list.
   ```
   Providers.loadProviders(FileReadSchemaTransformFormatProvider.class).containsKey(this.getFormat())
   ```
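A stdlib-only sketch of the idea behind this suggestion: derive the set of valid formats from whichever providers are discovered, rather than from a hard-coded list. `FormatProvider`, `index`, and `isSupported` are hypothetical names here. In real code the `Iterable` could come from a service-loading mechanism such as `java.util.ServiceLoader`, and the exact behavior of Beam's `Providers.loadProviders` is not assumed.

```java
import java.util.HashMap;
import java.util.Map;

final class FormatRegistrySketch {
  // Hypothetical stand-in for FileReadSchemaTransformFormatProvider.
  interface FormatProvider {
    String identifier();
  }

  // Indexes the discovered providers by identifier and rejects duplicates.
  static Map<String, FormatProvider> index(Iterable<FormatProvider> providers) {
    Map<String, FormatProvider> byId = new HashMap<>();
    for (FormatProvider provider : providers) {
      if (byId.put(provider.identifier(), provider) != null) {
        throw new IllegalStateException("Duplicate provider: " + provider.identifier());
      }
    }
    return byId;
  }

  // A format is valid exactly when some provider registered it.
  static boolean isSupported(Map<String, FormatProvider> byId, String format) {
    return format != null && byId.containsKey(format);
  }
}
```

Adding a provider then automatically widens the accepted formats, and the error message could print `byId.keySet()` in place of a constant.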



##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformConfiguration.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class FileReadSchemaTransformConfiguration {
+  static final Set<String> VALID_FORMATS = Sets.newHashSet("avro", "parquet", "json", "line");
+
+  public void validate() {
+    checkArgument(
+        !Strings.isNullOrEmpty(this.getFormat()) && VALID_FORMATS.contains(this.getFormat()),
+        "A valid file format must be specified. Please specify one of: "
+            + VALID_FORMATS.toString());
+
+    if (!this.getFormat().equals("line")) {
+      checkArgument(
+          !Strings.isNullOrEmpty(this.getSchema()),
+          String.format(
+              "A schema must be specified when reading files with %s format. You may provide a schema string or a path to a file containing the schema.",
+              this.getFormat()));
+    }

Review Comment:
   I'm struggling with the need for someone to look into this code to determine that a schema is required for all of the formats except for line. Neither of the following options is 100% ideal, and I trust your judgement, but I wanted to relay my thoughts.
   
   1) Remove LineReadSchemaTransformFormatProvider and the Nullable annotation on the getSchema method
   2) Remove the Nullable annotation on the getSchema method and provide/document a Schema for LineReadSchemaTransformFormatProvider
   
   What's missing from option 1 and 2 is dealing with a non-null empty string. I'd have to test this on my end but I wonder if Schema is one of the rare non-primitive types that we can get away with.



##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformConfiguration.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})

Review Comment:
   Is there a way to apply the nullness warning suppression only to the methods that cause the problem? It's more code, but alternatively, do you like the idea of using Optional? I don't imagine a performance hit in the configuration, since it is evaluated at pipeline construction time.
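A minimal sketch of the Optional alternative, using a plain class in place of the AutoValue configuration. `OptionalSchemaSketch` and its members are hypothetical. Whether `AutoValueSchema` can infer a Beam schema from an `Optional` getter is not confirmed here, so treat this only as an illustration of the null-handling shape.

```java
import java.util.Optional;

final class OptionalSchemaSketch {
  private final String format;
  private final String schema; // may be null internally; never exposed as null

  OptionalSchemaSketch(String format, String schema) {
    this.format = format;
    this.schema = schema;
  }

  // Treats both null and the empty string as "no schema provided".
  Optional<String> getSchema() {
    return Optional.ofNullable(schema).filter(s -> !s.isEmpty());
  }

  // Mirrors the quoted rule: every format except "line" needs a schema.
  void validate() {
    if (!"line".equals(format) && !getSchema().isPresent()) {
      throw new IllegalArgumentException("A schema must be specified for format: " + format);
    }
  }
}
```

Callers never see a null schema, so the null check and the empty-string check collapse into one `isPresent()` call.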



##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformProvider.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import java.io.Reader;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.schemas.io.Providers;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Watch.Growth;
+import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.CharStreams;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@AutoService(SchemaTransformProvider.class)
+public class FileReadSchemaTransformProvider
+    extends TypedSchemaTransformProvider<FileReadSchemaTransformConfiguration> {
+  private static final Logger LOG = LoggerFactory.getLogger(FileReadSchemaTransformProvider.class);
+  private static final String IDENTIFIER = "beam:schematransform:org.apache.beam:file_read:v1";
+  static final String INPUT_TAG = "input";
+  static final String OUTPUT_TAG = "output";
+
+  @Override
+  protected Class<FileReadSchemaTransformConfiguration> configurationClass() {
+    return FileReadSchemaTransformConfiguration.class;
+  }
+
+  @Override
+  protected SchemaTransform from(FileReadSchemaTransformConfiguration configuration) {
+    return new FileReadSchemaTransform(configuration);
+  }
+
+  @Override
+  public String identifier() {
+    return IDENTIFIER;
+  }
+
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.singletonList(INPUT_TAG);
+  }
+
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  private static class FileReadSchemaTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> implements SchemaTransform {
+    private FileReadSchemaTransformConfiguration configuration;
+
+    FileReadSchemaTransform(FileReadSchemaTransformConfiguration configuration) {
+      configuration.validate();
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      checkArgument(
+          input.getAll().isEmpty() ^ Strings.isNullOrEmpty(configuration.getFilepattern()),
+          "Either an input PCollection of file patterns or the filepattern parameter must be set, "
+              + "but not both.");

Review Comment:
   I had a hard time understanding this. Is this because we have the LineReadSchemaTransformFormatProvider?
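For readers puzzling over the same expression, here is a small sketch of what the XOR in the quoted `checkArgument` evaluates to. `exactlyOneSource` is a hypothetical helper that takes the two conditions as plain values; it says nothing about why the author chose this design.

```java
final class ExactlyOneSourceSketch {
  // Mirrors the quoted check: true when exactly one of the two sources is supplied.
  static boolean exactlyOneSource(boolean inputIsEmpty, String filepattern) {
    boolean patternIsEmpty = filepattern == null || filepattern.isEmpty();
    // XOR is true only when the operands differ:
    //   input empty,   pattern set   -> valid (read the configured pattern)
    //   input present, pattern empty -> valid (read patterns from the input)
    //   both empty or both present   -> invalid
    return inputIsEmpty ^ patternIsEmpty;
  }
}
```

Read this way, the check enforces "one source of file patterns, never zero and never two".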



##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformConfiguration.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class FileReadSchemaTransformConfiguration {
+  static final Set<String> VALID_FORMATS = Sets.newHashSet("avro", "parquet", "json", "line");
+
+  public void validate() {
+    checkArgument(
+        !Strings.isNullOrEmpty(this.getFormat()) && VALID_FORMATS.contains(this.getFormat()),
+        "A valid file format must be specified. Please specify one of: "
+            + VALID_FORMATS.toString());
+
+    if (!this.getFormat().equals("line")) {
+      checkArgument(
+          !Strings.isNullOrEmpty(this.getSchema()),
+          String.format(
+              "A schema must be specified when reading files with %s format. You may provide a schema string or a path to a file containing the schema.",
+              this.getFormat()));
+    }
+
+    Integer terminateAfterSecondsSinceNewOutput = this.getTerminateAfterSecondsSinceNewOutput();
+    Integer pollIntervalMillis = this.getPollIntervalMillis();
+    if (terminateAfterSecondsSinceNewOutput != null && terminateAfterSecondsSinceNewOutput > 0) {
+      checkArgument(
+          pollIntervalMillis != null && pollIntervalMillis > 0,
+          "Found positive value for terminateAfterSecondsSinceNewOutput but non-positive "
+              + "value for pollIntervalMillis. Please set pollIntervalMillis as well to enable "
+              + "streaming.");
+    }
+  }
+
+  public static Builder builder() {
+    return new AutoValue_FileReadSchemaTransformConfiguration.Builder();
+  }
+
+  /**
+   * The format of the file(s) to read.
+   *
+   * <p>Possible values are: `"line"`, `"avro"`, `"parquet"`, `"json"`
+   */
+  public abstract String getFormat();
+
+  /**
+   * The filepattern used to match and read files.
+   *
+   * <p>May instead use an input {@code PCollection<Row>} of filepatterns.
+   */
+  @Nullable
+  public abstract String getFilepattern();
+
+  /**
+   * The schema used by sources to deserialize data and create Beam Rows.
+   *
+   * <p>May be provided as a schema String or as a String path to a file that contains the schema.
+   */
+  @Nullable
+  public abstract String getSchema();
+
+  /**
+   * The time, in milliseconds, to wait before polling for new files.
+   *
+   * <p>This will set the pipeline to be a streaming pipeline and will continuously watch for new
+   * files.
+   *
+   * <p>Note: This only polls for new files. New updates to an existing file will not be watched
+   * for.
+   */
+  @Nullable
+  public abstract Integer getPollIntervalMillis();

Review Comment:
   Should this be a Long so that it is easy to pass into [Duration](https://www.joda.org/joda-time/apidocs/org/joda/time/Duration.html#Duration-long-)?
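
   As a minimal sketch of what a `Long` field would look like at the call site: the nullable value goes straight into a `long`-based duration factory, and the null and non-positive cases collapse into one check. The sketch uses `java.time.Duration` as a stand-in for Joda's `Duration` so that it runs without extra dependencies; `PollIntervals` and `toPollDuration` are hypothetical names, not part of this PR.

```java
import java.time.Duration;
import java.util.Optional;

/** Hypothetical helper; java.time.Duration stands in for org.joda.time.Duration. */
final class PollIntervals {
  private PollIntervals() {}

  /**
   * Returns the poll interval as a Duration, or empty when streaming is disabled
   * (the value is null or not positive).
   */
  static Optional<Duration> toPollDuration(Long pollIntervalMillis) {
    if (pollIntervalMillis == null || pollIntervalMillis <= 0L) {
      return Optional.empty();
    }
    // A Long unboxes directly to the long parameter; no int-to-long widening is involved.
    return Optional.of(Duration.ofMillis(pollIntervalMillis));
  }
}
```

   Returning an empty `Optional` keeps the "streaming disabled" case explicit rather than relying on a sentinel value.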





[GitHub] [beam] damondouglas commented on a diff in pull request #25927: FileReadSchemaTransform

Posted by "damondouglas (via GitHub)" <gi...@apache.org>.
damondouglas commented on code in PR #25927:
URL: https://github.com/apache/beam/pull/25927#discussion_r1167333670


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java:
##########
@@ -128,50 +134,151 @@ public String apply(Row input) {
     };
   }
 
+  public static String jsonSchemaStringFromBeamSchema(Schema beamSchema) {
+    return jsonSchemaFromBeamSchema(beamSchema).toString();
+  }
+
+  public static ObjectSchema jsonSchemaFromBeamSchema(Schema beamSchema) {
+    return jsonSchemaBuilderFromBeamSchema(beamSchema).build();
+  }
+
+  private static ObjectSchema.Builder jsonSchemaBuilderFromBeamSchema(Schema beamSchema) {
+    // Beam Schema is strict, so we should not accept additional properties
+    ObjectSchema.Builder jsonSchemaBuilder = ObjectSchema.builder().additionalProperties(false);
+
+    for (Field field : beamSchema.getFields()) {
+      String name = field.getName();
+      org.everit.json.schema.Schema propertySchema = jsonPropertyFromBeamType(field.getType());
+
+      // Add property and make it required
+      jsonSchemaBuilder = jsonSchemaBuilder.addPropertySchema(name, propertySchema);
+      jsonSchemaBuilder.addRequiredProperty(name);
+    }
+
+    return jsonSchemaBuilder;
+  }
+
+  private static org.everit.json.schema.Schema jsonPropertyFromBeamType(FieldType beamType) {
+    org.everit.json.schema.Schema.Builder<? extends org.everit.json.schema.Schema> propertySchema;
+
+    switch (beamType.getTypeName()) {
+      case BYTE:
+      case INT16:
+      case INT32:
+      case INT64:
+        propertySchema = NumberSchema.builder().requiresInteger(true);

Review Comment:
   Does everit correctly map a JSON integer value to the right Java type for either INT32 or INT64?
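
   One way to frame the question: a JSON Schema `integer` type constrains a value to be integral but says nothing about its width, so whichever layer converts to a Beam INT32 or INT64 has to do the range check. The sketch below shows such a check in plain Java. It does not describe everit's behaviour or this PR's code, and `JsonIntegers`, `toInt32` and `toInt64` are hypothetical names.

```java
import java.math.BigInteger;

/** Hypothetical helper: narrows an arbitrary-precision JSON integer to a fixed width. */
final class JsonIntegers {
  private JsonIntegers() {}

  /** Returns the value as an int, or throws ArithmeticException if it does not fit in 32 bits. */
  static int toInt32(BigInteger value) {
    return value.intValueExact();
  }

  /** Returns the value as a long, or throws ArithmeticException if it does not fit in 64 bits. */
  static long toInt64(BigInteger value) {
    return value.longValueExact();
  }
}
```

   A test that feeds a value just above `Integer.MAX_VALUE` into an INT32 field would show whether the read path fails loudly or silently truncates.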





[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25927: FileReadSchemaTransform

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25927:
URL: https://github.com/apache/beam/pull/25927#discussion_r1149210074


##########
sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/SchemaAwareJavaBeans.java:
##########
@@ -270,6 +400,55 @@ public static SerializableFunction<Row, DoublyNestedDataTypes> doublyNestedDataT
     return DEFAULT_SCHEMA_PROVIDER.fromRowFunction(DOUBLY_NESTED_DATA_TYPES_TYPE_DESCRIPTOR);
   }
 
+  /**
+   * Contains all primitive Java types supported by Avro. The purpose of this class is to test
+   * schema-aware PTransforms with flat {@link Schema} {@link Row}s.
+   */
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class AvroPrimitiveDataTypes implements Serializable {

Review Comment:
   I tried this. It looks like XML, JSON, and CSV don't support bytes (as in a sequence of bytes) but do support a single byte type. Avro and Parquet are the opposite: they support a byte sequence but not a single byte.





[GitHub] [beam] ahmedabu98 commented on pull request #25927: FileReadSchemaTransform

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25927:
URL: https://github.com/apache/beam/pull/25927#issuecomment-1485741929

   Run Java_File-schema-transform_IO_Direct PreCommit




[GitHub] [beam] damondouglas commented on a diff in pull request #25927: FileReadSchemaTransform

Posted by "damondouglas (via GitHub)" <gi...@apache.org>.
damondouglas commented on code in PR #25927:
URL: https://github.com/apache/beam/pull/25927#discussion_r1149489441


##########
sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/SchemaAwareJavaBeans.java:
##########
@@ -270,6 +400,55 @@ public static SerializableFunction<Row, DoublyNestedDataTypes> doublyNestedDataT
     return DEFAULT_SCHEMA_PROVIDER.fromRowFunction(DOUBLY_NESTED_DATA_TYPES_TYPE_DESCRIPTOR);
   }
 
+  /**
+   * Contains all primitive Java types supported by Avro. The purpose of this class is to test
+   * schema-aware PTransforms with flat {@link Schema} {@link Row}s.
+   */
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class AvroPrimitiveDataTypes implements Serializable {

Review Comment:
   I like your idea of removing the troublesome byte types. Let me know if you need help refactoring tests.





[GitHub] [beam] damondouglas commented on a diff in pull request #25927: FileReadSchemaTransform

Posted by "damondouglas (via GitHub)" <gi...@apache.org>.
damondouglas commented on code in PR #25927:
URL: https://github.com/apache/beam/pull/25927#discussion_r1174144957


##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/AvroReadSchemaTransformFormatProvider.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import com.google.auto.service.AutoService;
+import org.apache.beam.sdk.extensions.avro.io.AvroIO;
+import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.io.FileIO.ReadableFile;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@AutoService(FileReadSchemaTransformFormatProvider.class)
+public class AvroReadSchemaTransformFormatProvider
+    implements FileReadSchemaTransformFormatProvider {
+  @Override
+  public String identifier() {
+    return "avro";
+  }
+
+  @Override
+  public PTransform<PCollection<ReadableFile>, PCollection<Row>> buildTransform(
+      FileReadSchemaTransformConfiguration configuration) {
+
+    return new PTransform<PCollection<ReadableFile>, PCollection<Row>>() {
+      @Override
+      public PCollection<Row> expand(PCollection<ReadableFile> input) {
+        Schema beamSchema =
+            AvroUtils.toBeamSchema(
+                new org.apache.avro.Schema.Parser().parse(configuration.getSchema()));

Review Comment:
   Consider `configuration.getSafeSchema()` from [comment](https://github.com/apache/beam/pull/25927#discussion_r1174141095).
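
   The linked comment is not reproduced in this message, so the following is only a guess at the intended shape: a non-null accessor that sits next to the `@Nullable` getter and fails fast with a clear message. `SchemaConfigSketch` is a hypothetical stand-in that models just the schema field, and the error text is an assumption.

```java
import java.util.Objects;

/** Hypothetical stand-in for the configuration class; only the schema field is modelled. */
final class SchemaConfigSketch {
  private final String schema; // may be null, like the @Nullable getSchema() in the PR

  SchemaConfigSketch(String schema) {
    this.schema = schema;
  }

  /** Nullable accessor, mirroring getSchema(). */
  String getSchema() {
    return schema;
  }

  /** Assumed shape of getSafeSchema(): never returns null, throws if the schema is missing. */
  String getSafeSchema() {
    return Objects.requireNonNull(schema, "schema must be set for this format");
  }
}
```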



##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/ParquetReadSchemaTransformFormatProvider.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import com.google.auto.service.AutoService;
+import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.io.FileIO.ReadableFile;
+import org.apache.beam.sdk.io.parquet.ParquetIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@AutoService(FileReadSchemaTransformFormatProvider.class)
+public class ParquetReadSchemaTransformFormatProvider
+    implements FileReadSchemaTransformFormatProvider {
+  @Override
+  public String identifier() {
+    return "parquet";
+  }
+
+  @Override
+  public PTransform<PCollection<ReadableFile>, PCollection<Row>> buildTransform(
+      FileReadSchemaTransformConfiguration configuration) {
+
+    return new PTransform<PCollection<ReadableFile>, PCollection<Row>>() {
+      @Override
+      public PCollection<Row> expand(PCollection<ReadableFile> input) {
+        org.apache.avro.Schema avroSchema =
+            new org.apache.avro.Schema.Parser().parse(configuration.getSchema());

Review Comment:
   In the context of this [comment](https://github.com/apache/beam/pull/25927#discussion_r1174141095) may we consider `configuration.getSafeSchema()`?



##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformProvider.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import java.io.Reader;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.schemas.io.Providers;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Watch.Growth;
+import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.CharStreams;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@AutoService(SchemaTransformProvider.class)
+public class FileReadSchemaTransformProvider
+    extends TypedSchemaTransformProvider<FileReadSchemaTransformConfiguration> {
+  private static final Logger LOG = LoggerFactory.getLogger(FileReadSchemaTransformProvider.class);
+  private static final String IDENTIFIER = "beam:schematransform:org.apache.beam:file_read:v1";
+  static final String INPUT_TAG = "input";
+  static final String OUTPUT_TAG = "output";
+
+  @Override
+  protected Class<FileReadSchemaTransformConfiguration> configurationClass() {
+    return FileReadSchemaTransformConfiguration.class;
+  }
+
+  @Override
+  protected SchemaTransform from(FileReadSchemaTransformConfiguration configuration) {
+    return new FileReadSchemaTransform(configuration);
+  }
+
+  @Override
+  public String identifier() {
+    return IDENTIFIER;
+  }
+
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.singletonList(INPUT_TAG);
+  }
+
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  private static class FileReadSchemaTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> implements SchemaTransform {
+    private FileReadSchemaTransformConfiguration configuration;
+
+    FileReadSchemaTransform(FileReadSchemaTransformConfiguration configuration) {
+      configuration.validate();
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      checkArgument(
+          input.getAll().isEmpty() ^ Strings.isNullOrEmpty(configuration.getFilepattern()),
+          "Either an input PCollection of file patterns or the filepattern parameter must be set, "
+              + "but not both.");
+
+      // Input schema can be a schema String or a path to a file containing the schema
+      // Resolve to get a schema String
+      String schema = configuration.getSchema();
+      if (!Strings.isNullOrEmpty(schema)) {
+        schema = resolveSchemaStringOrFilePath(schema);
+        configuration = configuration.toBuilder().setSchema(schema).build();
+      }
+
+      PCollection<MatchResult.Metadata> files;
+      if (!Strings.isNullOrEmpty(configuration.getFilepattern())) {
+        Pipeline p = input.getPipeline();
+        FileIO.Match matchFiles = FileIO.match().filepattern(configuration.getFilepattern());
+        // Handle streaming case
+        matchFiles = (FileIO.Match) maybeApplyStreaming(matchFiles);
+
+        files = p.apply(matchFiles);
+      } else {
+        FileIO.MatchAll matchAllFiles = FileIO.matchAll();
+        // Handle streaming case
+        matchAllFiles = (FileIO.MatchAll) maybeApplyStreaming(matchAllFiles);
+
+        files =
+            input
+                .get(INPUT_TAG)
+                .apply(
+                    "Get filepatterns",
+                    MapElements.into(TypeDescriptors.strings())
+                        .via((row) -> row.getString("filepattern")))
+                .apply("Match files", matchAllFiles);
+      }
+
+      // Pass readable files to the appropriate source and output rows.
+      PCollection<Row> output =
+          files
+              .apply(FileIO.readMatches())
+              .apply("Read files", getProvider().buildTransform(configuration));
+
+      return PCollectionRowTuple.of(OUTPUT_TAG, output);
+    }
+
+    public PTransform<?, PCollection<MatchResult.Metadata>> maybeApplyStreaming(
+        PTransform<?, PCollection<MatchResult.Metadata>> matchTransform) {
+      // Two parameters are provided to configure watching for new files.
+      Long terminateAfterSeconds = configuration.getTerminateAfterSecondsSinceNewOutput();
+      Long pollIntervalMillis = configuration.getPollIntervalMillis();
+
+      // Streaming is enabled when a poll interval is provided
+      if (pollIntervalMillis != null && pollIntervalMillis > 0L) {
+        Duration pollDuration = Duration.millis(pollIntervalMillis);
+
+        // By default, the file match transform never terminates
+        TerminationCondition<String, ?> terminationCondition = Growth.never();
+
+        // If provided, will terminate after this many seconds since seeing a new file
+        if (terminateAfterSeconds != null && terminateAfterSeconds > 0L) {
+          terminationCondition =
+              Growth.afterTimeSinceNewOutput(Duration.standardSeconds(terminateAfterSeconds));
+        }
+
+        // Apply watch for new files
+        if (matchTransform instanceof FileIO.Match) {
+          matchTransform =
+              ((FileIO.Match) matchTransform).continuously(pollDuration, terminationCondition);
+        } else if (matchTransform instanceof FileIO.MatchAll) {
+          matchTransform =
+              ((FileIO.MatchAll) matchTransform).continuously(pollDuration, terminationCondition);
+        }
+      }
+      return matchTransform;
+    }
+
+    public String resolveSchemaStringOrFilePath(String schema) {

Review Comment:
   Instead of this method, may we consider an additional configuration field, and throw an error when the user provides both the Schema and the Schema file source property?  It took me some time to realize that the configuration's "Schema" property could be either a Schema or a file path.  Having the user declare explicitly where the Schema comes from would simplify the code.
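
   A minimal sketch of that suggestion, under the assumption that the second property is called `schemaFile` (a hypothetical name, as is `SchemaSourceSketch`). The class is a plain-Java stand-in rather than the AutoValue configuration from this PR.

```java
/** Hypothetical configuration sketch with two mutually exclusive schema sources. */
final class SchemaSourceSketch {
  private final String schema; // inline schema string, or null
  private final String schemaFile; // path to a file holding the schema, or null

  SchemaSourceSketch(String schema, String schemaFile) {
    this.schema = schema;
    this.schemaFile = schemaFile;
  }

  private static boolean isSet(String s) {
    return s != null && !s.isEmpty();
  }

  /** Throws IllegalArgumentException when both sources are provided. */
  void validate() {
    if (isSet(schema) && isSet(schemaFile)) {
      throw new IllegalArgumentException("Specify either schema or schemaFile, but not both.");
    }
  }

  /** True when the schema has to be loaded from schemaFile. */
  boolean readsSchemaFromFile() {
    return isSet(schemaFile);
  }
}
```

   With two fields, the provider no longer has to guess whether a string is a schema or a path, and the error message can name the exact property at fault.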



##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/AvroReadSchemaTransformFormatProvider.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import com.google.auto.service.AutoService;
+import org.apache.beam.sdk.extensions.avro.io.AvroIO;
+import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.io.FileIO.ReadableFile;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@AutoService(FileReadSchemaTransformFormatProvider.class)
+public class AvroReadSchemaTransformFormatProvider
+    implements FileReadSchemaTransformFormatProvider {
+  @Override
+  public String identifier() {
+    return "avro";
+  }
+
+  @Override
+  public PTransform<PCollection<ReadableFile>, PCollection<Row>> buildTransform(
+      FileReadSchemaTransformConfiguration configuration) {
+
+    return new PTransform<PCollection<ReadableFile>, PCollection<Row>>() {
+      @Override
+      public PCollection<Row> expand(PCollection<ReadableFile> input) {
+        Schema beamSchema =
+            AvroUtils.toBeamSchema(
+                new org.apache.avro.Schema.Parser().parse(configuration.getSchema()));
+
+        return input
+            .apply(AvroIO.readFilesGenericRecords(configuration.getSchema()).withBeamSchemas(true))

Review Comment:
   Consider `configuration.getSafeSchema()` from [comment](https://github.com/apache/beam/pull/25927#discussion_r1174141095).



##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/JsonReadSchemaTransformFormatProvider.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import com.google.auto.service.AutoService;
+import java.nio.charset.StandardCharsets;
+import org.apache.beam.sdk.io.FileIO.ReadableFile;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.payloads.JsonPayloadSerializerProvider;
+import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
+import org.apache.beam.sdk.schemas.utils.JsonUtils;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/** A {@link FileReadSchemaTransformFormatProvider} that reads newline-delimited JSONs. */
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@AutoService(FileReadSchemaTransformFormatProvider.class)
+public class JsonReadSchemaTransformFormatProvider
+    implements FileReadSchemaTransformFormatProvider {
+  @Override
+  public String identifier() {
+    return "json";
+  }
+
+  @Override
+  public PTransform<PCollection<ReadableFile>, PCollection<Row>> buildTransform(
+      FileReadSchemaTransformConfiguration configuration) {
+
+    return new PTransform<PCollection<ReadableFile>, PCollection<Row>>() {
+      @Override
+      public PCollection<Row> expand(PCollection<ReadableFile> input) {
+        Schema beamSchema = JsonUtils.beamSchemaFromJsonSchema(configuration.getSchema());

Review Comment:
   In the context of this [comment](https://github.com/apache/beam/pull/25927#discussion_r1174141095) may we consider `configuration.getSafeSchema()`?



##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformProvider.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import java.io.Reader;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.schemas.io.Providers;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Watch.Growth;
+import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.CharStreams;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@AutoService(SchemaTransformProvider.class)
+public class FileReadSchemaTransformProvider
+    extends TypedSchemaTransformProvider<FileReadSchemaTransformConfiguration> {
+  private static final Logger LOG = LoggerFactory.getLogger(FileReadSchemaTransformProvider.class);
+  private static final String IDENTIFIER = "beam:schematransform:org.apache.beam:file_read:v1";
+  static final String INPUT_TAG = "input";
+  static final String OUTPUT_TAG = "output";
+
+  @Override
+  protected Class<FileReadSchemaTransformConfiguration> configurationClass() {
+    return FileReadSchemaTransformConfiguration.class;
+  }
+
+  @Override
+  protected SchemaTransform from(FileReadSchemaTransformConfiguration configuration) {
+    return new FileReadSchemaTransform(configuration);
+  }
+
+  @Override
+  public String identifier() {
+    return IDENTIFIER;
+  }
+
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.singletonList(INPUT_TAG);
+  }
+
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  private static class FileReadSchemaTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> implements SchemaTransform {
+    private FileReadSchemaTransformConfiguration configuration;
+
+    FileReadSchemaTransform(FileReadSchemaTransformConfiguration configuration) {
+      configuration.validate();
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      checkArgument(
+          input.getAll().isEmpty() ^ Strings.isNullOrEmpty(configuration.getFilepattern()),
+          "Either an input PCollection of file patterns or the filepattern parameter must be set, "
+              + "but not both.");
+
+      // Input schema can be a schema String or a path to a file containing the schema
+      // Resolve to get a schema String
+      String schema = configuration.getSchema();
+      if (!Strings.isNullOrEmpty(schema)) {
+        schema = resolveSchemaStringOrFilePath(schema);
+        configuration = configuration.toBuilder().setSchema(schema).build();
+      }
+
+      PCollection<MatchResult.Metadata> files;
+      if (!Strings.isNullOrEmpty(configuration.getFilepattern())) {
+        Pipeline p = input.getPipeline();
+        FileIO.Match matchFiles = FileIO.match().filepattern(configuration.getFilepattern());
+        // Handle streaming case
+        matchFiles = (FileIO.Match) maybeApplyStreaming(matchFiles);
+
+        files = p.apply(matchFiles);
+      } else {
+        FileIO.MatchAll matchAllFiles = FileIO.matchAll();
+        // Handle streaming case
+        matchAllFiles = (FileIO.MatchAll) maybeApplyStreaming(matchAllFiles);
+
+        files =
+            input
+                .get(INPUT_TAG)
+                .apply(
+                    "Get filepatterns",
+                    MapElements.into(TypeDescriptors.strings())
+                        .via((row) -> row.getString("filepattern")))

Review Comment:
   1) In the context of this [comment](https://github.com/apache/beam/pull/25927#discussion_r1174141095), this could change to `.via((row) -> Objects.requireNonNull(row.getString("filepattern"))))`
   
   2) Instead of hardcoding strings like "filepattern", could we declare them as `static final String ...` constants just below the class declaration?
   
   3) Should we check that the incoming row's Schema adheres to having a `FieldType.STRING` field named "filepattern", and add tests showing that meaningful errors are displayed to the user about this expectation?
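
One way to picture points 2 and 3 together is the sketch below. It is only an illustration under stated assumptions: a plain `Map` of field name to type name stands in for a Beam `Schema`, and `FilepatternSchemaCheck`, `FILEPATTERN_FIELD`, and `STRING_TYPE` are hypothetical names that do not appear in the PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative stand-in: a Map of field name to type name plays the role of a Beam Schema. */
final class FilepatternSchemaCheck {
  // Hypothetical constants replacing string literals scattered through expand().
  static final String FILEPATTERN_FIELD = "filepattern";
  static final String STRING_TYPE = "STRING";

  /** Throws IllegalArgumentException with a descriptive message if the field is missing or mistyped. */
  static void validate(Map<String, String> fieldTypes) {
    String actualType = fieldTypes.get(FILEPATTERN_FIELD);
    if (actualType == null) {
      throw new IllegalArgumentException(
          String.format(
              "Input rows must contain a %s field named '%s', but found fields %s.",
              STRING_TYPE, FILEPATTERN_FIELD, fieldTypes.keySet()));
    }
    if (!STRING_TYPE.equals(actualType)) {
      throw new IllegalArgumentException(
          String.format(
              "Field '%s' must be of type %s, but was %s.",
              FILEPATTERN_FIELD, STRING_TYPE, actualType));
    }
  }

  public static void main(String[] args) {
    Map<String, String> good = new LinkedHashMap<>();
    good.put(FILEPATTERN_FIELD, STRING_TYPE);
    validate(good); // passes silently

    Map<String, String> bad = new LinkedHashMap<>();
    bad.put("path", STRING_TYPE);
    try {
      validate(bad);
    } catch (IllegalArgumentException e) {
      // The message names the expected field and lists what was actually found.
      System.out.println(e.getMessage());
    }
  }
}
```

In the actual transform, a check like this would presumably run at the start of `expand` against the schema of the input PCollection, before the "Get filepatterns" step.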



##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformProvider.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import java.io.Reader;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.schemas.io.Providers;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Watch.Growth;
+import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.CharStreams;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@AutoService(SchemaTransformProvider.class)
+public class FileReadSchemaTransformProvider
+    extends TypedSchemaTransformProvider<FileReadSchemaTransformConfiguration> {
+  private static final Logger LOG = LoggerFactory.getLogger(FileReadSchemaTransformProvider.class);
+  private static final String IDENTIFIER = "beam:schematransform:org.apache.beam:file_read:v1";
+  static final String INPUT_TAG = "input";
+  static final String OUTPUT_TAG = "output";
+
+  @Override
+  protected Class<FileReadSchemaTransformConfiguration> configurationClass() {
+    return FileReadSchemaTransformConfiguration.class;
+  }
+
+  @Override
+  protected SchemaTransform from(FileReadSchemaTransformConfiguration configuration) {
+    return new FileReadSchemaTransform(configuration);
+  }
+
+  @Override
+  public String identifier() {
+    return IDENTIFIER;
+  }
+
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.singletonList(INPUT_TAG);
+  }
+
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  private static class FileReadSchemaTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> implements SchemaTransform {
+    private FileReadSchemaTransformConfiguration configuration;
+
+    FileReadSchemaTransform(FileReadSchemaTransformConfiguration configuration) {
+      configuration.validate();
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      checkArgument(
+          input.getAll().isEmpty() ^ Strings.isNullOrEmpty(configuration.getFilepattern()),
+          "Either an input PCollection of file patterns or the filepattern parameter must be set,"
+              + " but not both.");
+
+      // Input schema can be a schema String or a path to a file containing the schema
+      // Resolve to get a schema String
+      String schema = configuration.getSchema();
+      if (!Strings.isNullOrEmpty(schema)) {
+        schema = resolveSchemaStringOrFilePath(schema);
+        configuration = configuration.toBuilder().setSchema(schema).build();
+      }
+
+      PCollection<MatchResult.Metadata> files;
+      if (!Strings.isNullOrEmpty(configuration.getFilepattern())) {
+        Pipeline p = input.getPipeline();
+        FileIO.Match matchFiles = FileIO.match().filepattern(configuration.getFilepattern());
+        // Handle streaming case
+        matchFiles = (FileIO.Match) maybeApplyStreaming(matchFiles);
+
+        files = p.apply(matchFiles);
+      } else {
+        FileIO.MatchAll matchAllFiles = FileIO.matchAll();
+        // Handle streaming case
+        matchAllFiles = (FileIO.MatchAll) maybeApplyStreaming(matchAllFiles);
+
+        files =
+            input
+                .get(INPUT_TAG)
+                .apply(
+                    "Get filepatterns",
+                    MapElements.into(TypeDescriptors.strings())
+                        .via((row) -> row.getString("filepattern")))
+                .apply("Match files", matchAllFiles);
+      }
+
+      // Pass readable files to the appropriate source and output rows.
+      PCollection<Row> output =
+          files
+              .apply(FileIO.readMatches())
+              .apply("Read files", getProvider().buildTransform(configuration));
+
+      return PCollectionRowTuple.of(OUTPUT_TAG, output);
+    }
+
+    public PTransform<?, PCollection<MatchResult.Metadata>> maybeApplyStreaming(
+        PTransform<?, PCollection<MatchResult.Metadata>> matchTransform) {
+      // Two parameters are provided to configure watching for new files.
+      Long terminateAfterSeconds = configuration.getTerminateAfterSecondsSinceNewOutput();
+      Long pollIntervalMillis = configuration.getPollIntervalMillis();
+
+      // Streaming is enabled when a poll interval is provided
+      if (pollIntervalMillis != null && pollIntervalMillis > 0L) {
+        Duration pollDuration = Duration.millis(pollIntervalMillis);
+
+        // By default, the file match transform never terminates
+        TerminationCondition<String, ?> terminationCondition = Growth.never();
+
+        // If provided, will terminate after this many seconds since seeing a new file
+        if (terminateAfterSeconds != null && terminateAfterSeconds > 0L) {
+          terminationCondition =
+              Growth.afterTimeSinceNewOutput(Duration.standardSeconds(terminateAfterSeconds));
+        }
+
+        // Apply watch for new files
+        if (matchTransform instanceof FileIO.Match) {
+          matchTransform =
+              ((FileIO.Match) matchTransform).continuously(pollDuration, terminationCondition);
+        } else if (matchTransform instanceof FileIO.MatchAll) {
+          matchTransform =
+              ((FileIO.MatchAll) matchTransform).continuously(pollDuration, terminationCondition);
+        }

Review Comment:
   Instead of these two conditionals, plus the conditionals above and the one at the call site, could we consider a `buildMatchTransform` method that performs these checks and returns a `PTransform<PCollection<String>, PCollection<MatchResult.Metadata>>`? Flattening the conditionals would make this easier to troubleshoot and test. On the subject of testing, whether for this method or for a `buildMatchTransform` approach, could we see more test coverage? Finally, does the method need to be public rather than package-private?
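
As a rough illustration of how the polling and termination checks could be flattened and unit-tested without building a pipeline, the sketch below pulls the decision into a small value class. It is not the `buildMatchTransform` method suggested above: `WatchSettings` is a hypothetical name, and `java.time.Duration` is used here instead of the Joda `Duration` in the PR so the example runs on the JDK alone.

```java
import java.time.Duration;
import java.util.Optional;

/** Hypothetical helper: decides whether, and how, to watch for new files. */
final class WatchSettings {
  final Duration pollInterval;
  final Optional<Duration> terminateAfter; // empty means "never terminate"

  private WatchSettings(Duration pollInterval, Optional<Duration> terminateAfter) {
    this.pollInterval = pollInterval;
    this.terminateAfter = terminateAfter;
  }

  /** Returns empty for the batch case, i.e. when no positive poll interval is configured. */
  static Optional<WatchSettings> from(Long pollIntervalMillis, Long terminateAfterSeconds) {
    if (pollIntervalMillis == null || pollIntervalMillis <= 0L) {
      return Optional.empty();
    }
    Optional<Duration> terminateAfter =
        (terminateAfterSeconds != null && terminateAfterSeconds > 0L)
            ? Optional.of(Duration.ofSeconds(terminateAfterSeconds))
            : Optional.empty();
    return Optional.of(new WatchSettings(Duration.ofMillis(pollIntervalMillis), terminateAfter));
  }

  public static void main(String[] args) {
    // Batch: no poll interval, so no watching at all.
    System.out.println(WatchSettings.from(null, 30L).isPresent());
    // Streaming: poll every 500 ms, stop 30 s after the last new file.
    WatchSettings streaming = WatchSettings.from(500L, 30L).get();
    System.out.println(streaming.pollInterval + " " + streaming.terminateAfter);
  }
}
```

With the decision isolated like this, the Beam-facing code would only need a single branch on whether settings are present before calling `continuously`, and each combination of the two configuration values can be covered by a plain unit test.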



##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformProvider.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import java.io.Reader;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.schemas.io.Providers;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Watch.Growth;
+import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.CharStreams;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@AutoService(SchemaTransformProvider.class)
+public class FileReadSchemaTransformProvider
+    extends TypedSchemaTransformProvider<FileReadSchemaTransformConfiguration> {
+  private static final Logger LOG = LoggerFactory.getLogger(FileReadSchemaTransformProvider.class);
+  private static final String IDENTIFIER = "beam:schematransform:org.apache.beam:file_read:v1";
+  static final String INPUT_TAG = "input";
+  static final String OUTPUT_TAG = "output";
+
+  @Override
+  protected Class<FileReadSchemaTransformConfiguration> configurationClass() {
+    return FileReadSchemaTransformConfiguration.class;
+  }
+
+  @Override
+  protected SchemaTransform from(FileReadSchemaTransformConfiguration configuration) {
+    return new FileReadSchemaTransform(configuration);
+  }
+
+  @Override
+  public String identifier() {
+    return IDENTIFIER;
+  }
+
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.singletonList(INPUT_TAG);
+  }
+
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  private static class FileReadSchemaTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> implements SchemaTransform {
+    private FileReadSchemaTransformConfiguration configuration;
+
+    FileReadSchemaTransform(FileReadSchemaTransformConfiguration configuration) {
+      configuration.validate();
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      checkArgument(
+          input.getAll().isEmpty() ^ Strings.isNullOrEmpty(configuration.getFilepattern()),
+          "Either an input PCollection of file patterns or the filepattern parameter must be set,"
+              + " but not both.");
+
+      // Input schema can be a schema String or a path to a file containing the schema
+      // Resolve to get a schema String
+      String schema = configuration.getSchema();
+      if (!Strings.isNullOrEmpty(schema)) {
+        schema = resolveSchemaStringOrFilePath(schema);
+        configuration = configuration.toBuilder().setSchema(schema).build();
+      }
+
+      PCollection<MatchResult.Metadata> files;
+      if (!Strings.isNullOrEmpty(configuration.getFilepattern())) {
+        Pipeline p = input.getPipeline();
+        FileIO.Match matchFiles = FileIO.match().filepattern(configuration.getFilepattern());
+        // Handle streaming case
+        matchFiles = (FileIO.Match) maybeApplyStreaming(matchFiles);
+
+        files = p.apply(matchFiles);
+      } else {
+        FileIO.MatchAll matchAllFiles = FileIO.matchAll();
+        // Handle streaming case
+        matchAllFiles = (FileIO.MatchAll) maybeApplyStreaming(matchAllFiles);
+
+        files =
+            input
+                .get(INPUT_TAG)
+                .apply(
+                    "Get filepatterns",
+                    MapElements.into(TypeDescriptors.strings())
+                        .via((row) -> row.getString("filepattern")))
+                .apply("Match files", matchAllFiles);
+      }
+
+      // Pass readable files to the appropriate source and output rows.
+      PCollection<Row> output =
+          files
+              .apply(FileIO.readMatches())
+              .apply("Read files", getProvider().buildTransform(configuration));
+
+      return PCollectionRowTuple.of(OUTPUT_TAG, output);
+    }
+
+    public PTransform<?, PCollection<MatchResult.Metadata>> maybeApplyStreaming(
+        PTransform<?, PCollection<MatchResult.Metadata>> matchTransform) {
+      // Two parameters are provided to configure watching for new files.
+      Long terminateAfterSeconds = configuration.getTerminateAfterSecondsSinceNewOutput();
+      Long pollIntervalMillis = configuration.getPollIntervalMillis();
+
+      // Streaming is enabled when a poll interval is provided
+      if (pollIntervalMillis != null && pollIntervalMillis > 0L) {
+        Duration pollDuration = Duration.millis(pollIntervalMillis);
+
+        // By default, the file match transform never terminates
+        TerminationCondition<String, ?> terminationCondition = Growth.never();
+
+        // If provided, will terminate after this many seconds since seeing a new file
+        if (terminateAfterSeconds != null && terminateAfterSeconds > 0L) {
+          terminationCondition =
+              Growth.afterTimeSinceNewOutput(Duration.standardSeconds(terminateAfterSeconds));
+        }
+
+        // Apply watch for new files
+        if (matchTransform instanceof FileIO.Match) {
+          matchTransform =
+              ((FileIO.Match) matchTransform).continuously(pollDuration, terminationCondition);
+        } else if (matchTransform instanceof FileIO.MatchAll) {
+          matchTransform =
+              ((FileIO.MatchAll) matchTransform).continuously(pollDuration, terminationCondition);
+        }
+      }
+      return matchTransform;
+    }
+
+    public String resolveSchemaStringOrFilePath(String schema) {
+      try {
+        MatchResult result;
+        try {
+          LOG.info("Attempting to locate input schema as a file path.");
+          result = FileSystems.match(schema);
+          // While some FileSystem implementations throw an IllegalArgumentException when matching
+          // invalid file paths, others will return a status ERROR or NOT_FOUND. The following check
+          // will throw an IllegalArgumentException and take us to the catch block.
+          checkArgument(result.status() == MatchResult.Status.OK);
+        } catch (IllegalArgumentException e) {
+          LOG.info(
+              "Input schema is not a valid file path. Will attempt to use it as a schema string.");
+          return schema;
+        }
+        checkArgument(
+            !result.metadata().isEmpty(),
+            "Failed to match any files for the input schema file path.");
+        List<ResourceId> resource =
+            result.metadata().stream()
+                .map(MatchResult.Metadata::resourceId)
+                .collect(Collectors.toList());
+
+        checkArgument(
+            resource.size() == 1,
+            "Expected exactly 1 schema file, but got " + resource.size() + " files.");
+
+        ReadableByteChannel byteChannel = FileSystems.open(resource.get(0));
+        Reader reader = Channels.newReader(byteChannel, UTF_8.name());
+        return CharStreams.toString(reader);
+      } catch (IOException e) {
+        throw new RuntimeException("Error when parsing input schema file: ", e);
+      }
+    }
+
+    private FileReadSchemaTransformFormatProvider getProvider() {
+      String format = configuration.getFormat();
+      Map<String, FileReadSchemaTransformFormatProvider> providers =
+          Providers.loadProviders(FileReadSchemaTransformFormatProvider.class);
+      checkArgument(
+          providers.containsKey(format),
+          String.format(
+              "Received unsupported file format: %s. Supported formats are %s",
+              format, providers.keySet()));
+
+      return providers.get(format);

Review Comment:
   After removing the SuppressWarnings annotation (see [comment](https://github.com/apache/beam/pull/25927#discussion_r1174141095)), I was able to get the check to pass with this:
   ```
   Optional<FileReadSchemaTransformFormatProvider> provider =
       Optional.ofNullable(providers.get(format));
   
   checkState(
       provider.isPresent(),
       String.format(
           "Received unsupported file format: %s. Supported formats are %s",
           format, providers.keySet()));
   
   return provider.get();
   ```



##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/LineReadSchemaTransformFormatProvider.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import com.google.auto.service.AutoService;
+import org.apache.beam.sdk.io.FileIO.ReadableFile;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+
+/** A {@link FileReadSchemaTransformFormatProvider} that reads lines as Strings. */
+@AutoService(FileReadSchemaTransformFormatProvider.class)
+public class LineReadSchemaTransformFormatProvider
+    implements FileReadSchemaTransformFormatProvider {
+  @Override
+  public String identifier() {
+    return "line";
+  }
+
+  @Override
+  public PTransform<PCollection<ReadableFile>, PCollection<Row>> buildTransform(
+      FileReadSchemaTransformConfiguration configuration) {
+
+    return new PTransform<PCollection<ReadableFile>, PCollection<Row>>() {
+      @Override
+      public PCollection<Row> expand(PCollection<ReadableFile> input) {
+        Schema lineSchema = Schema.builder().addStringField("line").build();

Review Comment:
   Could we move this to a `static final Schema LINE_SCHEMA = ...` between lines 33 and 34?



##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformProvider.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import java.io.Reader;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.schemas.io.Providers;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Watch.Growth;
+import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.CharStreams;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@AutoService(SchemaTransformProvider.class)
+public class FileReadSchemaTransformProvider
+    extends TypedSchemaTransformProvider<FileReadSchemaTransformConfiguration> {
+  private static final Logger LOG = LoggerFactory.getLogger(FileReadSchemaTransformProvider.class);
+  private static final String IDENTIFIER = "beam:schematransform:org.apache.beam:file_read:v1";
+  static final String INPUT_TAG = "input";
+  static final String OUTPUT_TAG = "output";
+
+  @Override
+  protected Class<FileReadSchemaTransformConfiguration> configurationClass() {
+    return FileReadSchemaTransformConfiguration.class;
+  }
+
+  @Override
+  protected SchemaTransform from(FileReadSchemaTransformConfiguration configuration) {
+    return new FileReadSchemaTransform(configuration);
+  }
+
+  @Override
+  public String identifier() {
+    return IDENTIFIER;
+  }
+
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.singletonList(INPUT_TAG);
+  }
+
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  private static class FileReadSchemaTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> implements SchemaTransform {
+    private FileReadSchemaTransformConfiguration configuration;
+
+    FileReadSchemaTransform(FileReadSchemaTransformConfiguration configuration) {
+      configuration.validate();
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      checkArgument(
+          input.getAll().isEmpty() ^ Strings.isNullOrEmpty(configuration.getFilepattern()),
+          "Either an input PCollection of file patterns or the filepattern parameter must be set,"
+              + " but not both.");
+
+      // Input schema can be a schema String or a path to a file containing the schema
+      // Resolve to get a schema String
+      String schema = configuration.getSchema();

Review Comment:
   In the context of this [comment](https://github.com/apache/beam/pull/25927#discussion_r1174141095), could we do this? I agree it looks like playing games with the checker framework, but I'd argue that the benefit of keeping the checker framework enabled and removing the SuppressWarnings outweighs that cost. Even if one checks `isNullOrEmpty` in a prior step and then calls `getSchema` again, the checker framework still complains.
   ```
   if (!Strings.isNullOrEmpty(configuration.getSchema())) {
     String schema = configuration.getSafeSchema();
     ...
   }
   ```
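
The `getSafeSchema()` accessor used above comes from the linked comment, which is not reproduced in this message. The following is a minimal sketch of what such an accessor could look like, assuming it simply rejects a null value. `ConfigSketch` is a hypothetical stand-in for `FileReadSchemaTransformConfiguration`, not the PR's code.

```java
import java.util.Objects;

/** Hypothetical stand-in for a configuration class with an optional schema. */
final class ConfigSketch {
  private final String schema; // may be null when no schema was configured

  ConfigSketch(String schema) {
    this.schema = schema;
  }

  /** Nullable accessor, mirroring getSchema() in the diff. */
  String getSchema() {
    return schema;
  }

  /** Non-null accessor: fails fast with a clear message instead of returning null. */
  String getSafeSchema() {
    return Objects.requireNonNull(schema, "schema must be set before calling getSafeSchema()");
  }

  public static void main(String[] args) {
    ConfigSketch config = new ConfigSketch("{\"type\": \"object\"}");
    if (config.getSchema() != null && !config.getSchema().isEmpty()) {
      // A nullness checker can treat this result as non-null without a suppression.
      String schema = config.getSafeSchema();
      System.out.println(schema.length());
    }
  }
}
```

The point of the extra accessor is that its return type is non-null by contract, so callers do not have to convince the checker that a second call to the nullable getter returns the same value.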



##########
sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformFormatProviderTest.java:
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.ALL_PRIMITIVE_DATA_TYPES_SCHEMA;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.ARRAY_PRIMITIVE_DATA_TYPES_SCHEMA;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.BYTE_SEQUENCE_TYPE_SCHEMA;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.BYTE_TYPE_SCHEMA;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.SINGLY_NESTED_DATA_TYPES_SCHEMA;
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.TIME_CONTAINING_SCHEMA;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformConfiguration.parquetConfigurationBuilder;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviderTestData.DATA;
+import static org.junit.Assume.assumeTrue;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+
+public abstract class FileReadSchemaTransformFormatProviderTest {
+
+  /** Returns the format of the {@link FileReadSchemaTransformFormatProviderTest} subclass. */
+  protected abstract String getFormat();
+
+  /** Given a Beam Schema, returns the relevant source's String schema representation. */
+  protected abstract String getStringSchemaFromBeamSchema(Schema beamSchema);
+
+  /**
+   * Writes {@link Row}s to files then reads from those files. Performs a {@link
+   * org.apache.beam.sdk.testing.PAssert} check to validate the written and read {@link Row}s are
+   * equal.
+   */
+  protected abstract void runWriteAndReadTest(
+      Schema schema, List<Row> rows, String filePath, String schemaFilePath);
+
+  @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+  @Rule public TestPipeline writePipeline = TestPipeline.create();
+  @Rule public TestPipeline readPipeline = TestPipeline.create();
+
+  @Rule public TestName testName = new TestName();
+
+  protected Schema getFilepatternSchema() {
+    return Schema.of(Field.of("filepattern", FieldType.STRING));
+  }
+
+  protected String getFilePath() {
+    return getFolder() + "/test";
+  }
+
+  protected String getFolder() {
+    try {
+      return tmpFolder.newFolder(getFormat(), testName.getMethodName()).getAbsolutePath();
+    } catch (IOException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  @Test
+  public void testAllPrimitiveDataTypes() {
+    Schema schema = ALL_PRIMITIVE_DATA_TYPES_SCHEMA;
+    List<Row> rows = DATA.allPrimitiveDataTypesRows;
+    String filePath = getFilePath();
+
+    runWriteAndReadTest(schema, rows, filePath, null);
+  }
+
+  @Test
+  public void testNullableAllPrimitiveDataTypes() {
+    Schema schema = NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA;
+    List<Row> rows = DATA.nullableAllPrimitiveDataTypesRows;
+    String filePath = getFilePath();
+
+    runWriteAndReadTest(schema, rows, filePath, null);
+  }
+
+  @Test
+  public void testTimeContaining() {
+    // JSON schemas don't support DATETIME or other logical types
+    assumeTrue(!getFormat().equals("json"));
+
+    Schema schema = TIME_CONTAINING_SCHEMA;
+    List<Row> rows = DATA.timeContainingRows;
+    String filePath = getFilePath();
+
+    runWriteAndReadTest(schema, rows, filePath, null);
+  }
+
+  @Test
+  public void testByteType() {
+    List<String> formatsThatSupportSingleByteType = Arrays.asList("csv", "json", "xml");
+    assumeTrue(formatsThatSupportSingleByteType.contains(getFormat()));
+
+    Schema schema = BYTE_TYPE_SCHEMA;
+    List<Row> rows = DATA.byteTypeRows;
+    String filePath = getFilePath();
+
+    runWriteAndReadTest(schema, rows, filePath, null);
+  }
+
+  @Test
+  public void testByteSequenceType() {
+    List<String> formatsThatSupportByteSequenceType = Arrays.asList("avro", "parquet");
+    assumeTrue(formatsThatSupportByteSequenceType.contains(getFormat()));
+
+    Schema schema = BYTE_SEQUENCE_TYPE_SCHEMA;
+    List<Row> rows = DATA.byteSequenceTypeRows;
+    String filePath = getFilePath();
+
+    runWriteAndReadTest(schema, rows, filePath, null);
+  }
+
+  @Test
+  public void testArrayPrimitiveDataTypes() {
+    Schema schema = ARRAY_PRIMITIVE_DATA_TYPES_SCHEMA;
+    List<Row> rows = DATA.arrayPrimitiveDataTypesRows;
+    String filePath = getFilePath();
+
+    runWriteAndReadTest(schema, rows, filePath, null);
+  }
+
+  @Test
+  public void testNestedRepeatedDataTypes() {

Review Comment:
   Could we add a test for `doublyNestedDataTypesRepeatRows`?





[GitHub] [beam] damondouglas commented on a diff in pull request #25927: FileReadSchemaTransform

Posted by "damondouglas (via GitHub)" <gi...@apache.org>.
damondouglas commented on code in PR #25927:
URL: https://github.com/apache/beam/pull/25927#discussion_r1174128749


##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformConfiguration.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class FileReadSchemaTransformConfiguration {
+  static final Set<String> VALID_FORMATS = Sets.newHashSet("avro", "parquet", "json", "line");
+
+  public void validate() {
+    checkArgument(
+        !Strings.isNullOrEmpty(this.getFormat()) && VALID_FORMATS.contains(this.getFormat()),
+        "A valid file format must be specified. Please specify one of: "
+            + VALID_FORMATS.toString());
+
+    if (!this.getFormat().equals("line")) {
+      checkArgument(
+          !Strings.isNullOrEmpty(this.getSchema()),
+          String.format(
+              "A schema must be specified when reading files with %s format. You may provide a schema string or a path to a file containing the schema.",
+              this.getFormat()));
+    }

Review Comment:
   As we discussed, let's keep the schema as a String. In the future, I would like to see whether an actual Schema type would work while still being easy to use from a user's perspective.





[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25927: FileReadSchemaTransform

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25927:
URL: https://github.com/apache/beam/pull/25927#discussion_r1180770467


##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformProvider.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import java.io.Reader;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.schemas.io.Providers;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Watch.Growth;
+import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.CharStreams;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@AutoService(SchemaTransformProvider.class)
+public class FileReadSchemaTransformProvider
+    extends TypedSchemaTransformProvider<FileReadSchemaTransformConfiguration> {
+  private static final Logger LOG = LoggerFactory.getLogger(FileReadSchemaTransformProvider.class);
+  private static final String IDENTIFIER = "beam:schematransform:org.apache.beam:file_read:v1";
+  static final String INPUT_TAG = "input";
+  static final String OUTPUT_TAG = "output";
+
+  @Override
+  protected Class<FileReadSchemaTransformConfiguration> configurationClass() {
+    return FileReadSchemaTransformConfiguration.class;
+  }
+
+  @Override
+  protected SchemaTransform from(FileReadSchemaTransformConfiguration configuration) {
+    return new FileReadSchemaTransform(configuration);
+  }
+
+  @Override
+  public String identifier() {
+    return IDENTIFIER;
+  }
+
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.singletonList(INPUT_TAG);
+  }
+
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  private static class FileReadSchemaTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> implements SchemaTransform {
+    private FileReadSchemaTransformConfiguration configuration;
+
+    FileReadSchemaTransform(FileReadSchemaTransformConfiguration configuration) {
+      configuration.validate();
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      checkArgument(
+          input.getAll().isEmpty() ^ Strings.isNullOrEmpty(configuration.getFilepattern()),
+          "Either an input PCollection of file patterns or the filepattern parameter must be set, "
+              + "but not both.");
+
+      // Input schema can be a schema String or a path to a file containing the schema
+      // Resolve to get a schema String
+      String schema = configuration.getSchema();
+      if (!Strings.isNullOrEmpty(schema)) {
+        schema = resolveSchemaStringOrFilePath(schema);
+        configuration = configuration.toBuilder().setSchema(schema).build();
+      }
+
+      PCollection<MatchResult.Metadata> files;
+      if (!Strings.isNullOrEmpty(configuration.getFilepattern())) {
+        Pipeline p = input.getPipeline();
+        FileIO.Match matchFiles = FileIO.match().filepattern(configuration.getFilepattern());
+        // Handle streaming case
+        matchFiles = (FileIO.Match) maybeApplyStreaming(matchFiles);
+
+        files = p.apply(matchFiles);
+      } else {
+        FileIO.MatchAll matchAllFiles = FileIO.matchAll();
+        // Handle streaming case
+        matchAllFiles = (FileIO.MatchAll) maybeApplyStreaming(matchAllFiles);
+
+        files =
+            input
+                .get(INPUT_TAG)
+                .apply(
+                    "Get filepatterns",
+                    MapElements.into(TypeDescriptors.strings())
+                        .via((row) -> row.getString("filepattern")))
+                .apply("Match files", matchAllFiles);
+      }
+
+      // Pass readable files to the appropriate source and output rows.
+      PCollection<Row> output =
+          files
+              .apply(FileIO.readMatches())
+              .apply("Read files", getProvider().buildTransform(configuration));
+
+      return PCollectionRowTuple.of(OUTPUT_TAG, output);
+    }
+
+    public PTransform<?, PCollection<MatchResult.Metadata>> maybeApplyStreaming(
+        PTransform<?, PCollection<MatchResult.Metadata>> matchTransform) {
+      // Two parameters are provided to configure watching for new files.
+      Long terminateAfterSeconds = configuration.getTerminateAfterSecondsSinceNewOutput();
+      Long pollIntervalMillis = configuration.getPollIntervalMillis();
+
+      // Streaming is enabled when a poll interval is provided
+      if (pollIntervalMillis != null && pollIntervalMillis > 0L) {
+        Duration pollDuration = Duration.millis(pollIntervalMillis);
+
+        // By default, the file match transform never terminates
+        TerminationCondition<String, ?> terminationCondition = Growth.never();
+
+        // If provided, will terminate after this many seconds since seeing a new file
+        if (terminateAfterSeconds != null && terminateAfterSeconds > 0L) {
+          terminationCondition =
+              Growth.afterTimeSinceNewOutput(Duration.standardSeconds(terminateAfterSeconds));
+        }
+
+        // Apply watch for new files
+        if (matchTransform instanceof FileIO.Match) {
+          matchTransform =
+              ((FileIO.Match) matchTransform).continuously(pollDuration, terminationCondition);
+        } else if (matchTransform instanceof FileIO.MatchAll) {
+          matchTransform =
+              ((FileIO.MatchAll) matchTransform).continuously(pollDuration, terminationCondition);
+        }

Review Comment:
   Refactored into a `buildMatchTransform` method as requested. I'm having trouble figuring out how to test it, though: `Match` and `MatchAll` don't implement an `equals` method, so I can't compare the generated match transform with an expected one. Their internal `MatchConfiguration` objects are private fields too.
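
   One possible way around this, sketched under assumptions rather than taken from the PR: pull the decision logic into a plain helper that returns the resolved watch settings, and assert on that instead of on the `Match`/`MatchAll` objects. `WatchSettingsSketch`, `WatchSettings`, and `resolve` are hypothetical names, and `java.time.Duration` stands in for Joda's `Duration` only to keep the sketch self-contained.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical helper, not part of the PR: it isolates the "should we watch for new
// files, and for how long" decision so it can be tested without comparing transforms.
class WatchSettingsSketch {
  /** Plain value holder for the resolved watch parameters. */
  static final class WatchSettings {
    final Duration pollInterval;
    final Optional<Duration> terminateAfter; // empty means "never terminate"

    WatchSettings(Duration pollInterval, Optional<Duration> terminateAfter) {
      this.pollInterval = pollInterval;
      this.terminateAfter = terminateAfter;
    }
  }

  /** Returns empty when streaming is disabled, i.e. no positive poll interval is given. */
  static Optional<WatchSettings> resolve(Long pollIntervalMillis, Long terminateAfterSeconds) {
    if (pollIntervalMillis == null || pollIntervalMillis <= 0L) {
      return Optional.empty();
    }
    // A termination condition only applies when a positive value is provided.
    Optional<Duration> terminateAfter =
        terminateAfterSeconds != null && terminateAfterSeconds > 0L
            ? Optional.of(Duration.ofSeconds(terminateAfterSeconds))
            : Optional.empty();
    return Optional.of(new WatchSettings(Duration.ofMillis(pollIntervalMillis), terminateAfter));
  }

  public static void main(String[] args) {
    System.out.println(resolve(null, 30L).isPresent());
    System.out.println(resolve(1000L, 30L).get().terminateAfter.isPresent());
  }
}
```

   The transform-building code would then only translate a resolved `WatchSettings` into a `continuously(...)` call, which leaves very little untested logic behind.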





[GitHub] [beam] johnjcasey commented on a diff in pull request #25927: FileReadSchemaTransform

Posted by "johnjcasey (via GitHub)" <gi...@apache.org>.
johnjcasey commented on code in PR #25927:
URL: https://github.com/apache/beam/pull/25927#discussion_r1158864089


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java:
##########
@@ -128,50 +133,154 @@ public String apply(Row input) {
     };
   }
 
+  public static String jsonSchemaStringFromBeamSchema(Schema beamSchema) {
+    return jsonSchemaFromBeamSchema(beamSchema).toString();
+  }
+
+  public static ObjectSchema jsonSchemaFromBeamSchema(Schema beamSchema) {
+    return jsonSchemaBuilderFromBeamSchema(beamSchema).build();
+  }
+
+  private static ObjectSchema.Builder jsonSchemaBuilderFromBeamSchema(Schema beamSchema) {
+    // Beam Schema is strict, so we should not accept additional properties
+    ObjectSchema.Builder jsonSchemaBuilder = ObjectSchema.builder().additionalProperties(false);
+
+    for (Field field : beamSchema.getFields()) {
+      String name = field.getName();
+      org.everit.json.schema.Schema propertySchema = jsonPropertyFromBeamType(field.getType());
+
+      // Add property and make it required
+      jsonSchemaBuilder = jsonSchemaBuilder.addPropertySchema(name, propertySchema);
+      jsonSchemaBuilder.addRequiredProperty(name);
+    }
+
+    return jsonSchemaBuilder;
+  }
+
+  @SuppressWarnings({

Review Comment:
   Nit: I'd prefer not to include new `@SuppressWarnings` blocks if possible.





[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25927: FileReadSchemaTransform

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25927:
URL: https://github.com/apache/beam/pull/25927#discussion_r1164409559


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java:
##########
@@ -128,50 +133,154 @@ public String apply(Row input) {
     };
   }
 
+  public static String jsonSchemaStringFromBeamSchema(Schema beamSchema) {
+    return jsonSchemaFromBeamSchema(beamSchema).toString();
+  }
+
+  public static ObjectSchema jsonSchemaFromBeamSchema(Schema beamSchema) {
+    return jsonSchemaBuilderFromBeamSchema(beamSchema).build();
+  }
+
+  private static ObjectSchema.Builder jsonSchemaBuilderFromBeamSchema(Schema beamSchema) {
+    // Beam Schema is strict, so we should not accept additional properties
+    ObjectSchema.Builder jsonSchemaBuilder = ObjectSchema.builder().additionalProperties(false);
+
+    for (Field field : beamSchema.getFields()) {
+      String name = field.getName();
+      org.everit.json.schema.Schema propertySchema = jsonPropertyFromBeamType(field.getType());
+
+      // Add property and make it required
+      jsonSchemaBuilder = jsonSchemaBuilder.addPropertySchema(name, propertySchema);
+      jsonSchemaBuilder.addRequiredProperty(name);
+    }
+
+    return jsonSchemaBuilder;
+  }
+
+  @SuppressWarnings({
+    "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+  })
+  private static org.everit.json.schema.Schema jsonPropertyFromBeamType(FieldType beamType) {
+    org.everit.json.schema.Schema.Builder<? extends org.everit.json.schema.Schema> propertySchema;
+
+    switch (beamType.getTypeName()) {
+      case BYTE:
+      case INT16:
+      case INT32:
+      case INT64:
+        propertySchema = NumberSchema.builder().requiresInteger(true);
+        break;
+      case DECIMAL:
+      case FLOAT:
+      case DOUBLE:
+        propertySchema = NumberSchema.builder();
+        break;
+      case STRING:
+        propertySchema = StringSchema.builder();
+        break;
+      case BOOLEAN:
+        propertySchema = BooleanSchema.builder();
+        break;
+      case ARRAY:
+      case ITERABLE:
+        propertySchema =
+            ArraySchema.builder()
+                .allItemSchema(jsonPropertyFromBeamType(beamType.getCollectionElementType()));
+        break;
+      case ROW:
+        propertySchema = jsonSchemaBuilderFromBeamSchema(beamType.getRowSchema());
+        break;
+
+        // add more Beam to JSON types
+      default:
+        throw new IllegalArgumentException("Unsupported Beam to JSON type: " + beamType);
+    }
+
+    if (beamType.getNullable()) {
+      propertySchema = propertySchema.nullable(true);
+    }
+
+    return propertySchema.build();
+  }
+
   public static Schema beamSchemaFromJsonSchema(String jsonSchemaStr) {
     org.everit.json.schema.ObjectSchema jsonSchema = jsonSchemaFromString(jsonSchemaStr);
     return beamSchemaFromJsonSchema(jsonSchema);
   }
 
   private static Schema beamSchemaFromJsonSchema(org.everit.json.schema.ObjectSchema jsonSchema) {
     Schema.Builder beamSchemaBuilder = Schema.builder();
-    for (Map.Entry<String, org.everit.json.schema.Schema> entry :
-        jsonSchema.getPropertySchemas().entrySet()) {
+    Map<String, org.everit.json.schema.Schema> properties =
+        new HashMap<>(jsonSchema.getPropertySchemas());
+    // Properties in a JSON Schema are stored in a Map object and unfortunately don't maintain
+    // order. However, the schema's required properties is a list of property names that is
+    // consistent and is in the same order as when the schema was first created. To create a
+    // consistent Beam Schema from the same JSON schema, we add Schema Fields following this order.
+    // We can guarantee a consistent Beam schema when all JSON properties are required.
+    for (String propertyName : jsonSchema.getRequiredProperties()) {
+      org.everit.json.schema.Schema propertySchema = properties.get(propertyName);
+      if (propertySchema == null) {
+        throw new IllegalArgumentException("Unable to parse schema " + jsonSchema);
+      }
+
+      Boolean isNullable =
+          Boolean.TRUE.equals(propertySchema.getUnprocessedProperties().get("nullable"));
+      beamSchemaBuilder =
+          addPropertySchemaToBeamSchema(
+              propertyName, propertySchema, beamSchemaBuilder, isNullable);
+      // Remove properties we already added.
+      properties.remove(propertyName, propertySchema);
+    }
+
+    // Now we are potentially left with properties that are not required. Add them too.
+    // Note: having more than one non-required property may result in inconsistent
+    // Beam schema field orderings.

Review Comment:
   Sorry, I worded it wrong. This **does** guarantee that a given Beam schema survives a round trip of converting to a JSON schema and back. That is because when we convert from a Beam schema to a JSON schema, we mark all fields as required, so when converting back, the field order is preserved by the JSON `required` array. The takeaway is that a JSON schema generated from a Beam schema will have all of its properties marked as required.
   
   However, it is not guaranteed that a given JSON schema **with non-required properties** will always produce the same Beam schema. It will have the same fields, but their order is not predictable because non-required properties are read from a map.
   In that case, the failure will show up when working with Beam Schemas and Rows in a use case where Schema equality matters. The Schemas will be [equivalent](https://github.com/apache/beam/blob/3609766cd56c0b78e62757dc3fe848fd8ce01c1b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java#L432-L435), but may not always be equal.
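
   To illustrate the ordering argument, here is a minimal sketch that uses only `java.util` collections rather than the real JSON schema classes. `RequiredOrderSketch` and `fieldOrder` are hypothetical names, and the property types are plain strings chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the conversion loop: it only tracks field names and order.
class RequiredOrderSketch {
  /**
   * Returns property names in the order they would become fields: required properties
   * first, following the required list, then the leftovers in map iteration order.
   */
  static List<String> fieldOrder(Map<String, String> properties, List<String> required) {
    Map<String, String> remaining = new HashMap<>(properties);
    List<String> ordered = new ArrayList<>();
    for (String name : required) {
      if (!remaining.containsKey(name)) {
        throw new IllegalArgumentException("Required property is not defined: " + name);
      }
      ordered.add(name);
      remaining.remove(name);
    }
    // Non-required leftovers: HashMap iteration order is unspecified, so this part of
    // the result is not something callers can rely on.
    ordered.addAll(remaining.keySet());
    return ordered;
  }

  public static void main(String[] args) {
    Map<String, String> properties = new HashMap<>();
    properties.put("zeta", "string");
    properties.put("alpha", "integer");
    properties.put("mid", "boolean");

    // Every property is required, so the order follows the required list.
    System.out.println(fieldOrder(properties, Arrays.asList("zeta", "alpha", "mid")));
    // prints [zeta, alpha, mid]

    // Only "mid" is required: it comes first, the other two follow in map order.
    System.out.println(fieldOrder(properties, Arrays.asList("mid")));
  }
}
```

   When every property is required, the result depends only on the required list, which is the round-trip case described above.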





[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25927: FileReadSchemaTransform

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25927:
URL: https://github.com/apache/beam/pull/25927#discussion_r1147523858


##########
sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/SchemaAwareJavaBeans.java:
##########
@@ -270,6 +400,55 @@ public static SerializableFunction<Row, DoublyNestedDataTypes> doublyNestedDataT
     return DEFAULT_SCHEMA_PROVIDER.fromRowFunction(DOUBLY_NESTED_DATA_TYPES_TYPE_DESCRIPTOR);
   }
 
+  /**
+   * Contains all primitive Java types supported by Avro. The purpose of this class is to test
+   * schema-aware PTransforms with flat {@link Schema} {@link Row}s.
+   */
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class AvroPrimitiveDataTypes implements Serializable {

Review Comment:
   Yeah, the problem is that Short (int16) and Byte (a single byte) are not types supported by Avro (see [Avro primitive types](https://docs.oracle.com/cd/E26161_02/html/GettingStartedGuide/avroschemas.html#:~:text=in%20this%20chapter.-,Primitive%20Data%20Types,-In%20the%20previous)) or Parquet (see [primitive types](https://parquet.apache.org/docs/file-format/types/)). So in AvroUtils we actually convert Beam Short and Byte types to Avro Integer (int32).
   
   Avro and Parquet do support a sequence of bytes though, so I've replaced Byte with ByteBuffer in `AvroPrimitiveDataTypes`.
   
   > Another alternative would be just to get rid of the Byte field in AllPrimitiveDataTypes
   
   If you agree, I'd be happy to remove Short and replace Byte with ByteBuffer in `AllPrimitiveDataTypes` and use that for everything instead (for both read and write).
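
   A rough sketch of the widening described above, assuming a simplified type list. The `BeamType` enum and `avroTypeFor` are hypothetical and are not the actual AvroUtils code; the returned strings are Avro's primitive type names.

```java
// Hypothetical illustration of why BYTE and INT16 do not survive an Avro round trip:
// both collapse into Avro's 32-bit "int", so the original width is lost on read.
class AvroWideningSketch {
  enum BeamType {
    BYTE,
    INT16,
    INT32,
    INT64,
    BYTES
  }

  /** Maps a simplified Beam type to the name of the Avro primitive that would store it. */
  static String avroTypeFor(BeamType type) {
    switch (type) {
      case BYTE:
      case INT16:
      case INT32:
        return "int"; // Avro has no 8-bit or 16-bit integer type
      case INT64:
        return "long";
      case BYTES:
        return "bytes"; // a byte sequence, e.g. a ByteBuffer, is supported
      default:
        throw new IllegalArgumentException("Unsupported type: " + type);
    }
  }

  public static void main(String[] args) {
    for (BeamType type : BeamType.values()) {
      System.out.println(type + " -> " + avroTypeFor(type));
    }
  }
}
```

   Because three Beam types map to the same Avro type, a schema read back from the file cannot tell them apart, which is why the test data avoids Byte and Short for these formats.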





[GitHub] [beam] johnjcasey commented on a diff in pull request #25927: FileReadSchemaTransform

Posted by "johnjcasey (via GitHub)" <gi...@apache.org>.
johnjcasey commented on code in PR #25927:
URL: https://github.com/apache/beam/pull/25927#discussion_r1158864691


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java:
##########
@@ -128,50 +133,154 @@ public String apply(Row input) {
     };
   }
 
+  public static String jsonSchemaStringFromBeamSchema(Schema beamSchema) {
+    return jsonSchemaFromBeamSchema(beamSchema).toString();
+  }
+
+  public static ObjectSchema jsonSchemaFromBeamSchema(Schema beamSchema) {
+    return jsonSchemaBuilderFromBeamSchema(beamSchema).build();
+  }
+
+  private static ObjectSchema.Builder jsonSchemaBuilderFromBeamSchema(Schema beamSchema) {
+    // Beam Schema is strict, so we should not accept additional properties
+    ObjectSchema.Builder jsonSchemaBuilder = ObjectSchema.builder().additionalProperties(false);
+
+    for (Field field : beamSchema.getFields()) {
+      String name = field.getName();
+      org.everit.json.schema.Schema propertySchema = jsonPropertyFromBeamType(field.getType());
+
+      // Add property and make it required
+      jsonSchemaBuilder = jsonSchemaBuilder.addPropertySchema(name, propertySchema);
+      jsonSchemaBuilder.addRequiredProperty(name);
+    }
+
+    return jsonSchemaBuilder;
+  }
+
+  @SuppressWarnings({
+    "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+  })
+  private static org.everit.json.schema.Schema jsonPropertyFromBeamType(FieldType beamType) {
+    org.everit.json.schema.Schema.Builder<? extends org.everit.json.schema.Schema> propertySchema;
+
+    switch (beamType.getTypeName()) {
+      case BYTE:
+      case INT16:
+      case INT32:
+      case INT64:
+        propertySchema = NumberSchema.builder().requiresInteger(true);
+        break;
+      case DECIMAL:
+      case FLOAT:
+      case DOUBLE:
+        propertySchema = NumberSchema.builder();
+        break;
+      case STRING:
+        propertySchema = StringSchema.builder();
+        break;
+      case BOOLEAN:
+        propertySchema = BooleanSchema.builder();
+        break;
+      case ARRAY:
+      case ITERABLE:
+        propertySchema =
+            ArraySchema.builder()
+                .allItemSchema(jsonPropertyFromBeamType(beamType.getCollectionElementType()));
+        break;
+      case ROW:
+        propertySchema = jsonSchemaBuilderFromBeamSchema(beamType.getRowSchema());
+        break;
+
+        // add more Beam to JSON types
+      default:
+        throw new IllegalArgumentException("Unsupported Beam to JSON type: " + beamType);
+    }
+
+    if (beamType.getNullable()) {
+      propertySchema = propertySchema.nullable(true);
+    }
+
+    return propertySchema.build();
+  }
+
   public static Schema beamSchemaFromJsonSchema(String jsonSchemaStr) {
     org.everit.json.schema.ObjectSchema jsonSchema = jsonSchemaFromString(jsonSchemaStr);
     return beamSchemaFromJsonSchema(jsonSchema);
   }
 
   private static Schema beamSchemaFromJsonSchema(org.everit.json.schema.ObjectSchema jsonSchema) {
     Schema.Builder beamSchemaBuilder = Schema.builder();
-    for (Map.Entry<String, org.everit.json.schema.Schema> entry :
-        jsonSchema.getPropertySchemas().entrySet()) {
+    Map<String, org.everit.json.schema.Schema> properties =
+        new HashMap<>(jsonSchema.getPropertySchemas());
+    // Properties in a JSON Schema are stored in a Map object and unfortunately don't maintain
+    // order. However, the schema's required properties is a list of property names that is
+    // consistent and is in the same order as when the schema was first created. To create a
+    // consistent Beam Schema from the same JSON schema, we add Schema Fields following this order.
+    // We can guarantee a consistent Beam schema when all JSON properties are required.
+    for (String propertyName : jsonSchema.getRequiredProperties()) {
+      org.everit.json.schema.Schema propertySchema = properties.get(propertyName);
+      if (propertySchema == null) {
+        throw new IllegalArgumentException("Unable to parse schema " + jsonSchema);
+      }
+
+      Boolean isNullable =
+          Boolean.TRUE.equals(propertySchema.getUnprocessedProperties().get("nullable"));
+      beamSchemaBuilder =
+          addPropertySchemaToBeamSchema(
+              propertyName, propertySchema, beamSchemaBuilder, isNullable);
+      // Remove properties we already added.
+      properties.remove(propertyName, propertySchema);
+    }
+
+    // Now we are potentially left with properties that are not required. Add them too.
+    // Note: having more than one non-required property may result in inconsistent
+    // Beam schema field orderings.

Review Comment:
   What are the consequences of this?





[GitHub] [beam] damondouglas commented on pull request #25927: FileReadSchemaTransform

Posted by "damondouglas (via GitHub)" <gi...@apache.org>.
damondouglas commented on PR #25927:
URL: https://github.com/apache/beam/pull/25927#issuecomment-1518079443

   Run Java PreCommit




[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25927: FileReadSchemaTransform

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25927:
URL: https://github.com/apache/beam/pull/25927#discussion_r1170346673


##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformConfiguration.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class FileReadSchemaTransformConfiguration {
+  static final Set<String> VALID_FORMATS = Sets.newHashSet("avro", "parquet", "json", "line");
+
+  public void validate() {
+    checkArgument(
+        !Strings.isNullOrEmpty(this.getFormat()) && VALID_FORMATS.contains(this.getFormat()),
+        "A valid file format must be specified. Please specify one of: "
+            + VALID_FORMATS.toString());
+
+    if (!this.getFormat().equals("line")) {
+      checkArgument(
+          !Strings.isNullOrEmpty(this.getSchema()),
+          String.format(
+              "A schema must be specified when reading files with %s format. You may provide a schema string or a path to a file containing the schema.",
+              this.getFormat()));
+    }
+
+    Integer terminateAfterSecondsSinceNewOutput = this.getTerminateAfterSecondsSinceNewOutput();
+    Integer pollIntervalMillis = this.getPollIntervalMillis();
+    if (terminateAfterSecondsSinceNewOutput != null && terminateAfterSecondsSinceNewOutput > 0) {
+      checkArgument(
+          pollIntervalMillis != null && pollIntervalMillis > 0,
+          "Found positive value for terminateAfterSecondsSinceNewOutput but non-positive "
+              + "value for pollIntervalMillis. Please set pollIntervalMillis as well to enable "
+              + "streaming.");
+    }
+  }
+
+  public static Builder builder() {
+    return new AutoValue_FileReadSchemaTransformConfiguration.Builder();
+  }
+
+  /**
+   * The format of the file(s) to read.
+   *
+   * <p>Possible values are: `"line"`, `"avro"`, `"parquet"`, `"json"`
+   */
+  public abstract String getFormat();
+
+  /**
+   * The filepattern used to match and read files.
+   *
+   * <p>May instead use an input PCollection<Row> of filepatterns.
+   */
+  @Nullable
+  public abstract String getFilepattern();
+
+  /**
+   * The schema used by sources to deserialize data and create Beam Rows.
+   *
+   * <p>May be provided as a schema String or as a String path to a file that contains the schema.
+   */
+  @Nullable
+  public abstract String getSchema();
+
+  /**
+   * The time, in milliseconds, to wait before polling for new files.
+   *
+   * <p>This will set the pipeline to be a streaming pipeline and will continuously watch for new
+   * files.
+   *
+   * <p>Note: This only polls for new files. New updates to an existing file will not be watched
+   * for.
+   */
+  @Nullable
+  public abstract Integer getPollIntervalMillis();

Review Comment:
   I was trying to keep the cross-language use case in mind, but it looks like Python ints are translated to INT64 types: https://github.com/apache/beam/blob/f2f2fb83a21c231d741008d5fc6fdda7c69bb786/sdks/python/apache_beam/typehints/schemas.py#L28
   Thanks for the catch, I will change it to Long.





[GitHub] [beam] damondouglas commented on a diff in pull request #25927: FileReadSchemaTransform

Posted by "damondouglas (via GitHub)" <gi...@apache.org>.
damondouglas commented on code in PR #25927:
URL: https://github.com/apache/beam/pull/25927#discussion_r1166150590


##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformConfiguration.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class FileReadSchemaTransformConfiguration {
+  static final Set<String> VALID_FORMATS = Sets.newHashSet("avro", "parquet", "json", "line");
+
+  public void validate() {
+    checkArgument(
+        !Strings.isNullOrEmpty(this.getFormat()) && VALID_FORMATS.contains(this.getFormat()),
+        "A valid file format must be specified. Please specify one of: "
+            + VALID_FORMATS.toString());
+
+    if (!this.getFormat().equals("line")) {
+      checkArgument(
+          !Strings.isNullOrEmpty(this.getSchema()),
+          String.format(
+              "A schema must be specified when reading files with %s format. You may provide a schema string or a path to a file containing the schema.",
+              this.getFormat()));
+    }
+
+    Integer terminateAfterSecondsSinceNewOutput = this.getTerminateAfterSecondsSinceNewOutput();
+    Integer pollIntervalMillis = this.getPollIntervalMillis();
+    if (terminateAfterSecondsSinceNewOutput != null && terminateAfterSecondsSinceNewOutput > 0) {
+      checkArgument(
+          pollIntervalMillis != null && pollIntervalMillis > 0,
+          "Found positive value for terminateAfterSecondsSinceNewOutput but non-positive "
+              + "value for pollIntervalMillis. Please set pollIntervalMillis as well to enable "
+              + "streaming.");
+    }
+  }
+
+  public static Builder builder() {
+    return new AutoValue_FileReadSchemaTransformConfiguration.Builder();
+  }
+
+  /**
+   * The format of the file(s) to read.
+   *
+   * <p>Possible values are: `"line"`, `"avro"`, `"parquet"`, `"json"`
+   */
+  public abstract String getFormat();
+
+  /**
+   * The filepattern used to match and read files.
+   *
+   * <p>May instead use an input PCollection<Row> of filepatterns.
+   */
+  @Nullable
+  public abstract String getFilepattern();
+
+  /**
+   * The schema used by sources to deserialize data and create Beam Rows.
+   *
+   * <p>May be provided as a schema String or as a String path to a file that contains the schema.
+   */
+  @Nullable
+  public abstract String getSchema();
+
+  /**
+   * The time, in milliseconds, to wait before polling for new files.
+   *
+   * <p>This will set the pipeline to be a streaming pipeline and will continuously watch for new
+   * files.
+   *
+   * <p>Note: This only polls for new files. New updates to an existing file will not be watched
+   * for.
+   */
+  @Nullable
+  public abstract Integer getPollIntervalMillis();

Review Comment:
   Should this be a Long to correspond to the type required by [Duration](https://www.joda.org/joda-time/apidocs/org/joda/time/Duration.html#Duration-long-)?
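
To illustrate the point about matching the type a duration constructor expects, here is a minimal sketch. It assumes `java.time.Duration` as a stand-in for Joda-Time's `Duration(long)` so that the snippet needs no extra dependency. The class and method names are hypothetical and do not appear in the PR.

```java
import java.time.Duration;

// Hypothetical sketch: PollIntervalSketch is illustrative only, not part of the PR.
final class PollIntervalSketch {
  /** Converts a nullable 64-bit millisecond setting into a Duration without narrowing. */
  static Duration toPollInterval(Long pollIntervalMillis) {
    if (pollIntervalMillis == null || pollIntervalMillis <= 0) {
      throw new IllegalArgumentException("pollIntervalMillis must be a positive value");
    }
    // Duration.ofMillis takes a long, so a Long getter needs no cast or widening step.
    return Duration.ofMillis(pollIntervalMillis);
  }

  public static void main(String[] args) {
    // A value above Integer.MAX_VALUE still round-trips when the setting is a Long.
    Duration interval = toPollInterval(3_000_000_000L);
    System.out.println(interval.toMillis());
  }
}
```

With an `Integer` getter, the same value could not be represented at all, which is the practical argument for using `Long` here.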





[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25927: FileReadSchemaTransform

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25927:
URL: https://github.com/apache/beam/pull/25927#discussion_r1179505425


##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformConfiguration.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})

Review Comment:
   Sorry, this suggestion slipped my mind during the last review. I will implement this; thanks for the examples.





[GitHub] [beam] damondouglas commented on a diff in pull request #25927: FileReadSchemaTransform

Posted by "damondouglas (via GitHub)" <gi...@apache.org>.
damondouglas commented on code in PR #25927:
URL: https://github.com/apache/beam/pull/25927#discussion_r1146688175


##########
sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/SchemaAwareJavaBeans.java:
##########
@@ -38,6 +39,28 @@ public class SchemaAwareJavaBeans {
 
   private static final DefaultSchemaProvider DEFAULT_SCHEMA_PROVIDER = new DefaultSchemaProvider();
 
+  /** Convenience method for {@link AllPrimitiveDataTypes} instantiation. */

Review Comment:
   ```
   /** Convenience method for {@link AvroPrimitiveDataTypes} instantiation. */
   ```



##########
sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/SchemaAwareJavaBeans.java:
##########
@@ -270,6 +400,55 @@ public static SerializableFunction<Row, DoublyNestedDataTypes> doublyNestedDataT
     return DEFAULT_SCHEMA_PROVIDER.fromRowFunction(DOUBLY_NESTED_DATA_TYPES_TYPE_DESCRIPTOR);
   }
 
+  /**
+   * Contains all primitive Java types supported by Avro. The purpose of this class is to test
+   * schema-aware PTransforms with flat {@link Schema} {@link Row}s.
+   */
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class AvroPrimitiveDataTypes implements Serializable {

Review Comment:
   I'll let you decide based on my comment, and I'll LGTM either way.
   
   What I like about AvroPrimitiveDataTypes is that it makes it easy to write tests against field types we know are compatible with Avro encodings. What I struggle with is:
   1. The name implies a coupling to a data encoding, though I understand the motivation.
   2. Tests written against AllPrimitiveDataTypes that fail give us valuable information about how we expose FieldType.
   3. Dealing with awkward FieldTypes that aren't compatible with a Schema-related use case gives us valuable information about how a real Beam user would handle this when they cannot change the original class but can change the Schema using, for example, https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/schemas/transforms/DropFields.html. I dealt with this problem when writing tests for CsvIO and it was a valuable discovery. Writing tests around incompatible fields gave me a usability test, not just a "does this code work" test.
   
   Another alternative would be to remove the Byte field from AllPrimitiveDataTypes, but that would require you to refactor the tests that break as a result.
   
   This comment also applies to the other additions to SchemaAwareJavaBeans; I didn't comment on those individually because it would have been redundant.





[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25927: FileReadSchemaTransform

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25927:
URL: https://github.com/apache/beam/pull/25927#discussion_r1149347863


##########
sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/SchemaAwareJavaBeans.java:
##########
@@ -270,6 +400,55 @@ public static SerializableFunction<Row, DoublyNestedDataTypes> doublyNestedDataT
     return DEFAULT_SCHEMA_PROVIDER.fromRowFunction(DOUBLY_NESTED_DATA_TYPES_TYPE_DESCRIPTOR);
   }
 
+  /**
+   * Contains all primitive Java types supported by Avro. The purpose of this class is to test
+   * schema-aware PTransforms with flat {@link Schema} {@link Row}s.
+   */
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class AvroPrimitiveDataTypes implements Serializable {

Review Comment:
   If you're not opposed, I will proceed by not including any form of bytes in `AllPrimitiveDataTypes`, since there is no common overlap. Instead, we can have two new classes, ByteType and ByteSequenceType, that test byte and byte[] respectively. I can create new tests for these in both the file read and write transforms.





[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25927: FileReadSchemaTransform

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25927:
URL: https://github.com/apache/beam/pull/25927#discussion_r1158922274


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java:
##########
@@ -128,50 +133,154 @@ public String apply(Row input) {
     };
   }
 
+  public static String jsonSchemaStringFromBeamSchema(Schema beamSchema) {
+    return jsonSchemaFromBeamSchema(beamSchema).toString();
+  }
+
+  public static ObjectSchema jsonSchemaFromBeamSchema(Schema beamSchema) {
+    return jsonSchemaBuilderFromBeamSchema(beamSchema).build();
+  }
+
+  private static ObjectSchema.Builder jsonSchemaBuilderFromBeamSchema(Schema beamSchema) {
+    // Beam Schema is strict, so we should not accept additional properties
+    ObjectSchema.Builder jsonSchemaBuilder = ObjectSchema.builder().additionalProperties(false);
+
+    for (Field field : beamSchema.getFields()) {
+      String name = field.getName();
+      org.everit.json.schema.Schema propertySchema = jsonPropertyFromBeamType(field.getType());
+
+      // Add property and make it required
+      jsonSchemaBuilder = jsonSchemaBuilder.addPropertySchema(name, propertySchema);
+      jsonSchemaBuilder.addRequiredProperty(name);
+    }
+
+    return jsonSchemaBuilder;
+  }
+
+  @SuppressWarnings({
+    "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+  })
+  private static org.everit.json.schema.Schema jsonPropertyFromBeamType(FieldType beamType) {
+    org.everit.json.schema.Schema.Builder<? extends org.everit.json.schema.Schema> propertySchema;
+
+    switch (beamType.getTypeName()) {
+      case BYTE:
+      case INT16:
+      case INT32:
+      case INT64:
+        propertySchema = NumberSchema.builder().requiresInteger(true);
+        break;
+      case DECIMAL:
+      case FLOAT:
+      case DOUBLE:
+        propertySchema = NumberSchema.builder();
+        break;
+      case STRING:
+        propertySchema = StringSchema.builder();
+        break;
+      case BOOLEAN:
+        propertySchema = BooleanSchema.builder();
+        break;
+      case ARRAY:
+      case ITERABLE:
+        propertySchema =
+            ArraySchema.builder()
+                .allItemSchema(jsonPropertyFromBeamType(beamType.getCollectionElementType()));
+        break;
+      case ROW:
+        propertySchema = jsonSchemaBuilderFromBeamSchema(beamType.getRowSchema());
+        break;
+
+        // add more Beam to JSON types
+      default:
+        throw new IllegalArgumentException("Unsupported Beam to JSON type: " + beamType);
+    }
+
+    if (beamType.getNullable()) {
+      propertySchema = propertySchema.nullable(true);
+    }
+
+    return propertySchema.build();
+  }
+
   public static Schema beamSchemaFromJsonSchema(String jsonSchemaStr) {
     org.everit.json.schema.ObjectSchema jsonSchema = jsonSchemaFromString(jsonSchemaStr);
     return beamSchemaFromJsonSchema(jsonSchema);
   }
 
   private static Schema beamSchemaFromJsonSchema(org.everit.json.schema.ObjectSchema jsonSchema) {
     Schema.Builder beamSchemaBuilder = Schema.builder();
-    for (Map.Entry<String, org.everit.json.schema.Schema> entry :
-        jsonSchema.getPropertySchemas().entrySet()) {
+    Map<String, org.everit.json.schema.Schema> properties =
+        new HashMap<>(jsonSchema.getPropertySchemas());
+    // Properties in a JSON Schema are stored in a Map object and unfortunately don't maintain
+    // order. However, the schema's required properties is a list of property names that is
+    // consistent and is in the same order as when the schema was first created. To create a
+    // consistent Beam Schema from the same JSON schema, we add Schema Fields following this order.
+    // We can guarantee a consistent Beam schema when all JSON properties are required.
+    for (String propertyName : jsonSchema.getRequiredProperties()) {
+      org.everit.json.schema.Schema propertySchema = properties.get(propertyName);
+      if (propertySchema == null) {
+        throw new IllegalArgumentException("Unable to parse schema " + jsonSchema);
+      }
+
+      Boolean isNullable =
+          Boolean.TRUE.equals(propertySchema.getUnprocessedProperties().get("nullable"));
+      beamSchemaBuilder =
+          addPropertySchemaToBeamSchema(
+              propertyName, propertySchema, beamSchemaBuilder, isNullable);
+      // Remove properties we already added.
+      properties.remove(propertyName, propertySchema);
+    }
+
+    // Now we are potentially left with properties that are not required. Add them too.
+    // Note: having more than one non-required property may result in inconsistent
+    // Beam schema field orderings.

Review Comment:
   Properties in a JSON schema do not maintain ordering. When we convert a Beam schema to a JSON schema, we add the fields to the required properties in order, so when converting from the JSON schema back to a Beam schema, the ordering is preserved by the required properties. For the additional (non-required) properties, we cannot guarantee the Beam schema order.
   
   Unlike JSON schemas, Beam schemas care about order. So one JSON schema with non-required properties can produce more than one Beam schema; by Beam standards these would be equivalent but not equal. This is a best-effort attempt to make a given Beam schema survive a round trip of converting to a JSON schema and back.
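
The ordering rule described above can be sketched without any JSON library. This is a simplified illustration, not the PR's implementation: it assumes properties are held in a plain `HashMap` of name to type string, and `FieldOrderSketch` and `orderedFieldNames` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: models only the field-ordering rule, not real schema conversion.
final class FieldOrderSketch {
  /** Returns field names: required properties first, in their listed order, then the rest. */
  static List<String> orderedFieldNames(Map<String, String> properties, List<String> required) {
    Map<String, String> remaining = new HashMap<>(properties);
    List<String> ordered = new ArrayList<>();
    for (String name : required) {
      if (!remaining.containsKey(name)) {
        throw new IllegalArgumentException("Required property has no schema: " + name);
      }
      ordered.add(name);
      // Remove properties that were already added.
      remaining.remove(name);
    }
    // Leftover (non-required) properties follow HashMap iteration order, which is unspecified.
    ordered.addAll(remaining.keySet());
    return ordered;
  }

  public static void main(String[] args) {
    Map<String, String> properties = new HashMap<>();
    properties.put("name", "string");
    properties.put("id", "integer");
    properties.put("active", "boolean");
    // All properties are required, so the result follows the required list exactly.
    System.out.println(orderedFieldNames(properties, Arrays.asList("id", "name", "active")));
  }
}
```

When every property is required, the output order is fully determined by the required list. Only the trailing, non-required names depend on map iteration order.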





[GitHub] [beam] johnjcasey commented on a diff in pull request #25927: FileReadSchemaTransform

Posted by "johnjcasey (via GitHub)" <gi...@apache.org>.
johnjcasey commented on code in PR #25927:
URL: https://github.com/apache/beam/pull/25927#discussion_r1164279392


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java:
##########
@@ -128,50 +133,154 @@ public String apply(Row input) {
     };
   }
 
+  public static String jsonSchemaStringFromBeamSchema(Schema beamSchema) {
+    return jsonSchemaFromBeamSchema(beamSchema).toString();
+  }
+
+  public static ObjectSchema jsonSchemaFromBeamSchema(Schema beamSchema) {
+    return jsonSchemaBuilderFromBeamSchema(beamSchema).build();
+  }
+
+  private static ObjectSchema.Builder jsonSchemaBuilderFromBeamSchema(Schema beamSchema) {
+    // Beam Schema is strict, so we should not accept additional properties
+    ObjectSchema.Builder jsonSchemaBuilder = ObjectSchema.builder().additionalProperties(false);
+
+    for (Field field : beamSchema.getFields()) {
+      String name = field.getName();
+      org.everit.json.schema.Schema propertySchema = jsonPropertyFromBeamType(field.getType());
+
+      // Add property and make it required
+      jsonSchemaBuilder = jsonSchemaBuilder.addPropertySchema(name, propertySchema);
+      jsonSchemaBuilder.addRequiredProperty(name);
+    }
+
+    return jsonSchemaBuilder;
+  }
+
+  @SuppressWarnings({
+    "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+  })
+  private static org.everit.json.schema.Schema jsonPropertyFromBeamType(FieldType beamType) {
+    org.everit.json.schema.Schema.Builder<? extends org.everit.json.schema.Schema> propertySchema;
+
+    switch (beamType.getTypeName()) {
+      case BYTE:
+      case INT16:
+      case INT32:
+      case INT64:
+        propertySchema = NumberSchema.builder().requiresInteger(true);
+        break;
+      case DECIMAL:
+      case FLOAT:
+      case DOUBLE:
+        propertySchema = NumberSchema.builder();
+        break;
+      case STRING:
+        propertySchema = StringSchema.builder();
+        break;
+      case BOOLEAN:
+        propertySchema = BooleanSchema.builder();
+        break;
+      case ARRAY:
+      case ITERABLE:
+        propertySchema =
+            ArraySchema.builder()
+                .allItemSchema(jsonPropertyFromBeamType(beamType.getCollectionElementType()));
+        break;
+      case ROW:
+        propertySchema = jsonSchemaBuilderFromBeamSchema(beamType.getRowSchema());
+        break;
+
+        // add more Beam to JSON types
+      default:
+        throw new IllegalArgumentException("Unsupported Beam to JSON type: " + beamType);
+    }
+
+    if (beamType.getNullable()) {
+      propertySchema = propertySchema.nullable(true);
+    }
+
+    return propertySchema.build();
+  }
+
   public static Schema beamSchemaFromJsonSchema(String jsonSchemaStr) {
     org.everit.json.schema.ObjectSchema jsonSchema = jsonSchemaFromString(jsonSchemaStr);
     return beamSchemaFromJsonSchema(jsonSchema);
   }
 
   private static Schema beamSchemaFromJsonSchema(org.everit.json.schema.ObjectSchema jsonSchema) {
     Schema.Builder beamSchemaBuilder = Schema.builder();
-    for (Map.Entry<String, org.everit.json.schema.Schema> entry :
-        jsonSchema.getPropertySchemas().entrySet()) {
+    Map<String, org.everit.json.schema.Schema> properties =
+        new HashMap<>(jsonSchema.getPropertySchemas());
+    // Properties in a JSON Schema are stored in a Map object and unfortunately don't maintain
+    // order. However, the schema's required properties is a list of property names that is
+    // consistent and is in the same order as when the schema was first created. To create a
+    // consistent Beam Schema from the same JSON schema, we add Schema Fields following this order.
+    // We can guarantee a consistent Beam schema when all JSON properties are required.
+    for (String propertyName : jsonSchema.getRequiredProperties()) {
+      org.everit.json.schema.Schema propertySchema = properties.get(propertyName);
+      if (propertySchema == null) {
+        throw new IllegalArgumentException("Unable to parse schema " + jsonSchema);
+      }
+
+      Boolean isNullable =
+          Boolean.TRUE.equals(propertySchema.getUnprocessedProperties().get("nullable"));
+      beamSchemaBuilder =
+          addPropertySchemaToBeamSchema(
+              propertyName, propertySchema, beamSchemaBuilder, isNullable);
+      // Remove properties we already added.
+      properties.remove(propertyName, propertySchema);
+    }
+
+    // Now we are potentially left with properties that are not required. Add them too.
+    // Note: having more than one non-required property may result in inconsistent
+    // Beam schema field orderings.

Review Comment:
   What are the consequences if the best effort fails? Can we detect the failure and do something about it?
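
One way such detection might look, offered as a hypothetical sketch rather than anything in this PR: per the code comment in the diff, field order is only at risk when more than one property is non-required, so a check can count the properties missing from the required list. `OrderAmbiguitySketch` and its methods are illustrative names.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: flags schemas whose Beam field order cannot be guaranteed.
final class OrderAmbiguitySketch {
  /** Returns the property names that are not listed as required. */
  static Set<String> nonRequired(Set<String> propertyNames, List<String> required) {
    Set<String> result = new HashSet<>(propertyNames);
    result.removeAll(required);
    return result;
  }

  /** Order is ambiguous only when two or more properties are non-required. */
  static boolean orderIsAmbiguous(Set<String> propertyNames, List<String> required) {
    return nonRequired(propertyNames, required).size() > 1;
  }

  public static void main(String[] args) {
    Set<String> names = new HashSet<>(Arrays.asList("id", "name", "active", "score"));
    // Two properties ("active" and "score") are not required, so order is not guaranteed.
    System.out.println(orderIsAmbiguous(names, Arrays.asList("id", "name")));
  }
}
```

A caller could use a check like this to log a warning or reject the schema, depending on how strict the round trip needs to be.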





[GitHub] [beam] ahmedabu98 commented on pull request #25927: FileReadSchemaTransform

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25927:
URL: https://github.com/apache/beam/pull/25927#issuecomment-1505605679

   Run Spotless PreCommit




[GitHub] [beam] johnjcasey commented on a diff in pull request #25927: FileReadSchemaTransform

Posted by "johnjcasey (via GitHub)" <gi...@apache.org>.
johnjcasey commented on code in PR #25927:
URL: https://github.com/apache/beam/pull/25927#discussion_r1165593736


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java:
##########
@@ -128,50 +133,154 @@ public String apply(Row input) {
     };
   }
 
+  public static String jsonSchemaStringFromBeamSchema(Schema beamSchema) {
+    return jsonSchemaFromBeamSchema(beamSchema).toString();
+  }
+
+  public static ObjectSchema jsonSchemaFromBeamSchema(Schema beamSchema) {
+    return jsonSchemaBuilderFromBeamSchema(beamSchema).build();
+  }
+
+  private static ObjectSchema.Builder jsonSchemaBuilderFromBeamSchema(Schema beamSchema) {
+    // Beam Schema is strict, so we should not accept additional properties
+    ObjectSchema.Builder jsonSchemaBuilder = ObjectSchema.builder().additionalProperties(false);
+
+    for (Field field : beamSchema.getFields()) {
+      String name = field.getName();
+      org.everit.json.schema.Schema propertySchema = jsonPropertyFromBeamType(field.getType());
+
+      // Add property and make it required
+      jsonSchemaBuilder = jsonSchemaBuilder.addPropertySchema(name, propertySchema);
+      jsonSchemaBuilder.addRequiredProperty(name);
+    }
+
+    return jsonSchemaBuilder;
+  }
+
+  @SuppressWarnings({
+    "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+  })
+  private static org.everit.json.schema.Schema jsonPropertyFromBeamType(FieldType beamType) {
+    org.everit.json.schema.Schema.Builder<? extends org.everit.json.schema.Schema> propertySchema;
+
+    switch (beamType.getTypeName()) {
+      case BYTE:
+      case INT16:
+      case INT32:
+      case INT64:
+        propertySchema = NumberSchema.builder().requiresInteger(true);
+        break;
+      case DECIMAL:
+      case FLOAT:
+      case DOUBLE:
+        propertySchema = NumberSchema.builder();
+        break;
+      case STRING:
+        propertySchema = StringSchema.builder();
+        break;
+      case BOOLEAN:
+        propertySchema = BooleanSchema.builder();
+        break;
+      case ARRAY:
+      case ITERABLE:
+        propertySchema =
+            ArraySchema.builder()
+                .allItemSchema(jsonPropertyFromBeamType(beamType.getCollectionElementType()));
+        break;
+      case ROW:
+        propertySchema = jsonSchemaBuilderFromBeamSchema(beamType.getRowSchema());
+        break;
+
+        // add more Beam to JSON types
+      default:
+        throw new IllegalArgumentException("Unsupported Beam to JSON type: " + beamType);
+    }
+
+    if (beamType.getNullable()) {
+      propertySchema = propertySchema.nullable(true);
+    }
+
+    return propertySchema.build();
+  }
+
   public static Schema beamSchemaFromJsonSchema(String jsonSchemaStr) {
     org.everit.json.schema.ObjectSchema jsonSchema = jsonSchemaFromString(jsonSchemaStr);
     return beamSchemaFromJsonSchema(jsonSchema);
   }
 
   private static Schema beamSchemaFromJsonSchema(org.everit.json.schema.ObjectSchema jsonSchema) {
     Schema.Builder beamSchemaBuilder = Schema.builder();
-    for (Map.Entry<String, org.everit.json.schema.Schema> entry :
-        jsonSchema.getPropertySchemas().entrySet()) {
+    Map<String, org.everit.json.schema.Schema> properties =
+        new HashMap<>(jsonSchema.getPropertySchemas());
+    // Properties in a JSON Schema are stored in a Map object and unfortunately don't maintain
+    // order. However, the schema's required properties is a list of property names that is
+    // consistent and is in the same order as when the schema was first created. To create a
+    // consistent Beam Schema from the same JSON schema, we add Schema Fields following this order.
+    // We can guarantee a consistent Beam schema when all JSON properties are required.
+    for (String propertyName : jsonSchema.getRequiredProperties()) {
+      org.everit.json.schema.Schema propertySchema = properties.get(propertyName);
+      if (propertySchema == null) {
+        throw new IllegalArgumentException("Unable to parse schema " + jsonSchema);
+      }
+
+      Boolean isNullable =
+          Boolean.TRUE.equals(propertySchema.getUnprocessedProperties().get("nullable"));
+      beamSchemaBuilder =
+          addPropertySchemaToBeamSchema(
+              propertyName, propertySchema, beamSchemaBuilder, isNullable);
+      // Remove properties we already added.
+      properties.remove(propertyName, propertySchema);
+    }
+
+    // Now we are potentially left with properties that are not required. Add them too.
+    // Note: having more than one non-required property may result in inconsistent
+    // Beam schema field orderings.

Review Comment:
   got it, LGTM





[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25927: FileReadSchemaTransform

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25927:
URL: https://github.com/apache/beam/pull/25927#discussion_r1170364417


##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformProvider.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import java.io.Reader;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.schemas.io.Providers;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Watch.Growth;
+import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.CharStreams;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@AutoService(SchemaTransformProvider.class)
+public class FileReadSchemaTransformProvider
+    extends TypedSchemaTransformProvider<FileReadSchemaTransformConfiguration> {
+  private static final Logger LOG = LoggerFactory.getLogger(FileReadSchemaTransformProvider.class);
+  private static final String IDENTIFIER = "beam:schematransform:org.apache.beam:file_read:v1";
+  static final String INPUT_TAG = "input";
+  static final String OUTPUT_TAG = "output";
+
+  @Override
+  protected Class<FileReadSchemaTransformConfiguration> configurationClass() {
+    return FileReadSchemaTransformConfiguration.class;
+  }
+
+  @Override
+  protected SchemaTransform from(FileReadSchemaTransformConfiguration configuration) {
+    return new FileReadSchemaTransform(configuration);
+  }
+
+  @Override
+  public String identifier() {
+    return IDENTIFIER;
+  }
+
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.singletonList(INPUT_TAG);
+  }
+
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  private static class FileReadSchemaTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> implements SchemaTransform {
+    private FileReadSchemaTransformConfiguration configuration;
+
+    FileReadSchemaTransform(FileReadSchemaTransformConfiguration configuration) {
+      configuration.validate();
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      checkArgument(
+          input.getAll().isEmpty() ^ Strings.isNullOrEmpty(configuration.getFilepattern()),
+          "Either an input PCollection of file patterns or the filepattern parameter must be set,"
+              + " but not both.");

Review Comment:
   No, this applies to all the read SchemaTransforms. There are two use cases:
   
   1. read files that match **one** filepattern (i.e. use the `filepattern` configuration field)
   2. read files that match **multiple** filepatterns (i.e. use an input PCollection<String> of filepattern elements)
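   
   For illustration only, here is a minimal plain-Java sketch of the "exactly one source" rule behind these two use cases. The class name, method name, and example path are hypothetical and not part of this PR; the XOR mirrors the `checkArgument` condition in `expand`:
   
   ```java
   import java.util.Collections;
   import java.util.List;

   /** Hypothetical stand-in for the precondition in expand(); not the PR's code. */
   class FilepatternSourceCheck {

     /**
      * Returns true when exactly one source of filepatterns is present: the
      * configured filepattern (use case 1) or a non-empty input (use case 2).
      */
     static boolean exactlyOneSource(List<String> inputTags, String filepattern) {
       boolean noInput = inputTags.isEmpty();
       boolean noFilepattern = filepattern == null || filepattern.isEmpty();
       // XOR is true only when exactly one of the two operands is true.
       return noInput ^ noFilepattern;
     }

     public static void main(String[] args) {
       List<String> none = Collections.emptyList();
       List<String> oneInput = Collections.singletonList("input");
       // Use case 1: configuration field only.
       System.out.println(exactlyOneSource(none, "/tmp/data/*.avro")); // true
       // Use case 2: input PCollection only.
       System.out.println(exactlyOneSource(oneInput, null)); // true
       // Both set, or neither set, is rejected.
       System.out.println(exactlyOneSource(oneInput, "/tmp/data/*.avro")); // false
       System.out.println(exactlyOneSource(none, "")); // false
     }
   }
   ```
   
   Supplying both sources, or neither, makes the check fail, which matches the error message's "but not both".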





[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25927: FileReadSchemaTransform

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25927:
URL: https://github.com/apache/beam/pull/25927#discussion_r1170561474


##########
sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/AvroReadSchemaTransformFormatProviderTest.java:
##########
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.ALL_PRIMITIVE_DATA_TYPES_SCHEMA;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviderTestData.DATA;
+import static org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions.RESOLVE_FILE;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.extensions.avro.coders.AvroGenericCoder;
+import org.apache.beam.sdk.extensions.avro.io.AvroIO;
+import org.apache.beam.sdk.extensions.avro.io.DynamicAvroDestinations;
+import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.io.DefaultFilenamePolicy;
+import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.joda.time.Duration;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class AvroReadSchemaTransformFormatProviderTest
+    extends FileReadSchemaTransformFormatProviderTest {
+
+  @Rule public TestPipeline writePipeline = TestPipeline.create();
+  @Rule public TestPipeline readPipeline = TestPipeline.create();
+
+  @Override
+  protected String getFormat() {
+    return new AvroReadSchemaTransformFormatProvider().identifier();
+  }
+
+  @Override
+  public String getStringSchemaFromBeamSchema(Schema beamSchema) {
+    return AvroUtils.toAvroSchema(beamSchema).toString();
+  }
+
+  @Override
+  public void runWriteAndReadTest(
+      Schema schema, List<Row> rows, String filePath, String schemaFilePath) {
+    org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(schema);
+    String stringSchema =
+        Strings.isNullOrEmpty(schemaFilePath) ? avroSchema.toString() : schemaFilePath;
+
+    writePipeline
+        .apply(Create.of(rows).withRowSchema(schema))
+        .apply(
+            MapElements.into(TypeDescriptor.of(GenericRecord.class))
+                .via(AvroUtils.getRowToGenericRecordFunction(avroSchema)))
+        .setCoder(AvroGenericCoder.of(avroSchema))
+        .apply(AvroIO.writeGenericRecords(avroSchema).to(filePath));

Review Comment:
   Added the tests





[GitHub] [beam] github-actions[bot] commented on pull request #25927: FileReadSchemaTransform

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #25927:
URL: https://github.com/apache/beam/pull/25927#issuecomment-1492232314

   Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control




[GitHub] [beam] ahmedabu98 commented on pull request #25927: FileReadSchemaTransform

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25927:
URL: https://github.com/apache/beam/pull/25927#issuecomment-1494772346

   R: @johnjcasey 
   R: @Abacn 




[GitHub] [beam] ahmedabu98 commented on pull request #25927: FileReadSchemaTransform

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25927:
URL: https://github.com/apache/beam/pull/25927#issuecomment-1505605751

   Run Spotless PreCommit




[GitHub] [beam] damondouglas commented on a diff in pull request #25927: FileReadSchemaTransform

Posted by "damondouglas (via GitHub)" <gi...@apache.org>.
damondouglas commented on code in PR #25927:
URL: https://github.com/apache/beam/pull/25927#discussion_r1166137663


##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformConfiguration.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})

Review Comment:
   Could we remove the SuppressWarnings annotation and address the nulls that are creating the null checker warnings?



##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformConfiguration.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})

Review Comment:
   I cloned this PR, tried the changes below, and got a successful `./gradlew :sdks:java:io:file-schema-transform:check`. Would you consider incorporating them? I also left comments at the relevant lines of code describing the changes I made to remove the null warnings without using the SuppressWarnings annotation. I agree this raises the question of why the checker doesn't pass when we are already applying `checkArgument` or `checkState`. However, I hope the following list can expedite the refactoring of this PR. (Note: Optional.isPresent seems to do the trick.)
   
   1. Remove all SuppressWarnings annotations
   2. Add a package-private `getSafeSchema` method:
   
   ```
   // FYI these are the imports:
   // import java.util.Optional;
   // import org.checkerframework.checker.nullness.qual.NonNull;
   
     @NonNull
     String getSafeSchema() {
       Optional<String> schema = Optional.ofNullable(getSchema());
       checkState(schema.isPresent() && !schema.get().isEmpty());
       return schema.get();
     }
   ```
   
   3. Add a package-private `getSafeFilepattern` method:
   
   ```
   // FYI these are the imports:
   // import java.util.Optional;
   // import org.checkerframework.checker.nullness.qual.NonNull;
     @NonNull
     String getSafeFilepattern() {
       Optional<String> value = Optional.ofNullable(getFilepattern());
       checkState(value.isPresent() && !value.get().isEmpty());
       return value.get();
     }
   ```
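   
   For reference, a dependency-free sketch of the same fail-fast pattern. `NullSafeConfig` is a hypothetical stand-in for the AutoValue configuration class, and `IllegalStateException` is thrown directly where the snippets above rely on Guava's `checkState`:
   
   ```java
   import java.util.Optional;

   /** Hypothetical configuration holder; the real class in this PR is an AutoValue. */
   class NullSafeConfig {
     private final String schema; // may be null, like a @Nullable getter's backing value

     NullSafeConfig(String schema) {
       this.schema = schema;
     }

     /** Nullable accessor, analogous to getSchema() in the configuration. */
     String getSchema() {
       return schema;
     }

     /** Returns the schema, failing fast when it is null or empty. */
     String getSafeSchema() {
       return Optional.ofNullable(getSchema())
           .filter(s -> !s.isEmpty())
           .orElseThrow(() -> new IllegalStateException("schema must be set and non-empty"));
     }

     public static void main(String[] args) {
       System.out.println(new NullSafeConfig("{}").getSafeSchema()); // prints {}
       try {
         new NullSafeConfig(null).getSafeSchema();
       } catch (IllegalStateException e) {
         System.out.println("rejected: " + e.getMessage());
       }
     }
   }
   ```
   
   Because the non-null value comes straight out of the `Optional`, callers never touch the nullable getter's result directly.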
   





[GitHub] [beam] damondouglas commented on a diff in pull request #25927: FileReadSchemaTransform

Posted by "damondouglas (via GitHub)" <gi...@apache.org>.
damondouglas commented on code in PR #25927:
URL: https://github.com/apache/beam/pull/25927#discussion_r1174127902


##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformProvider.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import java.io.Reader;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.schemas.io.Providers;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Watch.Growth;
+import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.CharStreams;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@AutoService(SchemaTransformProvider.class)
+public class FileReadSchemaTransformProvider
+    extends TypedSchemaTransformProvider<FileReadSchemaTransformConfiguration> {
+  private static final Logger LOG = LoggerFactory.getLogger(FileReadSchemaTransformProvider.class);
+  private static final String IDENTIFIER = "beam:schematransform:org.apache.beam:file_read:v1";
+  static final String INPUT_TAG = "input";
+  static final String OUTPUT_TAG = "output";
+
+  @Override
+  protected Class<FileReadSchemaTransformConfiguration> configurationClass() {
+    return FileReadSchemaTransformConfiguration.class;
+  }
+
+  @Override
+  protected SchemaTransform from(FileReadSchemaTransformConfiguration configuration) {
+    return new FileReadSchemaTransform(configuration);
+  }
+
+  @Override
+  public String identifier() {
+    return IDENTIFIER;
+  }
+
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.singletonList(INPUT_TAG);
+  }
+
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  private static class FileReadSchemaTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> implements SchemaTransform {
+    private FileReadSchemaTransformConfiguration configuration;
+
+    FileReadSchemaTransform(FileReadSchemaTransformConfiguration configuration) {
+      configuration.validate();
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      checkArgument(
+          input.getAll().isEmpty() ^ Strings.isNullOrEmpty(configuration.getFilepattern()),
+          "Either an input PCollection of file patterns or the filepattern parameter must be set,"
+              + " but not both.");

Review Comment:
   It makes sense now reading the Javadoc for the interface.





[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25927: FileReadSchemaTransform

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25927:
URL: https://github.com/apache/beam/pull/25927#discussion_r1179591064


##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformProvider.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import java.io.Reader;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.schemas.io.Providers;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Watch.Growth;
+import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.CharStreams;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@AutoService(SchemaTransformProvider.class)
+public class FileReadSchemaTransformProvider
+    extends TypedSchemaTransformProvider<FileReadSchemaTransformConfiguration> {
+  private static final Logger LOG = LoggerFactory.getLogger(FileReadSchemaTransformProvider.class);
+  private static final String IDENTIFIER = "beam:schematransform:org.apache.beam:file_read:v1";
+  static final String INPUT_TAG = "input";
+  static final String OUTPUT_TAG = "output";
+
+  @Override
+  protected Class<FileReadSchemaTransformConfiguration> configurationClass() {
+    return FileReadSchemaTransformConfiguration.class;
+  }
+
+  @Override
+  protected SchemaTransform from(FileReadSchemaTransformConfiguration configuration) {
+    return new FileReadSchemaTransform(configuration);
+  }
+
+  @Override
+  public String identifier() {
+    return IDENTIFIER;
+  }
+
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.singletonList(INPUT_TAG);
+  }
+
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  private static class FileReadSchemaTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> implements SchemaTransform {
+    private FileReadSchemaTransformConfiguration configuration;
+
+    FileReadSchemaTransform(FileReadSchemaTransformConfiguration configuration) {
+      configuration.validate();
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      checkArgument(
+          input.getAll().isEmpty() ^ Strings.isNullOrEmpty(configuration.getFilepattern()),
+          "Either an input PCollection of file patterns or the filepattern parameter must be set,"
+              + " but not both.");
+
+      // Input schema can be a schema String or a path to a file containing the schema
+      // Resolve to get a schema String
+      String schema = configuration.getSchema();
+      if (!Strings.isNullOrEmpty(schema)) {
+        schema = resolveSchemaStringOrFilePath(schema);
+        configuration = configuration.toBuilder().setSchema(schema).build();
+      }
+
+      PCollection<MatchResult.Metadata> files;
+      if (!Strings.isNullOrEmpty(configuration.getFilepattern())) {
+        Pipeline p = input.getPipeline();
+        FileIO.Match matchFiles = FileIO.match().filepattern(configuration.getFilepattern());
+        // Handle streaming case
+        matchFiles = (FileIO.Match) maybeApplyStreaming(matchFiles);
+
+        files = p.apply(matchFiles);
+      } else {
+        FileIO.MatchAll matchAllFiles = FileIO.matchAll();
+        // Handle streaming case
+        matchAllFiles = (FileIO.MatchAll) maybeApplyStreaming(matchAllFiles);
+
+        files =
+            input
+                .get(INPUT_TAG)
+                .apply(
+                    "Get filepatterns",
+                    MapElements.into(TypeDescriptors.strings())
+                        .via((row) -> row.getString("filepattern")))
+                .apply("Match files", matchAllFiles);
+      }
+
+      // Pass readable files to the appropriate source and output rows.
+      PCollection<Row> output =
+          files
+              .apply(FileIO.readMatches())
+              .apply("Read files", getProvider().buildTransform(configuration));
+
+      return PCollectionRowTuple.of(OUTPUT_TAG, output);
+    }
+
+    public PTransform<?, PCollection<MatchResult.Metadata>> maybeApplyStreaming(
+        PTransform<?, PCollection<MatchResult.Metadata>> matchTransform) {
+      // Two parameters are provided to configure watching for new files.
+      Long terminateAfterSeconds = configuration.getTerminateAfterSecondsSinceNewOutput();
+      Long pollIntervalMillis = configuration.getPollIntervalMillis();
+
+      // Streaming is enabled when a poll interval is provided
+      if (pollIntervalMillis != null && pollIntervalMillis > 0L) {
+        Duration pollDuration = Duration.millis(pollIntervalMillis);
+
+        // By default, the file match transform never terminates
+        TerminationCondition<String, ?> terminationCondition = Growth.never();
+
+        // If provided, will terminate after this many seconds since seeing a new file
+        if (terminateAfterSeconds != null && terminateAfterSeconds > 0L) {
+          terminationCondition =
+              Growth.afterTimeSinceNewOutput(Duration.standardSeconds(terminateAfterSeconds));
+        }
+
+        // Apply watch for new files
+        if (matchTransform instanceof FileIO.Match) {
+          matchTransform =
+              ((FileIO.Match) matchTransform).continuously(pollDuration, terminationCondition);
+        } else if (matchTransform instanceof FileIO.MatchAll) {
+          matchTransform =
+              ((FileIO.MatchAll) matchTransform).continuously(pollDuration, terminationCondition);
+        }
+      }
+      return matchTransform;
+    }
+
+    public String resolveSchemaStringOrFilePath(String schema) {

Review Comment:
   I prefer keeping a single field that handles both use cases; I think that is simpler from the user's perspective.
   
   Maybe the documentation for the schema field isn't clear enough? I'll try rewording it.
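   
   To illustrate the single-field idea, here is a rough sketch using only `java.nio.file`. This is not the PR's `resolveSchemaStringOrFilePath`, whose body is not reproduced here; `SchemaOrPathResolver` and `resolve` are hypothetical names, and local files stand in for Beam's filesystem abstraction:
   
   ```java
   import java.io.IOException;
   import java.nio.charset.StandardCharsets;
   import java.nio.file.Files;
   import java.nio.file.InvalidPathException;
   import java.nio.file.Path;
   import java.nio.file.Paths;

   /** Hypothetical helper: one value that is either an inline schema or a path to one. */
   class SchemaOrPathResolver {

     /** Returns the file's contents if the value names an existing file, else the value itself. */
     static String resolve(String schemaOrPath) throws IOException {
       try {
         Path candidate = Paths.get(schemaOrPath);
         if (Files.isRegularFile(candidate)) {
           return new String(Files.readAllBytes(candidate), StandardCharsets.UTF_8);
         }
       } catch (InvalidPathException e) {
         // Not a legal path on this platform, so treat the value as an inline schema.
       }
       return schemaOrPath;
     }

     public static void main(String[] args) throws IOException {
       // Returned unchanged unless a file with this exact name happens to exist.
       System.out.println(resolve("{\"type\": \"string\"}"));
     }
   }
   ```
   
   The trade-off of a single field is that a mistyped path silently falls through and is parsed as a schema string, so the resulting error message should mention both interpretations.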





[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25927: FileReadSchemaTransform

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25927:
URL: https://github.com/apache/beam/pull/25927#discussion_r1149210074


##########
sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/SchemaAwareJavaBeans.java:
##########
@@ -270,6 +400,55 @@ public static SerializableFunction<Row, DoublyNestedDataTypes> doublyNestedDataT
     return DEFAULT_SCHEMA_PROVIDER.fromRowFunction(DOUBLY_NESTED_DATA_TYPES_TYPE_DESCRIPTOR);
   }
 
+  /**
+   * Contains all primitive Java types supported by Avro. The purpose of this class is to test
+   * schema-aware PTransforms with flat {@link Schema} {@link Row}s.
+   */
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class AvroPrimitiveDataTypes implements Serializable {

Review Comment:
   Tried this and some formats [failed](https://ci-beam.apache.org/job/beam_PreCommit_Java_File-schema-transform_IO_Direct_Commit/1/testReport/org.apache.beam.sdk.io.fileschematransform/JsonFileWriteSchemaTransformFormatProviderTest/doublyNestedDataTypesRepeat/); it looks like XML, JSON, and CSV don't support bytes (as in a sequence of bytes), but XML and JSON do support a single byte type. Avro and Parquet are the opposite: they support a byte sequence but not a single byte.
   
   I'm thinking of proceeding by not including any form of bytes in AllPrimitiveDataTypes, since there is no common overlap. Instead, we can have two new classes, ByteType and ByteSequenceType, that test byte and byte[] respectively. I can create new tests for these in both the file read and write transforms. WDYT?





[GitHub] [beam] damondouglas commented on a diff in pull request #25927: FileReadSchemaTransform

Posted by "damondouglas (via GitHub)" <gi...@apache.org>.
damondouglas commented on code in PR #25927:
URL: https://github.com/apache/beam/pull/25927#discussion_r1167317861


##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformConfiguration.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class FileReadSchemaTransformConfiguration {
+  static final Set<String> VALID_FORMATS = Sets.newHashSet("avro", "parquet", "json", "line");
+
+  public void validate() {
+    checkArgument(
+        !Strings.isNullOrEmpty(this.getFormat()) && VALID_FORMATS.contains(this.getFormat()),
+        "A valid file format must be specified. Please specify one of: "
+            + VALID_FORMATS.toString());
+
+    if (!this.getFormat().equals("line")) {
+      checkArgument(
+          !Strings.isNullOrEmpty(this.getSchema()),
+          String.format(
+              "A schema must be specified when reading files with %s format. You may provide a schema string or a path to a file containing the schema.",
+              this.getFormat()));
+    }

Review Comment:
   I ran an experiment that might help here. It asks: "Can we use a Schema as a field type in our SchemaTransform configurations?" If yes, FileReadSchemaTransformConfiguration could expose the schema directly:
   
   ```
   abstract class FileReadSchemaTransformConfiguration {
       abstract Schema getSchema();
   }
   ```
   
   So far it looks like yes.  Here's what I did:
   
   #### 1. Define a simple AutoValue class
   
   <details>
   <summary>AutoValue Class with Schema field</summary>
   
   ```
   @DefaultSchema(AutoValueSchema.class)
   @AutoValue
   public abstract class ExampleModel {
   
     public static Builder builder() {
       return new AutoValue_ExampleModel.Builder();
     }
   
     public abstract String getAString();
   
     public abstract Schema getASchema();
   
     @AutoValue.Builder
     public abstract static class Builder {
   
       public abstract Builder setAString(String value);
   
       public abstract Builder setASchema(Schema value);
   
       public abstract ExampleModel build();
     }
   }
   ```
   </details>
   
   #### 2. Generate to/from Row functions from schema provider
   
   <details>
   <summary>SchemaProvider boilerplate</summary>
   
   ```
   private static final AutoValueSchema SCHEMA_PROVIDER = new AutoValueSchema();
   private static final TypeDescriptor<ExampleModel> TD = TypeDescriptor.of(ExampleModel.class);
   private static final SerializableFunction<ExampleModel, Row> TO_ROW_FN = SCHEMA_PROVIDER
         .toRowFunction(TD);
   private static final SerializableFunction<Row, ExampleModel> FROM_ROW_FN = SCHEMA_PROVIDER
         .fromRowFunction(TD);
   ```
   </details>
   
   #### 3. Instantiate the AutoValue class, setting the Schema field
   
   <details>
   <summary>Create schema then instantiate ExampleModel</summary>
   
   ```
   Schema schema = Schema.of(
       Field.of("anInteger", FieldType.INT32),
       Field.of("aBoolean", FieldType.BOOLEAN),
       Field.of("something", FieldType.STRING)
   );
   ExampleModel model = ExampleModel.builder()
       .setAString("hello")
       .setASchema(schema)
       .build();
   ```
   
   </details>
   
   #### 4. Convert ExampleModel to a Row
   
   This simulates what the SchemaProvider would receive as the configuration representation.
   
   ```
   Row row = TO_ROW_FN.apply(model);
   ```
   
   #### 5. Convert back to an ExampleModel from the Row
   
   ```
   ExampleModel backAgain = FROM_ROW_FN.apply(row);
   ```
   
   #### 6. Finally check whether the Schema fields were preserved during the conversions
   
   <details>
   <summary>Call getASchema() and run a bunch of checks</summary>
   
   ```
   Schema backAgainASchema = backAgain.getASchema();
   checkNotNull(backAgainASchema);
   checkState(backAgainASchema.hasField("anInteger"));
   checkState(backAgainASchema.hasField("aBoolean"));
   checkState(backAgainASchema.hasField("something"));
   checkState(backAgainASchema.getField("anInteger").getType().equals(FieldType.INT32));
   checkState(backAgainASchema.getField("aBoolean").getType().equals(FieldType.BOOLEAN));
   checkState(backAgainASchema.getField("something").getType().equals(FieldType.STRING));
   ```
   </details>
   





[GitHub] [beam] damondouglas commented on a diff in pull request #25927: FileReadSchemaTransform

Posted by "damondouglas (via GitHub)" <gi...@apache.org>.
damondouglas commented on code in PR #25927:
URL: https://github.com/apache/beam/pull/25927#discussion_r1167334554


##########
sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/AvroReadSchemaTransformFormatProviderTest.java:
##########
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.ALL_PRIMITIVE_DATA_TYPES_SCHEMA;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviderTestData.DATA;
+import static org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions.RESOLVE_FILE;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.extensions.avro.coders.AvroGenericCoder;
+import org.apache.beam.sdk.extensions.avro.io.AvroIO;
+import org.apache.beam.sdk.extensions.avro.io.DynamicAvroDestinations;
+import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.io.DefaultFilenamePolicy;
+import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.joda.time.Duration;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class AvroReadSchemaTransformFormatProviderTest
+    extends FileReadSchemaTransformFormatProviderTest {
+
+  @Rule public TestPipeline writePipeline = TestPipeline.create();
+  @Rule public TestPipeline readPipeline = TestPipeline.create();
+
+  @Override
+  protected String getFormat() {
+    return new AvroReadSchemaTransformFormatProvider().identifier();
+  }
+
+  @Override
+  public String getStringSchemaFromBeamSchema(Schema beamSchema) {
+    return AvroUtils.toAvroSchema(beamSchema).toString();
+  }
+
+  @Override
+  public void runWriteAndReadTest(
+      Schema schema, List<Row> rows, String filePath, String schemaFilePath) {
+    org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(schema);
+    String stringSchema =
+        Strings.isNullOrEmpty(schemaFilePath) ? avroSchema.toString() : schemaFilePath;
+
+    writePipeline
+        .apply(Create.of(rows).withRowSchema(schema))
+        .apply(
+            MapElements.into(TypeDescriptor.of(GenericRecord.class))
+                .via(AvroUtils.getRowToGenericRecordFunction(avroSchema)))
+        .setCoder(AvroGenericCoder.of(avroSchema))
+        .apply(AvroIO.writeGenericRecords(avroSchema).to(filePath));

Review Comment:
   Should we test whether files written using FileWriteSchemaTransformProvider can be read by FileReadSchemaTransformProvider?





[GitHub] [beam] damondouglas commented on a diff in pull request #25927: FileReadSchemaTransform

Posted by "damondouglas (via GitHub)" <gi...@apache.org>.
damondouglas commented on code in PR #25927:
URL: https://github.com/apache/beam/pull/25927#discussion_r1167333720


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JsonUtils.java:
##########
@@ -128,50 +134,151 @@ public String apply(Row input) {
     };
   }
 
+  public static String jsonSchemaStringFromBeamSchema(Schema beamSchema) {
+    return jsonSchemaFromBeamSchema(beamSchema).toString();
+  }
+
+  public static ObjectSchema jsonSchemaFromBeamSchema(Schema beamSchema) {
+    return jsonSchemaBuilderFromBeamSchema(beamSchema).build();
+  }
+
+  private static ObjectSchema.Builder jsonSchemaBuilderFromBeamSchema(Schema beamSchema) {
+    // Beam Schema is strict, so we should not accept additional properties
+    ObjectSchema.Builder jsonSchemaBuilder = ObjectSchema.builder().additionalProperties(false);
+
+    for (Field field : beamSchema.getFields()) {
+      String name = field.getName();
+      org.everit.json.schema.Schema propertySchema = jsonPropertyFromBeamType(field.getType());
+
+      // Add property and make it required
+      jsonSchemaBuilder = jsonSchemaBuilder.addPropertySchema(name, propertySchema);
+      jsonSchemaBuilder.addRequiredProperty(name);
+    }
+
+    return jsonSchemaBuilder;
+  }
+
+  private static org.everit.json.schema.Schema jsonPropertyFromBeamType(FieldType beamType) {
+    org.everit.json.schema.Schema.Builder<? extends org.everit.json.schema.Schema> propertySchema;
+
+    switch (beamType.getTypeName()) {
+      case BYTE:
+      case INT16:
+      case INT32:
+      case INT64:
+        propertySchema = NumberSchema.builder().requiresInteger(true);
+        break;
+      case DECIMAL:
+      case FLOAT:
+      case DOUBLE:

Review Comment:
   Same as https://github.com/apache/beam/pull/25927/files?file-filters%5B%5D=.java&show-viewed-files=false#r1167333670





[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25927: FileReadSchemaTransform

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25927:
URL: https://github.com/apache/beam/pull/25927#discussion_r1170425017


##########
sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/AvroReadSchemaTransformFormatProviderTest.java:
##########
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.ALL_PRIMITIVE_DATA_TYPES_SCHEMA;
+import static org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformFormatProviderTestData.DATA;
+import static org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions.RESOLVE_FILE;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.extensions.avro.coders.AvroGenericCoder;
+import org.apache.beam.sdk.extensions.avro.io.AvroIO;
+import org.apache.beam.sdk.extensions.avro.io.DynamicAvroDestinations;
+import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.io.DefaultFilenamePolicy;
+import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.joda.time.Duration;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class AvroReadSchemaTransformFormatProviderTest
+    extends FileReadSchemaTransformFormatProviderTest {
+
+  @Rule public TestPipeline writePipeline = TestPipeline.create();
+  @Rule public TestPipeline readPipeline = TestPipeline.create();
+
+  @Override
+  protected String getFormat() {
+    return new AvroReadSchemaTransformFormatProvider().identifier();
+  }
+
+  @Override
+  public String getStringSchemaFromBeamSchema(Schema beamSchema) {
+    return AvroUtils.toAvroSchema(beamSchema).toString();
+  }
+
+  @Override
+  public void runWriteAndReadTest(
+      Schema schema, List<Row> rows, String filePath, String schemaFilePath) {
+    org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(schema);
+    String stringSchema =
+        Strings.isNullOrEmpty(schemaFilePath) ? avroSchema.toString() : schemaFilePath;
+
+    writePipeline
+        .apply(Create.of(rows).withRowSchema(schema))
+        .apply(
+            MapElements.into(TypeDescriptor.of(GenericRecord.class))
+                .via(AvroUtils.getRowToGenericRecordFunction(avroSchema)))
+        .setCoder(AvroGenericCoder.of(avroSchema))
+        .apply(AvroIO.writeGenericRecords(avroSchema).to(filePath));

Review Comment:
   Yeah probably worth doing





[GitHub] [beam] damondouglas commented on a diff in pull request #25927: FileReadSchemaTransform

Posted by "damondouglas (via GitHub)" <gi...@apache.org>.
damondouglas commented on code in PR #25927:
URL: https://github.com/apache/beam/pull/25927#discussion_r1147807659


##########
sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/SchemaAwareJavaBeans.java:
##########
@@ -270,6 +400,55 @@ public static SerializableFunction<Row, DoublyNestedDataTypes> doublyNestedDataT
     return DEFAULT_SCHEMA_PROVIDER.fromRowFunction(DOUBLY_NESTED_DATA_TYPES_TYPE_DESCRIPTOR);
   }
 
+  /**
+   * Contains all primitive Java types supported by Avro. The purpose of this class is to test
+   * schema-aware PTransforms with flat {@link Schema} {@link Row}s.
+   */
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class AvroPrimitiveDataTypes implements Serializable {

Review Comment:
   If you need my help refactoring tests that break as a result of removing Short and Byte, please let me know and I can commit to this PR.





[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25927: FileReadSchemaTransform

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25927:
URL: https://github.com/apache/beam/pull/25927#discussion_r1147523858


##########
sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/SchemaAwareJavaBeans.java:
##########
@@ -270,6 +400,55 @@ public static SerializableFunction<Row, DoublyNestedDataTypes> doublyNestedDataT
     return DEFAULT_SCHEMA_PROVIDER.fromRowFunction(DOUBLY_NESTED_DATA_TYPES_TYPE_DESCRIPTOR);
   }
 
+  /**
+   * Contains all primitive Java types supported by Avro. The purpose of this class is to test
+   * schema-aware PTransforms with flat {@link Schema} {@link Row}s.
+   */
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class AvroPrimitiveDataTypes implements Serializable {

Review Comment:
   Yeah, the problem is that Short (int16) and Byte (a single byte) are not supported by Avro (see [Avro primitive types](https://docs.oracle.com/cd/E26161_02/html/GettingStartedGuide/avroschemas.html#:~:text=in%20this%20chapter.-,Primitive%20Data%20Types,-In%20the%20previous)) or Parquet (see [primitive types](https://parquet.apache.org/docs/file-format/types/)). So in AvroUtils we actually convert the Beam Schema Short and Byte types to an Integer (int32).
   
   Avro and Parquet do support a sequence of bytes though, so I've replaced Byte with ByteBuffer in `AvroPrimitiveDataTypes`.
   
   >Another alternative would be just to get rid of the Byte field in AllPrimitiveDataTypes
   
   If you agree, I'd be happy to remove Short and replace Byte with ByteBuffer in `AllPrimitiveDataTypes` and use that for everything instead (for both read and write). 
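
For illustration, the widening described above can be sketched in plain Java (JDK only). The helper names are hypothetical and this is not the AvroUtils code; it only shows that a byte or short value survives the trip through an int, while the original type is no longer recorded and has to be restored with an explicit, range-checked cast.

```java
// Hypothetical helpers; a sketch of the widening idea, not Beam's implementation.
class NarrowIntegers {
  // Writing: a single byte or a short is stored as a 32-bit int.
  static int widen(byte value) {
    return value;
  }

  static int widen(short value) {
    return value;
  }

  // Reading: the stored type is just "int", so narrowing must be explicit and checked.
  static byte toByteExact(int value) {
    if (value < Byte.MIN_VALUE || value > Byte.MAX_VALUE) {
      throw new ArithmeticException("int " + value + " does not fit in a byte");
    }
    return (byte) value;
  }

  static short toShortExact(int value) {
    if (value < Short.MIN_VALUE || value > Short.MAX_VALUE) {
      throw new ArithmeticException("int " + value + " does not fit in a short");
    }
    return (short) value;
  }
}
```

The values round-trip, but a reader that sees only an int cannot tell whether the writer started from a Byte, a Short, or an Integer. Presumably that is why a test class with Byte and Short fields does not compare equal after going through Avro or Parquet.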





[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25927: FileReadSchemaTransform

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25927:
URL: https://github.com/apache/beam/pull/25927#discussion_r1149345996


##########
sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/SchemaAwareJavaBeans.java:
##########
@@ -270,6 +400,55 @@ public static SerializableFunction<Row, DoublyNestedDataTypes> doublyNestedDataT
     return DEFAULT_SCHEMA_PROVIDER.fromRowFunction(DOUBLY_NESTED_DATA_TYPES_TYPE_DESCRIPTOR);
   }
 
+  /**
+   * Contains all primitive Java types supported by Avro. The purpose of this class is to test
+   * schema-aware PTransforms with flat {@link Schema} {@link Row}s.
+   */
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class AvroPrimitiveDataTypes implements Serializable {

Review Comment:
   Thinking of proceeding by not including any form of bytes in AllPrimitiveDataTypes, since there is no common overlap. Instead, we can have two new classes, ByteType and ByteSequenceType, that will test byte and byte[] respectively. I can create new tests for these in both the file read and write transforms.





[GitHub] [beam] damondouglas commented on a diff in pull request #25927: FileReadSchemaTransform

Posted by "damondouglas (via GitHub)" <gi...@apache.org>.
damondouglas commented on code in PR #25927:
URL: https://github.com/apache/beam/pull/25927#discussion_r1182769551


##########
sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JsonSchemaConversionTest.java:
##########
@@ -20,19 +20,167 @@
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.stream.Collectors;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.schemas.utils.JsonUtils;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
+import org.everit.json.schema.ValidationException;
+import org.json.JSONArray;
+import org.json.JSONObject;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 @RunWith(JUnit4.class)
 public class JsonSchemaConversionTest {
+  @Test
+  public void testBeamSchemaToJsonSchemaValidatesPrimitives() {
+    Schema schema =
+        Schema.builder()
+            .addBooleanField("booleanProp")
+            .addInt32Field("integerProp")
+            .addFloatField("floatProp")
+            .addStringField("stringProp")
+            .addByteField("byteProp")
+            .build();
+
+    JSONObject obj =
+        new JSONObject()
+            .put("booleanProp", true)
+            .put("integerProp", 1)
+            .put("floatProp", 1.1)
+            .put("stringProp", "a")
+            .put("byteProp", (byte) 1);
+
+    JsonUtils.jsonSchemaFromBeamSchema(schema).validate(obj);
+  }
+
+  @Test
+  public void testBeamSchemaToJsonSchemaValidatesNullablePrimitives() {
+    Schema schema =
+        Schema.builder()
+            .addNullableBooleanField("booleanProp")
+            .addNullableInt32Field("integerProp")
+            .addNullableFloatField("floatProp")
+            .addNullableStringField("stringProp")
+            .addNullableByteField("byteProp")
+            .build();
+
+    JSONObject obj =
+        new JSONObject()
+            .put("booleanProp", JSONObject.NULL)
+            .put("integerProp", JSONObject.NULL)
+            .put("floatProp", JSONObject.NULL)
+            .put("stringProp", JSONObject.NULL)
+            .put("byteProp", JSONObject.NULL);
+
+    JsonUtils.jsonSchemaFromBeamSchema(schema).validate(obj);
+  }
+
+  @Test
+  public void testBeamSchemaToJsonSchemaValidatesArrayPrimitives() {
+    Schema schema =
+        Schema.builder()
+            .addArrayField("booleanArray", FieldType.BOOLEAN)
+            .addArrayField("integerArray", FieldType.INT32)
+            .addArrayField("floatArray", FieldType.FLOAT)
+            .addArrayField("stringArray", FieldType.STRING)
+            .addArrayField("byteArray", FieldType.BYTE)
+            .build();
+
+    JSONObject obj =
+        new JSONObject()
+            .put("booleanArray", new JSONArray().put(true).put(false))
+            .put("integerArray", new JSONArray().put(1).put(2).put(3))
+            .put("floatArray", new JSONArray().put(1.1).put(2.2).put(3.3))
+            .put("stringArray", new JSONArray().put("a").put("b").put("c"))
+            .put("byteArray", new JSONArray().put((byte) 1).put((byte) 2).put((byte) 3));
+
+    JsonUtils.jsonSchemaFromBeamSchema(schema).validate(obj);
+  }
+
+  @Test
+  public void testBeamSchemaToJsonSchemaValidatesNestedPrimitives() {
+    Schema nestedSchema =
+        Schema.builder()
+            .addBooleanField("booleanProp")
+            .addInt32Field("integerProp")
+            .addFloatField("floatProp")
+            .addStringField("stringProp")
+            .addByteField("byteProp")
+            .build();
+    Schema schema =
+        Schema.builder()
+            .addRowField("nestedObj", nestedSchema)
+            .addArrayField("nestedRepeated", FieldType.row(nestedSchema))
+            .build();
+
+    JSONObject nestedObj =
+        new JSONObject()
+            .put("booleanProp", true)
+            .put("integerProp", 1)
+            .put("floatProp", 1.1)
+            .put("stringProp", "a")
+            .put("byteProp", (byte) 1);
+    JSONObject obj =
+        new JSONObject()
+            .put("nestedObj", nestedObj)
+            .put("nestedRepeated", new JSONArray().put(nestedObj).put(nestedObj));
+
+    JsonUtils.jsonSchemaFromBeamSchema(schema).validate(obj);
+  }
+
+  @Test
+  public void testBeamSchemaToJsonSchemaValidationFailsWithMissingProperties() {
+    Schema schema =
+        Schema.builder().addInt32Field("integerProp").addStringField("stringProp").build();
+
+    JSONObject obj = new JSONObject().put("integerProp", 1);
+
+    try {
+      JsonUtils.jsonSchemaFromBeamSchema(schema).validate(obj);
+      throw new RuntimeException("Did not throw validation exception for missing properties");
+    } catch (ValidationException e) {
+      assertEquals("#: required key [stringProp] not found", e.getMessage());
+    }

Review Comment:
   Non blocking and just FYI:
   
   ```
   ValidationException e = assertThrows(ValidationException.class, () -> JsonUtils.jsonSchemaFromBeamSchema(schema).validate(obj));
   
   assertEquals("#: required key [stringProp] not found", e.getMessage());
   ```


