Posted to github@beam.apache.org by "damondouglas (via GitHub)" <gi...@apache.org> on 2023/04/14 00:19:41 UTC

[GitHub] [beam] damondouglas commented on a diff in pull request #25927: FileReadSchemaTransform

damondouglas commented on code in PR #25927:
URL: https://github.com/apache/beam/pull/25927#discussion_r1166137111


##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformConfiguration.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class FileReadSchemaTransformConfiguration {
+  static final Set<String> VALID_FORMATS = Sets.newHashSet("avro", "parquet", "json", "line");
+
+  public void validate() {
+    checkArgument(
+        !Strings.isNullOrEmpty(this.getFormat()) && VALID_FORMATS.contains(this.getFormat()),
+        "A valid file format must be specified. Please specify one of: "
+            + VALID_FORMATS.toString());

Review Comment:
   Do you like the idea of this instead? That way, if a new FileReadSchemaTransformFormatProvider AutoService-annotated implementation is added, there's no need to synchronize it with the VALID_FORMATS list.
   ```
   Providers.loadProviders(FileReadSchemaTransformFormatProvider.class).containsKey(this.getFormat())
   ```
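
A minimal sketch of that idea, written in plain Java so it runs without Beam on the classpath. `FormatProvider`, `named`, and the hand-built `loadProviders` below are hypothetical stand-ins: in the PR the map would presumably come from `Providers.loadProviders(FileReadSchemaTransformFormatProvider.class)`, which discovers the annotated implementations instead of receiving them as an argument.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for FileReadSchemaTransformFormatProvider.
interface FormatProvider {
  String identifier();
}

class FormatRegistrySketch {
  // Helper for the sketch only: a provider that just reports its identifier.
  static FormatProvider named(String id) {
    return () -> id;
  }

  // Stand-in for Providers.loadProviders(...): index providers by identifier.
  // Assumption: the real call discovers providers itself; here they are passed in.
  static Map<String, FormatProvider> loadProviders(List<FormatProvider> discovered) {
    Map<String, FormatProvider> byId = new LinkedHashMap<>();
    for (FormatProvider provider : discovered) {
      byId.put(provider.identifier(), provider);
    }
    return byId;
  }

  // Replaces the hard-coded VALID_FORMATS lookup with a lookup in the provider map.
  static void validateFormat(String format, Map<String, FormatProvider> providers) {
    if (format == null || !providers.containsKey(format)) {
      throw new IllegalArgumentException(
          "A valid file format must be specified. Please specify one of: " + providers.keySet());
    }
  }

  public static void main(String[] args) {
    Map<String, FormatProvider> providers =
        loadProviders(List.of(named("avro"), named("parquet"), named("json"), named("line")));
    validateFormat("avro", providers); // accepted, no exception
    try {
      validateFormat("csv", providers);
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

With this shape, the error message lists whatever providers are registered, so adding a new format would not require touching the validation code.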



##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformConfiguration.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class FileReadSchemaTransformConfiguration {
+  static final Set<String> VALID_FORMATS = Sets.newHashSet("avro", "parquet", "json", "line");
+
+  public void validate() {
+    checkArgument(
+        !Strings.isNullOrEmpty(this.getFormat()) && VALID_FORMATS.contains(this.getFormat()),
+        "A valid file format must be specified. Please specify one of: "
+            + VALID_FORMATS.toString());
+
+    if (!this.getFormat().equals("line")) {
+      checkArgument(
+          !Strings.isNullOrEmpty(this.getSchema()),
+          String.format(
+              "A schema must be specified when reading files with %s format. You may provide a schema string or a path to a file containing the schema.",
+              this.getFormat()));
+    }

Review Comment:
   I'm struggling with the need for someone to look into this code to determine that a schema is required for all of the formats except for line. Neither of the following options is ideal, and I trust your judgement, but I wanted to relay my thoughts.
   
   1) Remove LineReadSchemaTransformFormatProvider and the Nullable annotation on the getSchema method
   2) Remove the Nullable annotation on the getSchema method and provide/document a Schema for LineReadSchemaTransformFormatProvider
   
   What's missing from options 1 and 2 is handling of a non-null empty string. I'd have to test this on my end, but I wonder if Schema is one of the rare non-primitive types that we can get away with.
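
The empty-string gap can be illustrated with a small plain-Java sketch. The class and method names are hypothetical, and the null check only approximates what dropping the Nullable annotation would provide; the second method mirrors the `Strings.isNullOrEmpty` test already present in the diff.

```java
import java.util.Objects;

class SchemaPresenceSketch {
  // A null check alone: roughly the guarantee a non-nullable getter gives.
  static String requireNonNullSchema(String schema) {
    return Objects.requireNonNull(schema, "schema must not be null");
  }

  // Rejects null and empty alike, like the isNullOrEmpty check in validate().
  static String requireNonEmptySchema(String schema) {
    if (schema == null || schema.isEmpty()) {
      throw new IllegalArgumentException("A schema must be specified.");
    }
    return schema;
  }

  public static void main(String[] args) {
    // The empty string passes the null check untouched.
    System.out.println(requireNonNullSchema("").isEmpty());
    try {
      requireNonEmptySchema("");
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

Under either option, then, an explicit emptiness check would likely still be needed somewhere.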



##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformConfiguration.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})

Review Comment:
   Is there a way to apply the nullness warning suppression only at the methods that are causing the problem? It's more code, but alternatively, do you like the idea of using Optional? For the configuration I don't imagine a performance hit, as it is evaluated at pipeline construction time.
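
A rough sketch of the Optional variant, using a hand-written class in place of the AutoValue one. `ReadConfigSketch` and its constructor are hypothetical, and the sketch does not check whether `AutoValueSchema` can infer a Beam schema from an `Optional` getter.

```java
import java.util.Optional;

// Hypothetical, hand-written stand-in for the AutoValue configuration class.
final class ReadConfigSketch {
  private final String format;
  private final Optional<String> schema;

  ReadConfigSketch(String format, Optional<String> schema) {
    this.format = format;
    this.schema = schema;
  }

  String getFormat() {
    return format;
  }

  // Absence is part of the return type, so no nullness suppression is needed here.
  Optional<String> getSchema() {
    return schema;
  }

  void validate() {
    // Treat an empty string the same as a missing schema.
    boolean hasSchema = schema.filter(s -> !s.isEmpty()).isPresent();
    if (!"line".equals(format) && !hasSchema) {
      throw new IllegalArgumentException(
          String.format("A schema must be specified when reading files with %s format.", format));
    }
  }

  public static void main(String[] args) {
    new ReadConfigSketch("line", Optional.empty()).validate(); // accepted
    try {
      new ReadConfigSketch("avro", Optional.empty()).validate();
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```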



##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformProvider.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import java.io.Reader;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.schemas.io.Providers;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Watch.Growth;
+import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.CharStreams;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@AutoService(SchemaTransformProvider.class)
+public class FileReadSchemaTransformProvider
+    extends TypedSchemaTransformProvider<FileReadSchemaTransformConfiguration> {
+  private static final Logger LOG = LoggerFactory.getLogger(FileReadSchemaTransformProvider.class);
+  private static final String IDENTIFIER = "beam:schematransform:org.apache.beam:file_read:v1";
+  static final String INPUT_TAG = "input";
+  static final String OUTPUT_TAG = "output";
+
+  @Override
+  protected Class<FileReadSchemaTransformConfiguration> configurationClass() {
+    return FileReadSchemaTransformConfiguration.class;
+  }
+
+  @Override
+  protected SchemaTransform from(FileReadSchemaTransformConfiguration configuration) {
+    return new FileReadSchemaTransform(configuration);
+  }
+
+  @Override
+  public String identifier() {
+    return IDENTIFIER;
+  }
+
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.singletonList(INPUT_TAG);
+  }
+
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  private static class FileReadSchemaTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> implements SchemaTransform {
+    private FileReadSchemaTransformConfiguration configuration;
+
+    FileReadSchemaTransform(FileReadSchemaTransformConfiguration configuration) {
+      configuration.validate();
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      checkArgument(
+          input.getAll().isEmpty() ^ Strings.isNullOrEmpty(configuration.getFilepattern()),
+          "Either an input PCollection of file patterns or the filepattern parameter must be set, "
+              + "but not both.");

Review Comment:
   I had a hard time understanding this. Is this because we have the LineReadSchemaTransformFormatProvider?
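
Read together with its error message, the condition appears to require exactly one source of file patterns; nothing in the quoted hunk ties it to a particular format. The sketch below spells out the four cases of the XOR. The class name and the local `isNullOrEmpty` helper are hypothetical, and a boolean stands in for `input.getAll().isEmpty()`.

```java
class ExactlyOneSourceSketch {
  // Local stand-in for Guava's Strings.isNullOrEmpty.
  static boolean isNullOrEmpty(String s) {
    return s == null || s.isEmpty();
  }

  // Same shape as the checkArgument condition: true when exactly one side is "empty",
  // which means exactly one source of file patterns was supplied.
  static boolean exactlyOneSource(boolean inputIsEmpty, String filepattern) {
    return inputIsEmpty ^ isNullOrEmpty(filepattern);
  }

  public static void main(String[] args) {
    System.out.println(exactlyOneSource(true, "gs://bucket/*.avro")); // filepattern only
    System.out.println(exactlyOneSource(false, null)); // input PCollection only
    System.out.println(exactlyOneSource(true, null)); // neither supplied
    System.out.println(exactlyOneSource(false, "gs://bucket/*.avro")); // both supplied
  }
}
```

If the XOR is what makes the line hard to read, naming the two conditions (for example `hasInput` and `hasFilepattern`) and comparing them with `!=` would express the same rule.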



##########
sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformConfiguration.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fileschematransform;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class FileReadSchemaTransformConfiguration {
+  static final Set<String> VALID_FORMATS = Sets.newHashSet("avro", "parquet", "json", "line");
+
+  public void validate() {
+    checkArgument(
+        !Strings.isNullOrEmpty(this.getFormat()) && VALID_FORMATS.contains(this.getFormat()),
+        "A valid file format must be specified. Please specify one of: "
+            + VALID_FORMATS.toString());
+
+    if (!this.getFormat().equals("line")) {
+      checkArgument(
+          !Strings.isNullOrEmpty(this.getSchema()),
+          String.format(
+              "A schema must be specified when reading files with %s format. You may provide a schema string or a path to a file containing the schema.",
+              this.getFormat()));
+    }
+
+    Integer terminateAfterSecondsSinceNewOutput = this.getTerminateAfterSecondsSinceNewOutput();
+    Integer pollIntervalMillis = this.getPollIntervalMillis();
+    if (terminateAfterSecondsSinceNewOutput != null && terminateAfterSecondsSinceNewOutput > 0) {
+      checkArgument(
+          pollIntervalMillis != null && pollIntervalMillis > 0,
+          "Found positive value for terminateAfterSecondsSinceNewOutput but non-positive "
+              + "value for pollIntervalMillis. Please set pollIntervalMillis as well to enable "
+              + "streaming.");
+    }
+  }
+
+  public static Builder builder() {
+    return new AutoValue_FileReadSchemaTransformConfiguration.Builder();
+  }
+
+  /**
+   * The format of the file(s) to read.
+   *
+   * <p>Possible values are: `"line"`, `"avro"`, `"parquet"`, `"json"`
+   */
+  public abstract String getFormat();
+
+  /**
+   * The filepattern used to match and read files.
+   *
+   * <p>May instead use an input {@code PCollection<Row>} of filepatterns.
+   */
+  @Nullable
+  public abstract String getFilepattern();
+
+  /**
+   * The schema used by sources to deserialize data and create Beam Rows.
+   *
+   * <p>May be provided as a schema String or as a String path to a file that contains the schema.
+   */
+  @Nullable
+  public abstract String getSchema();
+
+  /**
+   * The time, in milliseconds, to wait before polling for new files.
+   *
+   * <p>This will set the pipeline to be a streaming pipeline and will continuously watch for new
+   * files.
+   *
+   * <p>Note: This only polls for new files. New updates to an existing file will not be watched
+   * for.
+   */
+  @Nullable
+  public abstract Integer getPollIntervalMillis();

Review Comment:
   Should this be a Long so that it is easy to pass into [Duration](https://www.joda.org/joda-time/apidocs/org/joda/time/Duration.html#Duration-long-)?
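
A small sketch of the trade-off, using `java.time.Duration.ofMillis(long)` as a stand-in for Joda's `Duration(long)` so the example needs only the standard library. The class and method names are hypothetical, and overload resolution on Joda's own constructors is not modelled here.

```java
import java.time.Duration;

class PollIntervalSketch {
  // An Integer is unboxed to int and widened to long at the call site.
  // A null value would throw NullPointerException during unboxing.
  static Duration fromInteger(Integer millis) {
    return Duration.ofMillis(millis);
  }

  // A Long is unboxed directly to the long parameter type.
  static Duration fromLong(Long millis) {
    return Duration.ofMillis(millis);
  }

  public static void main(String[] args) {
    System.out.println(fromInteger(500).toMillis());
    System.out.println(fromLong(3_000_000_000L).toMillis());
    // Largest interval an Integer count of milliseconds can express, in whole days.
    System.out.println(fromInteger(Integer.MAX_VALUE).toDays());
  }
}
```

In this stand-in both boxed types can be passed; the practical difference is range, since an Integer count of milliseconds tops out at a little under 25 days.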


