Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/08/26 17:27:17 UTC

[GitHub] [beam] lukecwik commented on a change in pull request #12645: [BEAM-10124] Add ContextualTextIO

lukecwik commented on a change in pull request #12645:
URL: https://github.com/apache/beam/pull/12645#discussion_r477459276



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
##########
@@ -472,9 +472,9 @@ public static MatchConfiguration create(EmptyMatchTreatment emptyMatchTreatment)
           .build();
     }
 
-    abstract EmptyMatchTreatment getEmptyMatchTreatment();
+    public abstract EmptyMatchTreatment getEmptyMatchTreatment();
 
-    abstract @Nullable Duration getWatchInterval();
+    public abstract @Nullable Duration getWatchInterval();
 
     abstract @Nullable TerminationCondition<String, ?> getWatchTerminationCondition();

Review comment:
       Might as well make this public too.

##########
File path: sdks/java/io/contextual-text-io/src/main/java/org/apache/beam/sdk/io/contextualtextio/RecordWithMetadata.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.contextualtextio;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+
+/**
+ * Helper Class based on {@link AutoValueSchema}, it provides Metadata associated with each Record
+ * when reading from file(s) using {@link ContextualTextIO}.
+ *
+ * <h3>Fields:</h3>
+ *
+ * <ul>
+ *   <li>recordOffset: The offset of a record (the byte at which the record begins) in a file. This
+ *       information can be useful if you wish to reconstruct the file. {@link
+ *       RecordWithMetadata#getRecordOffset()}
+ *   <li>recordNum: The ordinal number of the record in its file. {@link
+ *       RecordWithMetadata#getRecordNum()}
+ *   <li>recordValue: The value / contents of the record {@link RecordWithMetadata#getRecordValue()}
+ *   <li>rangeOffset: The starting offset of the range (split), which contained the record, when the
+ *       record was read. {@link RecordWithMetadata#getRangeOffset()}
+ *   <li>recordNumInOffset: The record number relative to the Range. (line number within the range)
+ *       {@link RecordWithMetadata#getRecordNumInOffset()}
+ *   <li>fileName: Name of the file to which the record belongs (this is the full filename,
+ *       eg:path/to/file.txt) {@link RecordWithMetadata#getFileName()}
+ * </ul>
+ */
+@Experimental(Experimental.Kind.SCHEMAS)
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class RecordWithMetadata {

Review comment:
       Should this be RecordWithMetadata<T>?
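One reading of this question is that the wrapper could carry a payload of any type, so a downstream step can parse the line without losing its position metadata. A minimal plain-Java sketch of that shape follows. `MetadataRecord<T>` and `map` are hypothetical names; it uses no AutoValue and no Beam schema, and whether `AutoValueSchema` can infer a schema for a class with a type parameter would need to be checked separately.

```java
import java.util.Objects;
import java.util.function.Function;

/** Hypothetical generic wrapper: a payload of type T plus the position it was read from. */
final class MetadataRecord<T> {
  private final long recordNum;
  private final String fileName;
  private final T value;

  MetadataRecord(long recordNum, String fileName, T value) {
    this.recordNum = recordNum;
    this.fileName = Objects.requireNonNull(fileName, "fileName");
    this.value = Objects.requireNonNull(value, "value");
  }

  long getRecordNum() {
    return recordNum;
  }

  String getFileName() {
    return fileName;
  }

  T getValue() {
    return value;
  }

  /** Returns a new record with a transformed payload and the same metadata. */
  <R> MetadataRecord<R> map(Function<? super T, ? extends R> fn) {
    return new MetadataRecord<R>(recordNum, fileName, fn.apply(value));
  }
}
```

With the type parameter on the wrapper, `new MetadataRecord<>(3L, "path/to/file.txt", "42").map(Integer::parseInt)` yields a `MetadataRecord<Integer>` that still knows its record number and file.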

##########
File path: sdks/java/io/contextual-text-io/src/main/java/org/apache/beam/sdk/io/contextualtextio/RecordWithMetadata.java
##########
@@ -0,0 +1,84 @@
+package org.apache.beam.sdk.io.contextualtextio;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+
+/**
+ * Helper Class based on {@link AutoValueSchema}, it provides Metadata associated with each Record
+ * when reading from file(s) using {@link ContextualTextIO}.
+ *
+ * <h3>Fields:</h3>
+ *
+ * <ul>
+ *   <li>recordOffset: The offset of a record (the byte at which the record begins) in a file. This
+ *       information can be useful if you wish to reconstruct the file. {@link
+ *       RecordWithMetadata#getRecordOffset()}
+ *   <li>recordNum: The ordinal number of the record in its file. {@link
+ *       RecordWithMetadata#getRecordNum()}
+ *   <li>recordValue: The value / contents of the record {@link RecordWithMetadata#getRecordValue()}
+ *   <li>rangeOffset: The starting offset of the range (split), which contained the record, when the
+ *       record was read. {@link RecordWithMetadata#getRangeOffset()}
+ *   <li>recordNumInOffset: The record number relative to the Range. (line number within the range)
+ *       {@link RecordWithMetadata#getRecordNumInOffset()}
+ *   <li>fileName: Name of the file to which the record belongs (this is the full filename,
+ *       eg:path/to/file.txt) {@link RecordWithMetadata#getFileName()}
+ * </ul>
+ */
+@Experimental(Experimental.Kind.SCHEMAS)
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class RecordWithMetadata {
+  public abstract Long getRecordOffset();

Review comment:
       Please use `long` instead of `Long`, since we don't expect any of these `Long` fields to be nullable.
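The practical difference behind this request: a `Long` getter may return `null`, which compiles without complaint and only fails later when the value is auto-unboxed, while a `long` getter cannot return `null` at all. A small stdlib-only illustration, using hypothetical interfaces rather than the AutoValue class itself:

```java
/** Why primitive getters suit numeric fields that are never null. */
final class BoxingDemo {
  /** Boxed getter: returning null compiles without complaint. */
  interface BoxedOffset {
    Long getRecordOffset();
  }

  /** Primitive getter: null cannot be returned at all. */
  interface PrimitiveOffset {
    long getRecordOffset();
  }

  /** Auto-unboxing happens here, so a null offset surfaces as a NullPointerException. */
  static long nextBoxed(BoxedOffset record) {
    return record.getRecordOffset() + 1;
  }

  /** No unboxing, so the offset value itself can never trigger a NullPointerException. */
  static long nextPrimitive(PrimitiveOffset record) {
    return record.getRecordOffset() + 1;
  }
}
```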

##########
File path: sdks/java/io/contextual-text-io/src/main/java/org/apache/beam/sdk/io/contextualtextio/RecordWithMetadata.java
##########
@@ -0,0 +1,84 @@
+package org.apache.beam.sdk.io.contextualtextio;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+
+/**
+ * Helper Class based on {@link AutoValueSchema}, it provides Metadata associated with each Record
+ * when reading from file(s) using {@link ContextualTextIO}.
+ *
+ * <h3>Fields:</h3>
+ *
+ * <ul>
+ *   <li>recordOffset: The offset of a record (the byte at which the record begins) in a file. This
+ *       information can be useful if you wish to reconstruct the file. {@link
+ *       RecordWithMetadata#getRecordOffset()}
+ *   <li>recordNum: The ordinal number of the record in its file. {@link
+ *       RecordWithMetadata#getRecordNum()}
+ *   <li>recordValue: The value / contents of the record {@link RecordWithMetadata#getRecordValue()}
+ *   <li>rangeOffset: The starting offset of the range (split), which contained the record, when the
+ *       record was read. {@link RecordWithMetadata#getRangeOffset()}
+ *   <li>recordNumInOffset: The record number relative to the Range. (line number within the range)
+ *       {@link RecordWithMetadata#getRecordNumInOffset()}
+ *   <li>fileName: Name of the file to which the record belongs (this is the full filename,
+ *       eg:path/to/file.txt) {@link RecordWithMetadata#getFileName()}
+ * </ul>
+ */
+@Experimental(Experimental.Kind.SCHEMAS)
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class RecordWithMetadata {
+  public abstract Long getRecordOffset();
+
+  public abstract Long getRecordNum();
+
+  public abstract String getRecordValue();
+
+  public abstract Long getRangeOffset();
+
+  public abstract Long getRecordNumInOffset();
+
+  public abstract Builder toBuilder();
+
+  public abstract String getFileName();

Review comment:
       We should be using ResourceId here.
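The benefit of keeping a structured identifier instead of a flattened `String` is that consumers can ask for the piece they need (scheme, file name, full path) without re-parsing. Beam's `ResourceId` offers methods such as `getScheme()` and `getFilename()`; the sketch below uses `java.net.URI` purely as a stdlib stand-in so it runs without Beam. `ResourceIdStandIn` is a hypothetical name, not how ContextualTextIO is implemented, and it assumes a hierarchical URI (an opaque URI has no path).

```java
import java.net.URI;

/**
 * Stand-in for Beam's ResourceId, built on java.net.URI for illustration only.
 * Keeps the structured identifier and derives pieces on demand.
 */
final class ResourceIdStandIn {
  private final URI resource;

  ResourceIdStandIn(URI resource) {
    this.resource = resource;
  }

  /** Scheme such as "gs" or "file"; null for a relative reference. */
  String getScheme() {
    return resource.getScheme();
  }

  /** Last path segment, similar in spirit to ResourceId#getFilename(). Assumes a hierarchical URI. */
  String getFilename() {
    String path = resource.getPath();
    int slash = path.lastIndexOf('/');
    return slash >= 0 ? path.substring(slash + 1) : path;
  }

  /** Full string form, analogous to the resourceId().toString() value stored as fileName today. */
  @Override
  public String toString() {
    return resource.toString();
  }
}
```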

##########
File path: sdks/java/io/contextual-text-io/build.gradle
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+plugins { id 'org.apache.beam.module' }
+applyJavaNature(
+        automaticModuleName: 'org.apache.beam.sdk.io.contextual-text-io',
+        enableChecker: false,
+        ignoreRawtypeErrors: true)
+
+description = "Apache Beam :: SDKs :: Java :: Contextual-Text-IO"
+ext.summary = "Context-aware Text IO."
+
+dependencies {
+

Review comment:
       ```suggestion
       ```

##########
File path: sdks/java/io/contextual-text-io/build.gradle
##########
@@ -0,0 +1,41 @@
+
+plugins { id 'org.apache.beam.module' }
+applyJavaNature(
+        automaticModuleName: 'org.apache.beam.sdk.io.contextual-text-io',
+        enableChecker: false,
+        ignoreRawtypeErrors: true)
+
+description = "Apache Beam :: SDKs :: Java :: Contextual-Text-IO"
+ext.summary = "Context-aware Text IO."
+
+dependencies {
+
+    compile library.java.vendored_guava_26_0_jre
+    compile library.java.protobuf_java
+    compile project(path: ":sdks:java:core", configuration: "shadow")
+    testCompile project(path: ":sdks:java:core", configuration: "shadowTest")
+
+    testCompile library.java.guava_testlib
+    testCompile library.java.junit
+    testCompile library.java.hamcrest_core
+    testRuntimeOnly library.java.slf4j_jdk14
+    testCompile project(path: ":runners:direct-java", configuration: "shadow")
+
+}

Review comment:
       ```suggestion
       }
       ```

##########
File path: sdks/java/io/contextual-text-io/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIOSource.java
##########
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.contextualtextio;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SeekableByteChannel;
+import java.util.NoSuchElementException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.FileBasedSource;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation detail of {@link ContextualTextIO.Read}.
+ *
+ * <p>A {@link FileBasedSource} which can decode records delimited by newline characters.
+ *
+ * <p>This source splits the data into records using {@code UTF-8} {@code \n}, {@code \r}, or {@code
+ * \r\n} as the delimiter. This source is not strict and supports decoding the last record even if
+ * it is not delimited. Finally, no records are decoded if the stream is empty.
+ *
+ * <p>This source supports reading from any arbitrary byte position within the stream. If the
+ * starting position is not {@code 0}, then bytes are skipped until the first delimiter is found
+ * representing the beginning of the first record to be decoded.
+ */
+@VisibleForTesting
+class ContextualTextIOSource extends FileBasedSource<RecordWithMetadata> {

Review comment:
       We should have made this a splittable DoFn.
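Whether the reader is a `FileBasedSource` or a splittable `DoFn` over an offset-range restriction, it needs the rule described in the class Javadoc above: a range owns exactly the records that start inside it, and a range with a non-zero start first skips to just past the next delimiter. A stdlib-only sketch of that rule, restricted to `\n` as the delimiter; `RangeReaderSketch` and `readRange` are hypothetical names, not Beam APIs.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of byte-range ownership for '\n'-delimited records. */
final class RangeReaderSketch {
  /**
   * Returns the records whose first byte lies in [start, end). A record may extend past end;
   * the range that contains its first byte still reads it in full.
   */
  static List<String> readRange(byte[] data, int start, int end) {
    int pos = start;
    if (start > 0) {
      // Mirror the "seek to start - 1" step: if the byte before start is a delimiter, a record
      // begins exactly at start; otherwise skip past the next delimiter, because the record
      // covering start belongs to an earlier range.
      int p = start - 1;
      while (p < data.length && data[p] != '\n') {
        p++;
      }
      pos = p + 1;
    }
    List<String> records = new ArrayList<>();
    while (pos < end && pos < data.length) {
      int recordStart = pos;
      while (pos < data.length && data[pos] != '\n') {
        pos++;
      }
      records.add(new String(data, recordStart, pos - recordStart, StandardCharsets.UTF_8));
      pos++; // step over the delimiter (or past EOF for an undelimited last record)
    }
    return records;
  }
}
```

With this rule, reading `[0, s)` followed by `[s, n)` produces the same records as reading `[0, n)` for every split point `s`, which is what makes it safe to split the range at arbitrary byte offsets.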

##########
File path: sdks/java/io/contextual-text-io/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIOSource.java
##########
@@ -0,0 +1,364 @@
+package org.apache.beam.sdk.io.contextualtextio;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SeekableByteChannel;
+import java.util.NoSuchElementException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.FileBasedSource;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation detail of {@link ContextualTextIO.Read}.
+ *
+ * <p>A {@link FileBasedSource} which can decode records delimited by newline characters.
+ *
+ * <p>This source splits the data into records using {@code UTF-8} {@code \n}, {@code \r}, or {@code
+ * \r\n} as the delimiter. This source is not strict and supports decoding the last record even if
+ * it is not delimited. Finally, no records are decoded if the stream is empty.
+ *
+ * <p>This source supports reading from any arbitrary byte position within the stream. If the
+ * starting position is not {@code 0}, then bytes are skipped until the first delimiter is found
+ * representing the beginning of the first record to be decoded.
+ */
+@VisibleForTesting
+class ContextualTextIOSource extends FileBasedSource<RecordWithMetadata> {
+  byte[] delimiter;
+
+  private static final Logger LOG = LoggerFactory.getLogger(ContextualTextIOSource.class);
+
+  // Used to Override isSplittable
+  private boolean hasMultilineCSVRecords;
+
+  @Override
+  protected boolean isSplittable() throws Exception {
+    if (hasMultilineCSVRecords) {
+      // When Having Multiline CSV Records,
+      // Splitting the file may cause a split to be within a record,
+      // Disabling split prevents this from happening
+      return false;
+    }
+    return super.isSplittable();
+  }
+
+  ContextualTextIOSource(
+      ValueProvider<String> fileSpec,
+      EmptyMatchTreatment emptyMatchTreatment,
+      byte[] delimiter,
+      boolean hasMultilineCSVRecords) {
+    super(fileSpec, emptyMatchTreatment, 1L);
+    this.delimiter = delimiter;
+    this.hasMultilineCSVRecords = hasMultilineCSVRecords;
+  }
+
+  private ContextualTextIOSource(
+      MatchResult.Metadata metadata,
+      long start,
+      long end,
+      byte[] delimiter,
+      boolean hasMultilineCSVRecords) {
+    super(metadata, 1L, start, end);
+    this.delimiter = delimiter;
+    this.hasMultilineCSVRecords = hasMultilineCSVRecords;
+  }
+
+  @Override
+  protected FileBasedSource<RecordWithMetadata> createForSubrangeOfFile(
+      MatchResult.Metadata metadata, long start, long end) {
+    return new ContextualTextIOSource(metadata, start, end, delimiter, hasMultilineCSVRecords);
+  }
+
+  @Override
+  protected FileBasedReader<RecordWithMetadata> createSingleFileReader(PipelineOptions options) {
+    return new MultiLineTextBasedReader(this, delimiter, hasMultilineCSVRecords);
+  }
+
+  @Override
+  public Coder<RecordWithMetadata> getOutputCoder() {
+    SchemaCoder<RecordWithMetadata> coder = null;
+    try {
+      coder = SchemaRegistry.createDefault().getSchemaCoder(RecordWithMetadata.class);
+    } catch (NoSuchSchemaException e) {
+      LOG.error("No Coder Found for RecordWithMetadata");
+    }
+    return coder;
+  }
+
+  /**
+   * A {@link FileBasedReader FileBasedReader} which can decode records delimited by delimiter
+   * characters.
+   *
+   * <p>See {@link ContextualTextIOSource } for further details.
+   */
+  @VisibleForTesting
+  static class MultiLineTextBasedReader extends FileBasedReader<RecordWithMetadata> {
+    public static final int READ_BUFFER_SIZE = 8192;
+    private static final ByteString UTF8_BOM =
+        ByteString.copyFrom(new byte[] {(byte) 0xEF, (byte) 0xBB, (byte) 0xBF});
+    private final ByteBuffer readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
+    private ByteString buffer;
+    private int startOfDelimiterInBuffer;
+    private int endOfDelimiterInBuffer;
+    private long startOfRecord;
+    private volatile long startOfNextRecord;
+    private volatile boolean eof;
+    private volatile boolean elementIsPresent;
+    private @Nullable RecordWithMetadata currentValue;
+    private @Nullable ReadableByteChannel inChannel;
+    private byte @Nullable [] delimiter;
+
+    // Add to override the isSplittable
+    private boolean hasMultilineCSVRecords;
+
+    private long startingOffset;
+    private long totalRecordCount;
+
+    private MultiLineTextBasedReader(
+        ContextualTextIOSource source, byte[] delimiter, boolean hasMultilineCSVRecords) {
+      super(source);
+      buffer = ByteString.EMPTY;
+      this.delimiter = delimiter;
+      this.hasMultilineCSVRecords = hasMultilineCSVRecords;
+      startingOffset = getCurrentSource().getStartOffset(); // Start offset;
+    }
+
+    @Override
+    protected long getCurrentOffset() throws NoSuchElementException {
+      if (!elementIsPresent) {
+        throw new NoSuchElementException();
+      }
+      return startOfRecord;
+    }
+
+    @Override
+    public long getSplitPointsRemaining() {
+      if (isStarted() && startOfNextRecord >= getCurrentSource().getEndOffset()) {
+        return isDone() ? 0 : 1;
+      }
+      return super.getSplitPointsRemaining();
+    }
+
+    @Override
+    public RecordWithMetadata getCurrent() throws NoSuchElementException {
+      if (!elementIsPresent) {
+        throw new NoSuchElementException();
+      }
+      return currentValue;
+    }
+
+    @Override
+    protected void startReading(ReadableByteChannel channel) throws IOException {
+      this.inChannel = channel;
+      // If the first offset is greater than zero, we need to skip bytes until we see our
+      // first delimiter.
+      long startOffset = getCurrentSource().getStartOffset();
+      if (startOffset > 0) {
+        Preconditions.checkState(
+            channel instanceof SeekableByteChannel,
+            "%s only supports reading from a SeekableByteChannel when given a start offset"
+                + " greater than 0.",
+            ContextualTextIOSource.class.getSimpleName());
+        long requiredPosition = startOffset - 1;
+        if (delimiter != null && startOffset >= delimiter.length) {
+          // we need to move back the offset of at worse delimiter.size to be sure to see
+          // all the bytes of the delimiter in the call to findDelimiterBounds() below
+          requiredPosition = startOffset - delimiter.length;
+        }
+        ((SeekableByteChannel) channel).position(requiredPosition);
+        findDelimiterBoundsWithMultiLineCheck();
+        buffer = buffer.substring(endOfDelimiterInBuffer);
+        startOfNextRecord = requiredPosition + endOfDelimiterInBuffer;
+        endOfDelimiterInBuffer = 0;
+        startOfDelimiterInBuffer = 0;
+      }
+    }
+
+    private void findDelimiterBoundsWithMultiLineCheck() throws IOException {
+      findDelimiterBounds();
+    }
+
+    /**
+     * Locates the start position and end position of the next delimiter. Will consume the channel
+     * till either EOF or the delimiter bounds are found.
+     *
+     * <p>If {@link ContextualTextIOSource#hasMultilineCSVRecords} is set then the behaviour will
+     * change from the standard read seen in {@link org.apache.beam.sdk.io.TextIO}. The assumption
+     * when {@link ContextualTextIOSource#hasMultilineCSVRecords} is set is that the file is being
+     * read with a single thread.
+     *
+     * <p>This fills the buffer and updates the positions as follows:
+     *
+     * <pre>{@code
+     * ------------------------------------------------------
+     * | element bytes | delimiter bytes | unconsumed bytes |
+     * ------------------------------------------------------
+     * 0            start of          end of              buffer
+     *              delimiter         delimiter           size
+     *              in buffer         in buffer
+     * }</pre>
+     */
+    private void findDelimiterBounds() throws IOException {
+      int bytePositionInBuffer = 0;
+      boolean doubleQuoteClosed = true;
+
+      while (true) {
+        if (!tryToEnsureNumberOfBytesInBuffer(bytePositionInBuffer + 1)) {
+          startOfDelimiterInBuffer = endOfDelimiterInBuffer = bytePositionInBuffer;
+          break;
+        }
+
+        byte currentByte = buffer.byteAt(bytePositionInBuffer);
+        if (hasMultilineCSVRecords) {
+          // Check if we are inside an open Quote
+          if (currentByte == '"') {
+            doubleQuoteClosed = !doubleQuoteClosed;
+          }
+        } else {
+          doubleQuoteClosed = true;
+        }
+
+        if (delimiter == null) {
+          // default delimiter
+          if (currentByte == '\n') {
+            startOfDelimiterInBuffer = bytePositionInBuffer;
+            endOfDelimiterInBuffer = startOfDelimiterInBuffer + 1;
+            if (doubleQuoteClosed) {
+              break;
+            }
+          } else if (currentByte == '\r') {
+            startOfDelimiterInBuffer = bytePositionInBuffer;
+            endOfDelimiterInBuffer = startOfDelimiterInBuffer + 1;
+            if (tryToEnsureNumberOfBytesInBuffer(bytePositionInBuffer + 2)) {
+              currentByte = buffer.byteAt(bytePositionInBuffer + 1);
+              if (currentByte == '\n') {
+                endOfDelimiterInBuffer += 1;
+              }
+            }
+            if (doubleQuoteClosed) {
+              break;
+            }
+          }
+        } else {
+          // when the user defines a delimiter
+          int i = 0;
+          startOfDelimiterInBuffer = endOfDelimiterInBuffer = bytePositionInBuffer;
+          while ((i < delimiter.length) && (currentByte == delimiter[i])) {
+            // read next byte;
+            i++;
+            if (tryToEnsureNumberOfBytesInBuffer(bytePositionInBuffer + i + 1)) {
+              currentByte = buffer.byteAt(bytePositionInBuffer + i);
+            } else {
+              // corner case: delimiter truncate at the end of file
+              startOfDelimiterInBuffer = endOfDelimiterInBuffer = bytePositionInBuffer;
+              break;
+            }
+          }
+          if (i == delimiter.length) {
+            endOfDelimiterInBuffer = bytePositionInBuffer + i;
+            if (doubleQuoteClosed) {
+              break;
+            }
+          }
+        }
+        bytePositionInBuffer += 1;
+      }
+    }
+
+    @Override
+    protected boolean readNextRecord() throws IOException {
+      startOfRecord = startOfNextRecord;
+
+      findDelimiterBoundsWithMultiLineCheck();
+
+      // If we have reached EOF file and consumed all of the buffer then we know
+      // that there are no more records.
+      if (eof && buffer.isEmpty()) {
+        elementIsPresent = false;
+        return false;
+      }
+
+      decodeCurrentElement();
+      startOfNextRecord = startOfRecord + endOfDelimiterInBuffer;
+      return true;
+    }
+
+    /**
+     * Decodes the current element updating the buffer to only contain the unconsumed bytes.
+     *
+     * <p>This invalidates the currently stored {@code startOfDelimiterInBuffer} and {@code
+     * endOfDelimiterInBuffer}.
+     */
+    private void decodeCurrentElement() throws IOException {
+      ByteString dataToDecode = buffer.substring(0, startOfDelimiterInBuffer);
+      // If present, the UTF8 Byte Order Mark (BOM) will be removed.
+      if (startOfRecord == 0 && dataToDecode.startsWith(UTF8_BOM)) {
+        dataToDecode = dataToDecode.substring(UTF8_BOM.size());
+      }
+
+      // The line num is:
+      Long recordUniqueNum = totalRecordCount++;
+      // The Complete FileName (with uri if this is a web url eg: temp/abc.txt) is:
+      String fileName = getCurrentSource().getSingleFileMetadata().resourceId().toString();

Review comment:
       We should be passing the resourceId through and not pulling out the filename by itself.
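For context on the `hasMultilineCSVRecords` branch of `findDelimiterBounds()` in the hunk above: a newline ends a record only while the number of `"` bytes seen in the current record is even. This is also why `isSplittable()` returns `false` in that mode, since a split could otherwise begin inside a quoted field. A simplified, stdlib-only sketch of the rule on a `String`, handling only `\n` (the code under review also handles `\r`, `\r\n` and custom delimiters); `QuoteAwareSplitter` is a hypothetical name. A doubled `""` inside a quoted field toggles twice, so CSV-style escaped quotes leave the state unchanged.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: split on '\n' only when outside double quotes. */
final class QuoteAwareSplitter {
  static List<String> split(String text) {
    List<String> records = new ArrayList<>();
    boolean quoteClosed = true; // true while an even number of '"' has been seen
    int recordStart = 0;
    for (int i = 0; i < text.length(); i++) {
      char c = text.charAt(i);
      if (c == '"') {
        quoteClosed = !quoteClosed;
      } else if (c == '\n' && quoteClosed) {
        records.add(text.substring(recordStart, i));
        recordStart = i + 1;
      }
    }
    // Like the source, decode a trailing record even if it has no final delimiter.
    if (recordStart < text.length()) {
      records.add(text.substring(recordStart));
    }
    return records;
  }
}
```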

##########
File path: sdks/java/io/contextual-text-io/src/main/java/org/apache/beam/sdk/io/contextualtextio/RecordWithMetadata.java
##########
@@ -0,0 +1,84 @@
+package org.apache.beam.sdk.io.contextualtextio;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+
+/**
+ * Helper Class based on {@link AutoValueSchema}, it provides Metadata associated with each Record
+ * when reading from file(s) using {@link ContextualTextIO}.
+ *
+ * <h3>Fields:</h3>
+ *
+ * <ul>
+ *   <li>recordOffset: The offset of a record (the byte at which the record begins) in a file. This
+ *       information can be useful if you wish to reconstruct the file. {@link
+ *       RecordWithMetadata#getRecordOffset()}
+ *   <li>recordNum: The ordinal number of the record in its file. {@link
+ *       RecordWithMetadata#getRecordNum()}
+ *   <li>recordValue: The value / contents of the record {@link RecordWithMetadata#getRecordValue()}
+ *   <li>rangeOffset: The starting offset of the range (split), which contained the record, when the
+ *       record was read. {@link RecordWithMetadata#getRangeOffset()}
+ *   <li>recordNumInOffset: The record number relative to the Range. (line number within the range)
+ *       {@link RecordWithMetadata#getRecordNumInOffset()}
+ *   <li>fileName: Name of the file to which the record belongs (this is the full filename,
+ *       eg:path/to/file.txt) {@link RecordWithMetadata#getFileName()}
+ * </ul>
+ */
+@Experimental(Experimental.Kind.SCHEMAS)
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class RecordWithMetadata {
+  public abstract Long getRecordOffset();
+
+  public abstract Long getRecordNum();
+
+  public abstract String getRecordValue();

Review comment:
       nit: getRecordValue -> getValue

##########
File path: sdks/java/io/contextual-text-io/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIOSource.java
##########
@@ -0,0 +1,364 @@
+package org.apache.beam.sdk.io.contextualtextio;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SeekableByteChannel;
+import java.util.NoSuchElementException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.FileBasedSource;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation detail of {@link ContextualTextIO.Read}.
+ *
+ * <p>A {@link FileBasedSource} which can decode records delimited by newline characters.
+ *
+ * <p>This source splits the data into records using {@code UTF-8} {@code \n}, {@code \r}, or {@code
+ * \r\n} as the delimiter. This source is not strict and supports decoding the last record even if
+ * it is not delimited. Finally, no records are decoded if the stream is empty.
+ *
+ * <p>This source supports reading from any arbitrary byte position within the stream. If the
+ * starting position is not {@code 0}, then bytes are skipped until the first delimiter is found
+ * representing the beginning of the first record to be decoded.
+ */
+@VisibleForTesting
+class ContextualTextIOSource extends FileBasedSource<RecordWithMetadata> {
+  byte[] delimiter;
+
+  private static final Logger LOG = LoggerFactory.getLogger(ContextualTextIOSource.class);
+
+  // Used to Override isSplittable
+  private boolean hasMultilineCSVRecords;
+
+  @Override
+  protected boolean isSplittable() throws Exception {
+    if (hasMultilineCSVRecords) {
+      // When Having Multiline CSV Records,
+      // Splitting the file may cause a split to be within a record,
+      // Disabling split prevents this from happening
+      return false;
+    }
+    return super.isSplittable();
+  }
+
+  ContextualTextIOSource(
+      ValueProvider<String> fileSpec,
+      EmptyMatchTreatment emptyMatchTreatment,
+      byte[] delimiter,
+      boolean hasMultilineCSVRecords) {
+    super(fileSpec, emptyMatchTreatment, 1L);
+    this.delimiter = delimiter;
+    this.hasMultilineCSVRecords = hasMultilineCSVRecords;
+  }
+
+  private ContextualTextIOSource(
+      MatchResult.Metadata metadata,
+      long start,
+      long end,
+      byte[] delimiter,
+      boolean hasMultilineCSVRecords) {
+    super(metadata, 1L, start, end);
+    this.delimiter = delimiter;
+    this.hasMultilineCSVRecords = hasMultilineCSVRecords;
+  }
+
+  @Override
+  protected FileBasedSource<RecordWithMetadata> createForSubrangeOfFile(
+      MatchResult.Metadata metadata, long start, long end) {
+    return new ContextualTextIOSource(metadata, start, end, delimiter, hasMultilineCSVRecords);
+  }
+
+  @Override
+  protected FileBasedReader<RecordWithMetadata> createSingleFileReader(PipelineOptions options) {
+    return new MultiLineTextBasedReader(this, delimiter, hasMultilineCSVRecords);
+  }
+
+  @Override
+  public Coder<RecordWithMetadata> getOutputCoder() {
+    SchemaCoder<RecordWithMetadata> coder = null;
+    try {
+      coder = SchemaRegistry.createDefault().getSchemaCoder(RecordWithMetadata.class);
+    } catch (NoSuchSchemaException e) {
+      LOG.error("No Coder Found for RecordWithMetadata");
+    }
+    return coder;
+  }
+
+  /**
+   * A {@link FileBasedReader FileBasedReader} which can decode records delimited by delimiter
+   * characters.
+   *
+   * <p>See {@link ContextualTextIOSource } for further details.

Review comment:
       ```suggestion
      * <p>See {@link ContextualTextIOSource} for further details.
   ```
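The `ContextualTextIOSource` Javadoc above describes the default delimiting rules. Records end at `\n`, `\r`, or `\r\n`. An undelimited final record is still decoded, an empty stream yields nothing, and a reader starting at a non-zero offset skips ahead to the first record boundary. The following in-memory sketch illustrates those rules on a byte array. It is not Beam's reader: `DelimiterSketch`, `Rec` and `read` are hypothetical, and the sketch ignores custom delimiters and the end of the range. Starting the scan at `start - 1`, so that a record beginning exactly at `start` is kept, is our assumption about how the boundary should be handled.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class DelimiterSketch {

  // Hypothetical record holder: byte offset where the record begins, plus its text.
  static final class Rec {
    final long offset;
    final String value;

    Rec(long offset, String value) {
      this.offset = offset;
      this.value = value;
    }

    @Override
    public String toString() {
      return offset + ":" + value;
    }
  }

  static List<Rec> read(byte[] data, int start) {
    List<Rec> out = new ArrayList<>();
    int pos = 0;
    if (start > 0) {
      // Assumption: scan from start - 1 so a record beginning exactly at `start` is kept.
      pos = start - 1;
      while (pos < data.length && data[pos] != '\n' && data[pos] != '\r') {
        pos++;
      }
      if (pos + 1 < data.length && data[pos] == '\r' && data[pos + 1] == '\n') {
        pos++; // treat \r\n as a single delimiter
      }
      pos++; // first byte after the delimiter
    }
    int recordStart = pos;
    while (pos < data.length) {
      byte b = data[pos];
      if (b == '\n' || b == '\r') {
        out.add(new Rec(recordStart,
            new String(data, recordStart, pos - recordStart, StandardCharsets.UTF_8)));
        if (b == '\r' && pos + 1 < data.length && data[pos + 1] == '\n') {
          pos++; // consume the \n of a \r\n pair
        }
        recordStart = pos + 1;
      }
      pos++;
    }
    if (recordStart < data.length) {
      // The last record is decoded even without a trailing delimiter.
      out.add(new Rec(recordStart,
          new String(data, recordStart, data.length - recordStart, StandardCharsets.UTF_8)));
    }
    return out;
  }

  public static void main(String[] args) {
    byte[] data = "a\r\nbb\rccc\nd".getBytes(StandardCharsets.UTF_8);
    System.out.println(read(data, 0)); // [0:a, 3:bb, 6:ccc, 10:d]
    System.out.println(read(data, 4)); // [6:ccc, 10:d]
  }
}
```

Reading from offset 4 lands inside `bb`, so the sketch skips to the `\r` at offset 5 and emits only `ccc` and `d`. The partial record is left to the range that contains its first byte.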

##########
File path: sdks/java/io/contextual-text-io/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIO.java
##########
@@ -319,31 +326,169 @@ static boolean isSelfOverlapping(byte[] s) {
     }
 
     @Override
-    public PCollection<String> expand(PBegin input) {
-      checkNotNull(getFilepattern(), "need to set the filepattern of a TextIO.Read transform");
+    public PCollection<RecordWithMetadata> expand(PBegin input) {
+      checkNotNull(
+          getFilepattern(), "need to set the filepattern of a ContextualTextIO.Read transform");
+      PCollection<RecordWithMetadata> lines = null;
       if (getMatchConfiguration().getWatchInterval() == null && !getHintMatchesManyFiles()) {
-        return input.apply("Read", org.apache.beam.sdk.io.Read.from(getSource()));
+        lines = input.apply("Read", org.apache.beam.sdk.io.Read.from(getSource()));
+      } else {
+        // All other cases go through FileIO + ReadFiles
+        lines =
+            input
+                .apply(
+                    "Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
+                .apply("Match All", FileIO.matchAll().withConfiguration(getMatchConfiguration()))
+                .apply(
+                    "Read Matches",
+                    FileIO.readMatches()
+                        .withCompression(getCompression())
+                        .withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
+                .apply("Via ReadFiles", readFiles().withDelimiter(getDelimiter()));
       }
 
-      // All other cases go through FileIO + ReadFiles
-      return input
-          .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
-          .apply("Match All", FileIO.matchAll().withConfiguration(getMatchConfiguration()))
-          .apply(
-              "Read Matches",
-              FileIO.readMatches()
-                  .withCompression(getCompression())
-                  .withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
-          .apply("Via ReadFiles", readFiles().withDelimiter(getDelimiter()));
+      // Check if the user decided to opt out of recordNums associated with records
+      if (getWithoutLineNumMetadata()) {
+        return lines;
+      }
+
+      // At this point the line number in RecordWithMetadata contains the relative line offset from

Review comment:
       I think using a `/* ... */` block comment instead of consecutive `//` lines:
   ```
   /*
    *
    *
    */
   ```
   will allow formatting tools to reformat multi-line comments automatically.
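The `expand` hunk above is cut off mid-comment, so the exact transforms the PR uses are not shown here. Going only by the `RecordWithMetadata` fields, a file-absolute record number can be recovered from `rangeOffset` and `recordNumInOffset`. The approach is to count the records in every range of a file, prefix-sum those counts in `rangeOffset` order, and add each record's position within its own range. The following single-machine sketch shows that arithmetic. `AbsoluteRecordNums` and `Rec` are hypothetical, and `recordNumInOffset` is assumed to be 0-based.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class AbsoluteRecordNums {

  // Hypothetical stand-in holding only the fields this computation needs.
  static final class Rec {
    final String fileName;
    final long rangeOffset; // starting byte offset of the range that produced the record
    final long recordNumInOffset; // assumed 0-based position of the record within that range

    Rec(String fileName, long rangeOffset, long recordNumInOffset) {
      this.fileName = fileName;
      this.rangeOffset = rangeOffset;
      this.recordNumInOffset = recordNumInOffset;
    }
  }

  static List<Long> absoluteNums(List<Rec> records) {
    // 1. Count records per range; TreeMap keeps each file's ranges in ascending offset order.
    Map<String, TreeMap<Long, Long>> counts = new HashMap<>();
    for (Rec r : records) {
      counts.computeIfAbsent(r.fileName, f -> new TreeMap<>()).merge(r.rangeOffset, 1L, Long::sum);
    }
    // 2. Prefix sums: how many records precede each range within the same file.
    Map<String, Map<Long, Long>> recordsBefore = new HashMap<>();
    for (Map.Entry<String, TreeMap<Long, Long>> file : counts.entrySet()) {
      Map<Long, Long> before = new HashMap<>();
      long running = 0;
      for (Map.Entry<Long, Long> range : file.getValue().entrySet()) {
        before.put(range.getKey(), running);
        running += range.getValue();
      }
      recordsBefore.put(file.getKey(), before);
    }
    // 3. Absolute number = records in earlier ranges + position inside this range.
    List<Long> result = new ArrayList<>();
    for (Rec r : records) {
      result.add(recordsBefore.get(r.fileName).get(r.rangeOffset) + r.recordNumInOffset);
    }
    return result;
  }

  public static void main(String[] args) {
    // a.txt was split into ranges starting at byte 0 (2 records) and byte 100 (3 records).
    List<Rec> recs = Arrays.asList(
        new Rec("a.txt", 100, 1),
        new Rec("a.txt", 0, 0),
        new Rec("b.txt", 0, 0),
        new Rec("a.txt", 100, 0),
        new Rec("a.txt", 0, 1),
        new Rec("a.txt", 100, 2));
    System.out.println(absoluteNums(recs)); // [3, 0, 0, 2, 1, 4]
  }
}
```

In a pipeline, the counting and prefix-sum steps would need a distributed grouping step instead of in-memory maps (for example, a per-file aggregation fed back to the records as a side input). Which approach the PR actually takes is not visible in this excerpt.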




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org