You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/08/20 05:45:34 UTC

[GitHub] [beam] tvalentyn commented on a change in pull request #12645: [BEAM-10124] Add ContextualTextIO

tvalentyn commented on a change in pull request #12645:
URL: https://github.com/apache/beam/pull/12645#discussion_r473585601



##########
File path: sdks/java/io/contextual-text-io/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIO.java
##########
@@ -319,31 +326,169 @@ static boolean isSelfOverlapping(byte[] s) {
     }
 
     @Override
-    public PCollection<String> expand(PBegin input) {
-      checkNotNull(getFilepattern(), "need to set the filepattern of a TextIO.Read transform");
+    public PCollection<RecordWithMetadata> expand(PBegin input) {
+      checkNotNull(
+          getFilepattern(), "need to set the filepattern of a ContextualTextIO.Read transform");
+      PCollection<RecordWithMetadata> lines = null;
       if (getMatchConfiguration().getWatchInterval() == null && !getHintMatchesManyFiles()) {
-        return input.apply("Read", org.apache.beam.sdk.io.Read.from(getSource()));
+        lines = input.apply("Read", org.apache.beam.sdk.io.Read.from(getSource()));
+      } else {
+        // All other cases go through FileIO + ReadFiles
+        lines =
+            input
+                .apply(
+                    "Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
+                .apply("Match All", FileIO.matchAll().withConfiguration(getMatchConfiguration()))
+                .apply(
+                    "Read Matches",
+                    FileIO.readMatches()
+                        .withCompression(getCompression())
+                        .withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
+                .apply("Via ReadFiles", readFiles().withDelimiter(getDelimiter()));
       }
 
-      // All other cases go through FileIO + ReadFiles
-      return input
-          .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
-          .apply("Match All", FileIO.matchAll().withConfiguration(getMatchConfiguration()))
-          .apply(
-              "Read Matches",
-              FileIO.readMatches()
-                  .withCompression(getCompression())
-                  .withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
-          .apply("Via ReadFiles", readFiles().withDelimiter(getDelimiter()));
+      // Check if the user decided to opt out of recordNums associated with records
+      if (getWithoutLineNumMetadata()) {
+        return lines;
+      }
+
+      // At this point the line number in RecordWithMetadata contains the relative line offset from

Review comment:
       nit: fix the indentation

##########
File path: sdks/java/io/contextual-text-io/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIO.java
##########
@@ -319,31 +326,169 @@ static boolean isSelfOverlapping(byte[] s) {
     }
 
     @Override
-    public PCollection<String> expand(PBegin input) {
-      checkNotNull(getFilepattern(), "need to set the filepattern of a TextIO.Read transform");
+    public PCollection<RecordWithMetadata> expand(PBegin input) {
+      checkNotNull(
+          getFilepattern(), "need to set the filepattern of a ContextualTextIO.Read transform");
+      PCollection<RecordWithMetadata> lines = null;
       if (getMatchConfiguration().getWatchInterval() == null && !getHintMatchesManyFiles()) {
-        return input.apply("Read", org.apache.beam.sdk.io.Read.from(getSource()));
+        lines = input.apply("Read", org.apache.beam.sdk.io.Read.from(getSource()));
+      } else {
+        // All other cases go through FileIO + ReadFiles
+        lines =

Review comment:
       records?

##########
File path: sdks/java/io/contextual-text-io/src/main/java/org/apache/beam/sdk/io/contextualtextio/Range.java
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.contextualtextio;
+
+import com.google.auto.value.AutoValue;
+
+@AutoValue
+public abstract class Range {
+  public abstract Long getRangeNum();

Review comment:
       (Please add docstrings and consider a different name).
   

##########
File path: sdks/java/io/contextual-text-io/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIO.java
##########
@@ -319,31 +326,169 @@ static boolean isSelfOverlapping(byte[] s) {
     }
 
     @Override
-    public PCollection<String> expand(PBegin input) {
-      checkNotNull(getFilepattern(), "need to set the filepattern of a TextIO.Read transform");
+    public PCollection<RecordWithMetadata> expand(PBegin input) {
+      checkNotNull(
+          getFilepattern(), "need to set the filepattern of a ContextualTextIO.Read transform");
+      PCollection<RecordWithMetadata> lines = null;
       if (getMatchConfiguration().getWatchInterval() == null && !getHintMatchesManyFiles()) {
-        return input.apply("Read", org.apache.beam.sdk.io.Read.from(getSource()));
+        lines = input.apply("Read", org.apache.beam.sdk.io.Read.from(getSource()));
+      } else {
+        // All other cases go through FileIO + ReadFiles
+        lines =
+            input
+                .apply(
+                    "Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
+                .apply("Match All", FileIO.matchAll().withConfiguration(getMatchConfiguration()))
+                .apply(
+                    "Read Matches",
+                    FileIO.readMatches()
+                        .withCompression(getCompression())
+                        .withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
+                .apply("Via ReadFiles", readFiles().withDelimiter(getDelimiter()));
       }
 
-      // All other cases go through FileIO + ReadFiles
-      return input
-          .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
-          .apply("Match All", FileIO.matchAll().withConfiguration(getMatchConfiguration()))
-          .apply(
-              "Read Matches",
-              FileIO.readMatches()
-                  .withCompression(getCompression())
-                  .withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
-          .apply("Via ReadFiles", readFiles().withDelimiter(getDelimiter()));
+      // Check if the user decided to opt out of recordNums associated with records
+      if (getWithoutLineNumMetadata()) {
+        return lines;
+      }
+
+      // At this point the line number in RecordWithMetadata contains the relative line offset from
+      // the
+      // beginning of the read range.
+
+      // To compute the absolute position from the beginning of the input,
+      // we group the lines within the same ranges, and evaluate the size of each range.
+
+      PCollection<KV<KV<String, Long>, RecordWithMetadata>> linesGroupedByFileAndRange =
+          lines.apply("AddFileNameAndRange", ParDo.of(new AddFileNameAndRange()));
+
+      PCollectionView<Map<KV<String, Long>, Long>> rangeSizes =
+          linesGroupedByFileAndRange
+              .apply("CountLinesForEachFileRange", Count.perKey())
+              .apply("SizesAsView", View.asMap());
+
+      // Get Pipeline to create a dummy PCollection with one element to help compute the lines
+      // before each Range
+      PCollection<Integer> singletonPcoll =
+          input.getPipeline().apply("CreateSingletonPcoll", Create.of(Arrays.asList(1)));
+
+      // For each (File, Offset) pair, calculate the number of lines occurring before the Range for
+      // each File
+
+      // After computing the number of lines before each range, we can find the line number in
+      // original file as numLiesBeforeOffset + lineNumInCurrentOffset
+      PCollectionView<Map<KV<String, Long>, Long>> numLinesBeforeEachRange =
+          singletonPcoll
+              .apply(
+                  "ComputeLinesBeforeRange",

Review comment:
       ComputeRecordsBeforeRange

##########
File path: sdks/java/io/contextual-text-io/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIO.java
##########
@@ -296,6 +290,19 @@ public Read withHintMatchesManyFiles() {
       return toBuilder().setHintMatchesManyFiles(true).build();
     }
 
+    /**
+     * Allows the user the opt out of getting recordNums associated with each record.

Review comment:
       Allows the user to opt out of ...

##########
File path: sdks/java/io/contextual-text-io/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIO.java
##########
@@ -296,6 +290,19 @@ public Read withHintMatchesManyFiles() {
       return toBuilder().setHintMatchesManyFiles(true).build();
     }
 
+    /**
+     * Allows the user the opt out of getting recordNums associated with each record.
+     *
+     * <p>ContextualTextIO uses a shuffle step to assemble the recordNums for each record which may
+     * result in some performance loss.
+     *
+     * <p>Use this when metadata like fileNames are required and their position/order can be
+     * ignored.
+     */
+    public Read withoutLineNumMetadata() {

Review comment:
        s/withoutLineNumMetadata/withoutRecordNumMetadata ?
   

##########
File path: sdks/java/io/contextual-text-io/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIO.java
##########
@@ -319,31 +326,169 @@ static boolean isSelfOverlapping(byte[] s) {
     }
 
     @Override
-    public PCollection<String> expand(PBegin input) {
-      checkNotNull(getFilepattern(), "need to set the filepattern of a TextIO.Read transform");
+    public PCollection<RecordWithMetadata> expand(PBegin input) {
+      checkNotNull(
+          getFilepattern(), "need to set the filepattern of a ContextualTextIO.Read transform");
+      PCollection<RecordWithMetadata> lines = null;
       if (getMatchConfiguration().getWatchInterval() == null && !getHintMatchesManyFiles()) {
-        return input.apply("Read", org.apache.beam.sdk.io.Read.from(getSource()));
+        lines = input.apply("Read", org.apache.beam.sdk.io.Read.from(getSource()));
+      } else {
+        // All other cases go through FileIO + ReadFiles
+        lines =
+            input
+                .apply(
+                    "Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
+                .apply("Match All", FileIO.matchAll().withConfiguration(getMatchConfiguration()))
+                .apply(
+                    "Read Matches",
+                    FileIO.readMatches()
+                        .withCompression(getCompression())
+                        .withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
+                .apply("Via ReadFiles", readFiles().withDelimiter(getDelimiter()));
       }
 
-      // All other cases go through FileIO + ReadFiles
-      return input
-          .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
-          .apply("Match All", FileIO.matchAll().withConfiguration(getMatchConfiguration()))
-          .apply(
-              "Read Matches",
-              FileIO.readMatches()
-                  .withCompression(getCompression())
-                  .withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
-          .apply("Via ReadFiles", readFiles().withDelimiter(getDelimiter()));
+      // Check if the user decided to opt out of recordNums associated with records
+      if (getWithoutLineNumMetadata()) {
+        return lines;
+      }
+
+      // At this point the line number in RecordWithMetadata contains the relative line offset from
+      // the
+      // beginning of the read range.
+
+      // To compute the absolute position from the beginning of the input,
+      // we group the lines within the same ranges, and evaluate the size of each range.
+
+      PCollection<KV<KV<String, Long>, RecordWithMetadata>> linesGroupedByFileAndRange =
+          lines.apply("AddFileNameAndRange", ParDo.of(new AddFileNameAndRange()));
+
+      PCollectionView<Map<KV<String, Long>, Long>> rangeSizes =
+          linesGroupedByFileAndRange
+              .apply("CountLinesForEachFileRange", Count.perKey())
+              .apply("SizesAsView", View.asMap());
+
+      // Get Pipeline to create a dummy PCollection with one element to help compute the lines
+      // before each Range
+      PCollection<Integer> singletonPcoll =
+          input.getPipeline().apply("CreateSingletonPcoll", Create.of(Arrays.asList(1)));
+
+      // For each (File, Offset) pair, calculate the number of lines occurring before the Range for
+      // each File
+
+      // After computing the number of lines before each range, we can find the line number in

Review comment:
       similarly here line numbers -> records

##########
File path: sdks/java/io/contextual-text-io/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIO.java
##########
@@ -319,31 +326,169 @@ static boolean isSelfOverlapping(byte[] s) {
     }
 
     @Override
-    public PCollection<String> expand(PBegin input) {
-      checkNotNull(getFilepattern(), "need to set the filepattern of a TextIO.Read transform");
+    public PCollection<RecordWithMetadata> expand(PBegin input) {
+      checkNotNull(
+          getFilepattern(), "need to set the filepattern of a ContextualTextIO.Read transform");
+      PCollection<RecordWithMetadata> lines = null;
       if (getMatchConfiguration().getWatchInterval() == null && !getHintMatchesManyFiles()) {
-        return input.apply("Read", org.apache.beam.sdk.io.Read.from(getSource()));
+        lines = input.apply("Read", org.apache.beam.sdk.io.Read.from(getSource()));
+      } else {
+        // All other cases go through FileIO + ReadFiles
+        lines =
+            input
+                .apply(
+                    "Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
+                .apply("Match All", FileIO.matchAll().withConfiguration(getMatchConfiguration()))
+                .apply(
+                    "Read Matches",
+                    FileIO.readMatches()
+                        .withCompression(getCompression())
+                        .withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
+                .apply("Via ReadFiles", readFiles().withDelimiter(getDelimiter()));
       }
 
-      // All other cases go through FileIO + ReadFiles
-      return input
-          .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
-          .apply("Match All", FileIO.matchAll().withConfiguration(getMatchConfiguration()))
-          .apply(
-              "Read Matches",
-              FileIO.readMatches()
-                  .withCompression(getCompression())
-                  .withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
-          .apply("Via ReadFiles", readFiles().withDelimiter(getDelimiter()));
+      // Check if the user decided to opt out of recordNums associated with records
+      if (getWithoutLineNumMetadata()) {
+        return lines;
+      }
+
+      // At this point the line number in RecordWithMetadata contains the relative line offset from
+      // the
+      // beginning of the read range.
+
+      // To compute the absolute position from the beginning of the input,
+      // we group the lines within the same ranges, and evaluate the size of each range.
+
+      PCollection<KV<KV<String, Long>, RecordWithMetadata>> linesGroupedByFileAndRange =
+          lines.apply("AddFileNameAndRange", ParDo.of(new AddFileNameAndRange()));
+
+      PCollectionView<Map<KV<String, Long>, Long>> rangeSizes =
+          linesGroupedByFileAndRange

Review comment:
       recordsGroupedByFileAndRange

##########
File path: sdks/java/io/contextual-text-io/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIO.java
##########
@@ -319,31 +326,169 @@ static boolean isSelfOverlapping(byte[] s) {
     }
 
     @Override
-    public PCollection<String> expand(PBegin input) {
-      checkNotNull(getFilepattern(), "need to set the filepattern of a TextIO.Read transform");
+    public PCollection<RecordWithMetadata> expand(PBegin input) {
+      checkNotNull(
+          getFilepattern(), "need to set the filepattern of a ContextualTextIO.Read transform");
+      PCollection<RecordWithMetadata> lines = null;
       if (getMatchConfiguration().getWatchInterval() == null && !getHintMatchesManyFiles()) {
-        return input.apply("Read", org.apache.beam.sdk.io.Read.from(getSource()));
+        lines = input.apply("Read", org.apache.beam.sdk.io.Read.from(getSource()));
+      } else {
+        // All other cases go through FileIO + ReadFiles
+        lines =
+            input
+                .apply(
+                    "Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
+                .apply("Match All", FileIO.matchAll().withConfiguration(getMatchConfiguration()))
+                .apply(
+                    "Read Matches",
+                    FileIO.readMatches()
+                        .withCompression(getCompression())
+                        .withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
+                .apply("Via ReadFiles", readFiles().withDelimiter(getDelimiter()));
       }
 
-      // All other cases go through FileIO + ReadFiles
-      return input
-          .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
-          .apply("Match All", FileIO.matchAll().withConfiguration(getMatchConfiguration()))
-          .apply(
-              "Read Matches",
-              FileIO.readMatches()
-                  .withCompression(getCompression())
-                  .withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
-          .apply("Via ReadFiles", readFiles().withDelimiter(getDelimiter()));
+      // Check if the user decided to opt out of recordNums associated with records
+      if (getWithoutLineNumMetadata()) {
+        return lines;
+      }
+
+      // At this point the line number in RecordWithMetadata contains the relative line offset from
+      // the
+      // beginning of the read range.
+
+      // To compute the absolute position from the beginning of the input,
+      // we group the lines within the same ranges, and evaluate the size of each range.
+
+      PCollection<KV<KV<String, Long>, RecordWithMetadata>> linesGroupedByFileAndRange =
+          lines.apply("AddFileNameAndRange", ParDo.of(new AddFileNameAndRange()));
+
+      PCollectionView<Map<KV<String, Long>, Long>> rangeSizes =
+          linesGroupedByFileAndRange
+              .apply("CountLinesForEachFileRange", Count.perKey())
+              .apply("SizesAsView", View.asMap());
+
+      // Get Pipeline to create a dummy PCollection with one element to help compute the lines
+      // before each Range
+      PCollection<Integer> singletonPcoll =
+          input.getPipeline().apply("CreateSingletonPcoll", Create.of(Arrays.asList(1)));
+
+      // For each (File, Offset) pair, calculate the number of lines occurring before the Range for

Review comment:
       ... calculate the number of records occurring before the beginning of the Range for each file?

##########
File path: sdks/java/io/contextual-text-io/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIOSource.java
##########
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.contextualtextio;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SeekableByteChannel;
+import java.util.NoSuchElementException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.FileBasedSource;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * Implementation detail of {@link ContextualTextIO.Read}.
+ *
+ * <p>A {@link FileBasedSource} which can decode records delimited by newline characters.
+ *
+ * <p>This source splits the data into records using {@code UTF-8} {@code \n}, {@code \r}, or {@code
+ * \r\n} as the delimiter. This source is not strict and supports decoding the last record even if
+ * it is not delimited. Finally, no records are decoded if the stream is empty.
+ *
+ * <p>This source supports reading from any arbitrary byte position within the stream. If the
+ * starting position is not {@code 0}, then bytes are skipped until the first delimiter is found
+ * representing the beginning of the first record to be decoded.
+ */
+@VisibleForTesting
+class ContextualTextIOSource extends FileBasedSource<RecordWithMetadata> {
+  byte[] delimiter;
+
+  // Used to Override isSplittable
+  private boolean hasMultilineCSVRecords;
+
+  @Override
+  protected boolean isSplittable() throws Exception {
+    if (hasMultilineCSVRecords) {
+      return false;
+    }
+    return super.isSplittable();
+  }
+
+  ContextualTextIOSource(
+      ValueProvider<String> fileSpec,
+      EmptyMatchTreatment emptyMatchTreatment,
+      byte[] delimiter,
+      boolean hasMultilineCSVRecords) {
+    super(fileSpec, emptyMatchTreatment, 1L);
+    this.delimiter = delimiter;
+    this.hasMultilineCSVRecords = hasMultilineCSVRecords;
+  }
+
+  private ContextualTextIOSource(
+      MatchResult.Metadata metadata,
+      long start,
+      long end,
+      byte[] delimiter,
+      boolean hasMultilineCSVRecords) {
+    super(metadata, 1L, start, end);
+    this.delimiter = delimiter;
+    this.hasMultilineCSVRecords = hasMultilineCSVRecords;
+  }
+
+  @Override
+  protected FileBasedSource<RecordWithMetadata> createForSubrangeOfFile(
+      MatchResult.Metadata metadata, long start, long end) {
+    return new ContextualTextIOSource(metadata, start, end, delimiter, hasMultilineCSVRecords);
+  }
+
+  @Override
+  protected FileBasedReader<RecordWithMetadata> createSingleFileReader(PipelineOptions options) {
+    return new MultiLineTextBasedReader(this, delimiter, hasMultilineCSVRecords);
+  }
+
+  @Override
+  public Coder<RecordWithMetadata> getOutputCoder() {
+    SchemaCoder<RecordWithMetadata> coder = null;
+    try {
+      coder = SchemaRegistry.createDefault().getSchemaCoder(RecordWithMetadata.class);
+    } catch (NoSuchSchemaException e) {
+      System.out.println("No Coder!");

Review comment:
       Logger here as well, see: https://github.com/apache/beam/blob/71c7760f4b5c5bf0d91e2c8403fae99216308a3e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java#L897

##########
File path: sdks/java/io/contextual-text-io/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIO.java
##########
@@ -319,31 +326,169 @@ static boolean isSelfOverlapping(byte[] s) {
     }
 
     @Override
-    public PCollection<String> expand(PBegin input) {
-      checkNotNull(getFilepattern(), "need to set the filepattern of a TextIO.Read transform");
+    public PCollection<RecordWithMetadata> expand(PBegin input) {
+      checkNotNull(
+          getFilepattern(), "need to set the filepattern of a ContextualTextIO.Read transform");
+      PCollection<RecordWithMetadata> lines = null;
       if (getMatchConfiguration().getWatchInterval() == null && !getHintMatchesManyFiles()) {
-        return input.apply("Read", org.apache.beam.sdk.io.Read.from(getSource()));
+        lines = input.apply("Read", org.apache.beam.sdk.io.Read.from(getSource()));
+      } else {
+        // All other cases go through FileIO + ReadFiles
+        lines =
+            input
+                .apply(
+                    "Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
+                .apply("Match All", FileIO.matchAll().withConfiguration(getMatchConfiguration()))
+                .apply(
+                    "Read Matches",
+                    FileIO.readMatches()
+                        .withCompression(getCompression())
+                        .withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
+                .apply("Via ReadFiles", readFiles().withDelimiter(getDelimiter()));
       }
 
-      // All other cases go through FileIO + ReadFiles
-      return input
-          .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
-          .apply("Match All", FileIO.matchAll().withConfiguration(getMatchConfiguration()))
-          .apply(
-              "Read Matches",
-              FileIO.readMatches()
-                  .withCompression(getCompression())
-                  .withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
-          .apply("Via ReadFiles", readFiles().withDelimiter(getDelimiter()));
+      // Check if the user decided to opt out of recordNums associated with records
+      if (getWithoutLineNumMetadata()) {
+        return lines;
+      }
+
+      // At this point the line number in RecordWithMetadata contains the relative line offset from
+      // the
+      // beginning of the read range.
+
+      // To compute the absolute position from the beginning of the input,
+      // we group the lines within the same ranges, and evaluate the size of each range.
+
+      PCollection<KV<KV<String, Long>, RecordWithMetadata>> linesGroupedByFileAndRange =
+          lines.apply("AddFileNameAndRange", ParDo.of(new AddFileNameAndRange()));
+
+      PCollectionView<Map<KV<String, Long>, Long>> rangeSizes =
+          linesGroupedByFileAndRange
+              .apply("CountLinesForEachFileRange", Count.perKey())
+              .apply("SizesAsView", View.asMap());
+
+      // Get Pipeline to create a dummy PCollection with one element to help compute the lines
+      // before each Range
+      PCollection<Integer> singletonPcoll =
+          input.getPipeline().apply("CreateSingletonPcoll", Create.of(Arrays.asList(1)));
+
+      // For each (File, Offset) pair, calculate the number of lines occurring before the Range for
+      // each File
+
+      // After computing the number of lines before each range, we can find the line number in
+      // original file as numLiesBeforeOffset + lineNumInCurrentOffset
+      PCollectionView<Map<KV<String, Long>, Long>> numLinesBeforeEachRange =
+          singletonPcoll
+              .apply(
+                  "ComputeLinesBeforeRange",
+                  ParDo.of(new ComputeLinesBeforeEachRange(rangeSizes)).withSideInputs(rangeSizes))
+              .apply("NumLinesBeforeEachRangeAsView", View.asMap());
+
+      return linesGroupedByFileAndRange.apply(
+          "AssignLineNums",
+          ParDo.of(new AssignLineNums(numLinesBeforeEachRange))
+              .withSideInputs(numLinesBeforeEachRange));
+    }
+
+    @VisibleForTesting
+    static class AddFileNameAndRange
+        extends DoFn<RecordWithMetadata, KV<KV<String, Long>, RecordWithMetadata>> {
+      @ProcessElement
+      public void processElement(
+          @Element RecordWithMetadata line,
+          OutputReceiver<KV<KV<String, Long>, RecordWithMetadata>> out) {
+        out.output(KV.of(KV.of(line.getFileName(), line.getRange().getRangeNum()), line));
+      }
+    }
+
+    /**
+     * Helper class for computing number of lines in the File preceding the beginning of the Range
+     * in this file.
+     */
+    @VisibleForTesting
+    static class ComputeLinesBeforeEachRange extends DoFn<Integer, KV<KV<String, Long>, Long>> {
+      private final PCollectionView<Map<KV<String, Long>, Long>> rangeSizes;
+
+      public ComputeLinesBeforeEachRange(PCollectionView<Map<KV<String, Long>, Long>> rangeSizes) {
+        this.rangeSizes = rangeSizes;
+      }
+
+      // Add custom comparator as KV<K, V> is not comparable by default
+      private static class FileRangeComparator<K extends Comparable<K>, V extends Comparable<V>>
+          implements Comparator<KV<K, V>> {
+        @Override
+        public int compare(KV<K, V> a, KV<K, V> b) {
+          if (a.getKey().compareTo(b.getKey()) == 0) {
+            return a.getValue().compareTo(b.getValue());
+          }
+          return a.getKey().compareTo(b.getKey());
+        }
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext p) {
+        // Get the Map Containing the size from side-input
+        Map<KV<String, Long>, Long> rangeSizesMap = p.sideInput(rangeSizes);
+
+        // The FileRange Pair must be sorted
+        SortedMap<KV<String, Long>, Long> sorted = new TreeMap<>(new FileRangeComparator<>());
+
+        // Initialize sorted map with values
+        for (Map.Entry<KV<String, Long>, Long> entry : rangeSizesMap.entrySet()) {
+          sorted.put(entry.getKey(), entry.getValue());
+        }
+
+        // HashMap that tracks lines passed for each file
+        Map<String, Long> pastLines = new HashMap<>();
+
+        // For each (File, Range) Pair, compute the number of lines before it
+        for (Map.Entry entry : sorted.entrySet()) {
+          Long lines = (long) entry.getValue();

Review comment:
       lines/records?

##########
File path: sdks/java/io/contextual-text-io/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIOSource.java
##########
@@ -259,7 +317,31 @@ private void decodeCurrentElement() throws IOException {
       if (startOfRecord == 0 && dataToDecode.startsWith(UTF8_BOM)) {
         dataToDecode = dataToDecode.substring(UTF8_BOM.size());
       }
-      currentValue = dataToDecode.toStringUtf8();
+
+      /////////////////////////////////////////////
+
+      //      Data of the Current Line
+      //      dataToDecode.toStringUtf8();
+
+      // The line num is:

Review comment:
       redundant comment

##########
File path: sdks/java/io/contextual-text-io/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIOSource.java
##########
@@ -259,7 +317,31 @@ private void decodeCurrentElement() throws IOException {
       if (startOfRecord == 0 && dataToDecode.startsWith(UTF8_BOM)) {
         dataToDecode = dataToDecode.substring(UTF8_BOM.size());
       }
-      currentValue = dataToDecode.toStringUtf8();
+
+      /////////////////////////////////////////////
+
+      //      Data of the Current Line

Review comment:
       Do we need this commented-out code?

##########
File path: sdks/java/io/contextual-text-io/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIOSource.java
##########
@@ -99,14 +126,23 @@ private ContextualTextIOSource(MatchResult.Metadata metadata, long start, long e
     private volatile long startOfNextRecord;
     private volatile boolean eof;
     private volatile boolean elementIsPresent;
-    private @Nullable String currentValue;
+    private @Nullable RecordWithMetadata currentValue;
     private @Nullable ReadableByteChannel inChannel;
     private byte @Nullable [] delimiter;
 
-    private TextBasedReader(ContextualTextIOSource source, byte[] delimiter) {
+    // Add to override the isSplittable
+    private boolean hasRFC4180MultiLineColumn;
+
+    private long startingOffset;
+    private long readerlineNum;
+
+    private MultiLineTextBasedReader(
+        ContextualTextIOSource source, byte[] delimiter, boolean hasRFC4180MultiLineColumn) {

Review comment:
       hasMultilineCSVRecords

##########
File path: sdks/java/io/contextual-text-io/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIOSource.java
##########
@@ -152,18 +188,27 @@ protected void startReading(ReadableByteChannel channel) throws IOException {
           requiredPosition = startOffset - delimiter.length;
         }
         ((SeekableByteChannel) channel).position(requiredPosition);
-        findDelimiterBounds();
+        findDelimiterBoundsWithMultiLineCheck();
         buffer = buffer.substring(endOfDelimiterInBuffer);
         startOfNextRecord = requiredPosition + endOfDelimiterInBuffer;
         endOfDelimiterInBuffer = 0;
         startOfDelimiterInBuffer = 0;
       }
     }
 
+    private void findDelimiterBoundsWithMultiLineCheck() throws IOException {

Review comment:
       Do we need this helper?

##########
File path: sdks/java/io/contextual-text-io/src/main/java/org/apache/beam/sdk/io/contextualtextio/Range.java
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.contextualtextio;
+
+import com.google.auto.value.AutoValue;
+
+@AutoValue
+public abstract class Range {
+  public abstract Long getRangeNum();
+
+  public abstract Long getRangeLineNum();

Review comment:
       What is this? Number of records in the range?
   (Please add docstrings and consider a different name).

##########
File path: sdks/java/io/contextual-text-io/src/main/java/org/apache/beam/sdk/io/contextualtextio/RecordWithMetadata.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.contextualtextio;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+
+@Internal
+@Experimental(Experimental.Kind.SCHEMAS)
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class RecordWithMetadata {
+  public abstract Range getRange();
+
+  public abstract Long getRecordNum();

Review comment:
       @rezarokni @abhiy13 would it make sense to include the offset of the record in the input (if it's easy to compute, also can be computed even when user disables withoutRecordNumMetadata (without shuffles/sideinput) )?
   
   Can also be done later..

##########
File path: sdks/java/io/contextual-text-io/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIO.java
##########
@@ -319,31 +326,169 @@ static boolean isSelfOverlapping(byte[] s) {
     }
 
     @Override
-    public PCollection<String> expand(PBegin input) {
-      checkNotNull(getFilepattern(), "need to set the filepattern of a TextIO.Read transform");
+    public PCollection<RecordWithMetadata> expand(PBegin input) {
+      checkNotNull(
+          getFilepattern(), "need to set the filepattern of a ContextualTextIO.Read transform");
+      PCollection<RecordWithMetadata> lines = null;
       if (getMatchConfiguration().getWatchInterval() == null && !getHintMatchesManyFiles()) {
-        return input.apply("Read", org.apache.beam.sdk.io.Read.from(getSource()));
+        lines = input.apply("Read", org.apache.beam.sdk.io.Read.from(getSource()));
+      } else {
+        // All other cases go through FileIO + ReadFiles
+        lines =
+            input
+                .apply(
+                    "Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
+                .apply("Match All", FileIO.matchAll().withConfiguration(getMatchConfiguration()))
+                .apply(
+                    "Read Matches",
+                    FileIO.readMatches()
+                        .withCompression(getCompression())
+                        .withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
+                .apply("Via ReadFiles", readFiles().withDelimiter(getDelimiter()));
       }
 
-      // All other cases go through FileIO + ReadFiles
-      return input
-          .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
-          .apply("Match All", FileIO.matchAll().withConfiguration(getMatchConfiguration()))
-          .apply(
-              "Read Matches",
-              FileIO.readMatches()
-                  .withCompression(getCompression())
-                  .withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
-          .apply("Via ReadFiles", readFiles().withDelimiter(getDelimiter()));
+      // Check if the user decided to opt out of recordNums associated with records
+      if (getWithoutLineNumMetadata()) {
+        return lines;
+      }
+
+      // At this point the line number in RecordWithMetadata contains the relative line offset from
+      // the
+      // beginning of the read range.
+
+      // To compute the absolute position from the beginning of the input,
+      // we group the lines within the same ranges, and evaluate the size of each range.
+
+      PCollection<KV<KV<String, Long>, RecordWithMetadata>> linesGroupedByFileAndRange =
+          lines.apply("AddFileNameAndRange", ParDo.of(new AddFileNameAndRange()));
+
+      PCollectionView<Map<KV<String, Long>, Long>> rangeSizes =
+          linesGroupedByFileAndRange
+              .apply("CountLinesForEachFileRange", Count.perKey())
+              .apply("SizesAsView", View.asMap());
+
+      // Get Pipeline to create a dummy PCollection with one element to help compute the lines
+      // before each Range
+      PCollection<Integer> singletonPcoll =
+          input.getPipeline().apply("CreateSingletonPcoll", Create.of(Arrays.asList(1)));
+
+      // For each (File, Offset) pair, calculate the number of lines occurring before the Range for
+      // each File
+
+      // After computing the number of lines before each range, we can find the line number in
+      // original file as numLiesBeforeOffset + lineNumInCurrentOffset
+      PCollectionView<Map<KV<String, Long>, Long>> numLinesBeforeEachRange =
+          singletonPcoll
+              .apply(
+                  "ComputeLinesBeforeRange",
+                  ParDo.of(new ComputeLinesBeforeEachRange(rangeSizes)).withSideInputs(rangeSizes))
+              .apply("NumLinesBeforeEachRangeAsView", View.asMap());
+
+      return linesGroupedByFileAndRange.apply(
+          "AssignLineNums",
+          ParDo.of(new AssignLineNums(numLinesBeforeEachRange))
+              .withSideInputs(numLinesBeforeEachRange));
+    }
+
+    @VisibleForTesting
+    static class AddFileNameAndRange
+        extends DoFn<RecordWithMetadata, KV<KV<String, Long>, RecordWithMetadata>> {
+      @ProcessElement
+      public void processElement(
+          @Element RecordWithMetadata line,
+          OutputReceiver<KV<KV<String, Long>, RecordWithMetadata>> out) {
+        out.output(KV.of(KV.of(line.getFileName(), line.getRange().getRangeNum()), line));
+      }
+    }
+
+    /**
+     * Helper class for computing number of lines in the File preceding the beginning of the Range
+     * in this file.
+     */
+    @VisibleForTesting
+    static class ComputeLinesBeforeEachRange extends DoFn<Integer, KV<KV<String, Long>, Long>> {

Review comment:
       ComputeRecordsBeforeEachRange or ComputeNumRecordsBeforeEachRange ?

##########
File path: sdks/java/io/contextual-text-io/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIO.java
##########
@@ -319,31 +326,169 @@ static boolean isSelfOverlapping(byte[] s) {
     }
 
     @Override
-    public PCollection<String> expand(PBegin input) {
-      checkNotNull(getFilepattern(), "need to set the filepattern of a TextIO.Read transform");
+    public PCollection<RecordWithMetadata> expand(PBegin input) {
+      checkNotNull(
+          getFilepattern(), "need to set the filepattern of a ContextualTextIO.Read transform");
+      PCollection<RecordWithMetadata> lines = null;
       if (getMatchConfiguration().getWatchInterval() == null && !getHintMatchesManyFiles()) {
-        return input.apply("Read", org.apache.beam.sdk.io.Read.from(getSource()));
+        lines = input.apply("Read", org.apache.beam.sdk.io.Read.from(getSource()));
+      } else {
+        // All other cases go through FileIO + ReadFiles
+        lines =
+            input
+                .apply(
+                    "Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
+                .apply("Match All", FileIO.matchAll().withConfiguration(getMatchConfiguration()))
+                .apply(
+                    "Read Matches",
+                    FileIO.readMatches()
+                        .withCompression(getCompression())
+                        .withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
+                .apply("Via ReadFiles", readFiles().withDelimiter(getDelimiter()));
       }
 
-      // All other cases go through FileIO + ReadFiles
-      return input
-          .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
-          .apply("Match All", FileIO.matchAll().withConfiguration(getMatchConfiguration()))
-          .apply(
-              "Read Matches",
-              FileIO.readMatches()
-                  .withCompression(getCompression())
-                  .withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
-          .apply("Via ReadFiles", readFiles().withDelimiter(getDelimiter()));
+      // Check if the user decided to opt out of recordNums associated with records
+      if (getWithoutLineNumMetadata()) {
+        return lines;
+      }
+
+      // At this point the line number in RecordWithMetadata contains the relative line offset from
+      // the
+      // beginning of the read range.
+
+      // To compute the absolute position from the beginning of the input,
+      // we group the lines within the same ranges, and evaluate the size of each range.
+
+      PCollection<KV<KV<String, Long>, RecordWithMetadata>> linesGroupedByFileAndRange =
+          lines.apply("AddFileNameAndRange", ParDo.of(new AddFileNameAndRange()));
+
+      PCollectionView<Map<KV<String, Long>, Long>> rangeSizes =
+          linesGroupedByFileAndRange
+              .apply("CountLinesForEachFileRange", Count.perKey())
+              .apply("SizesAsView", View.asMap());
+
+      // Get Pipeline to create a dummy PCollection with one element to help compute the lines
+      // before each Range
+      PCollection<Integer> singletonPcoll =
+          input.getPipeline().apply("CreateSingletonPcoll", Create.of(Arrays.asList(1)));
+
+      // For each (File, Offset) pair, calculate the number of lines occurring before the Range for
+      // each File
+
+      // After computing the number of lines before each range, we can find the line number in
+      // original file as numLiesBeforeOffset + lineNumInCurrentOffset
+      PCollectionView<Map<KV<String, Long>, Long>> numLinesBeforeEachRange =
+          singletonPcoll
+              .apply(
+                  "ComputeLinesBeforeRange",
+                  ParDo.of(new ComputeLinesBeforeEachRange(rangeSizes)).withSideInputs(rangeSizes))
+              .apply("NumLinesBeforeEachRangeAsView", View.asMap());
+
+      return linesGroupedByFileAndRange.apply(
+          "AssignLineNums",
+          ParDo.of(new AssignLineNums(numLinesBeforeEachRange))
+              .withSideInputs(numLinesBeforeEachRange));
+    }
+
+    @VisibleForTesting
+    static class AddFileNameAndRange
+        extends DoFn<RecordWithMetadata, KV<KV<String, Long>, RecordWithMetadata>> {
+      @ProcessElement
+      public void processElement(
+          @Element RecordWithMetadata line,
+          OutputReceiver<KV<KV<String, Long>, RecordWithMetadata>> out) {
+        out.output(KV.of(KV.of(line.getFileName(), line.getRange().getRangeNum()), line));
+      }
+    }
+
+    /**
+     * Helper class for computing number of lines in the File preceding the beginning of the Range
+     * in this file.
+     */
+    @VisibleForTesting
+    static class ComputeLinesBeforeEachRange extends DoFn<Integer, KV<KV<String, Long>, Long>> {
+      private final PCollectionView<Map<KV<String, Long>, Long>> rangeSizes;
+
+      public ComputeLinesBeforeEachRange(PCollectionView<Map<KV<String, Long>, Long>> rangeSizes) {
+        this.rangeSizes = rangeSizes;
+      }
+
+      // Add custom comparator as KV<K, V> is not comparable by default
+      private static class FileRangeComparator<K extends Comparable<K>, V extends Comparable<V>>
+          implements Comparator<KV<K, V>> {
+        @Override
+        public int compare(KV<K, V> a, KV<K, V> b) {
+          if (a.getKey().compareTo(b.getKey()) == 0) {
+            return a.getValue().compareTo(b.getValue());
+          }
+          return a.getKey().compareTo(b.getKey());
+        }
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext p) {
+        // Get the Map Containing the size from side-input
+        Map<KV<String, Long>, Long> rangeSizesMap = p.sideInput(rangeSizes);
+
+        // The FileRange Pair must be sorted
+        SortedMap<KV<String, Long>, Long> sorted = new TreeMap<>(new FileRangeComparator<>());
+
+        // Initialize sorted map with values
+        for (Map.Entry<KV<String, Long>, Long> entry : rangeSizesMap.entrySet()) {
+          sorted.put(entry.getKey(), entry.getValue());
+        }
+
+        // HashMap that tracks lines passed for each file
+        Map<String, Long> pastLines = new HashMap<>();
+
+        // For each (File, Range) Pair, compute the number of lines before it
+        for (Map.Entry entry : sorted.entrySet()) {
+          Long lines = (long) entry.getValue();

Review comment:
       (see also other `line`, `lines` mentions in this PR.)

##########
File path: sdks/java/io/contextual-text-io/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIOSource.java
##########
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.contextualtextio;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SeekableByteChannel;
+import java.util.NoSuchElementException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.FileBasedSource;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * Implementation detail of {@link ContextualTextIO.Read}.
+ *
+ * <p>A {@link FileBasedSource} which can decode records delimited by newline characters.
+ *
+ * <p>This source splits the data into records using {@code UTF-8} {@code \n}, {@code \r}, or {@code
+ * \r\n} as the delimiter. This source is not strict and supports decoding the last record even if
+ * it is not delimited. Finally, no records are decoded if the stream is empty.
+ *
+ * <p>This source supports reading from any arbitrary byte position within the stream. If the
+ * starting position is not {@code 0}, then bytes are skipped until the first delimiter is found
+ * representing the beginning of the first record to be decoded.
+ */
+@VisibleForTesting
+class ContextualTextIOSource extends FileBasedSource<RecordWithMetadata> {
+  byte[] delimiter;
+
+  // Used to Override isSplittable
+  private boolean hasMultilineCSVRecords;
+
+  @Override
+  protected boolean isSplittable() throws Exception {
+    if (hasMultilineCSVRecords) {
+      return false;
+    }
+    return super.isSplittable();
+  }
+
+  ContextualTextIOSource(
+      ValueProvider<String> fileSpec,
+      EmptyMatchTreatment emptyMatchTreatment,
+      byte[] delimiter,
+      boolean hasMultilineCSVRecords) {
+    super(fileSpec, emptyMatchTreatment, 1L);
+    this.delimiter = delimiter;
+    this.hasMultilineCSVRecords = hasMultilineCSVRecords;
+  }
+
+  private ContextualTextIOSource(
+      MatchResult.Metadata metadata,
+      long start,
+      long end,
+      byte[] delimiter,
+      boolean hasMultilineCSVRecords) {
+    super(metadata, 1L, start, end);
+    this.delimiter = delimiter;
+    this.hasMultilineCSVRecords = hasMultilineCSVRecords;
+  }
+
+  @Override
+  protected FileBasedSource<RecordWithMetadata> createForSubrangeOfFile(
+      MatchResult.Metadata metadata, long start, long end) {
+    return new ContextualTextIOSource(metadata, start, end, delimiter, hasMultilineCSVRecords);
+  }
+
+  @Override
+  protected FileBasedReader<RecordWithMetadata> createSingleFileReader(PipelineOptions options) {
+    return new MultiLineTextBasedReader(this, delimiter, hasMultilineCSVRecords);
+  }
+
+  @Override
+  public Coder<RecordWithMetadata> getOutputCoder() {
+    SchemaCoder<RecordWithMetadata> coder = null;
+    try {
+      coder = SchemaRegistry.createDefault().getSchemaCoder(RecordWithMetadata.class);
+    } catch (NoSuchSchemaException e) {
+      System.out.println("No Coder!");
+    }
+    return coder;
+  }
+
+  /**
+   * A {@link FileBasedReader FileBasedReader} which can decode records delimited by delimiter
+   * characters.
+   *
+   * <p>See {@link ContextualTextIOSource } for further details.
+   */
+  @VisibleForTesting
+  static class MultiLineTextBasedReader extends FileBasedReader<RecordWithMetadata> {
+    public static final int READ_BUFFER_SIZE = 8192;
+    private static final ByteString UTF8_BOM =
+        ByteString.copyFrom(new byte[] {(byte) 0xEF, (byte) 0xBB, (byte) 0xBF});
+    private final ByteBuffer readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
+    private ByteString buffer;
+    private int startOfDelimiterInBuffer;
+    private int endOfDelimiterInBuffer;
+    private long startOfRecord;
+    private volatile long startOfNextRecord;
+    private volatile boolean eof;
+    private volatile boolean elementIsPresent;
+    private @Nullable RecordWithMetadata currentValue;
+    private @Nullable ReadableByteChannel inChannel;
+    private byte @Nullable [] delimiter;
+
+    // Add to override the isSplittable
+    private boolean hasRFC4180MultiLineColumn;
+
+    private long startingOffset;
+    private long readerlineNum;
+
+    private MultiLineTextBasedReader(
+        ContextualTextIOSource source, byte[] delimiter, boolean hasRFC4180MultiLineColumn) {
+      super(source);
+      buffer = ByteString.EMPTY;
+      this.delimiter = delimiter;
+      this.hasRFC4180MultiLineColumn = hasRFC4180MultiLineColumn;
+      startingOffset = getCurrentSource().getStartOffset(); // Start offset;
+    }
+
+    @Override
+    protected long getCurrentOffset() throws NoSuchElementException {
+      if (!elementIsPresent) {
+        throw new NoSuchElementException();
+      }
+      return startOfRecord;
+    }
+
+    @Override
+    public long getSplitPointsRemaining() {
+      if (isStarted() && startOfNextRecord >= getCurrentSource().getEndOffset()) {
+        return isDone() ? 0 : 1;
+      }
+      return super.getSplitPointsRemaining();
+    }
+
+    @Override
+    public RecordWithMetadata getCurrent() throws NoSuchElementException {
+      if (!elementIsPresent) {
+        throw new NoSuchElementException();
+      }
+      return currentValue;
+    }
+
+    @Override
+    protected void startReading(ReadableByteChannel channel) throws IOException {
+      this.inChannel = channel;
+      // If the first offset is greater than zero, we need to skip bytes until we see our
+      // first delimiter.
+      long startOffset = getCurrentSource().getStartOffset();
+      if (startOffset > 0) {
+        Preconditions.checkState(
+            channel instanceof SeekableByteChannel,
+            "%s only supports reading from a SeekableByteChannel when given a start offset"
+                + " greater than 0.",
+            ContextualTextIOSource.class.getSimpleName());
+        long requiredPosition = startOffset - 1;
+        if (delimiter != null && startOffset >= delimiter.length) {
+          // we need to move back the offset of at worse delimiter.size to be sure to see
+          // all the bytes of the delimiter in the call to findDelimiterBounds() below
+          requiredPosition = startOffset - delimiter.length;
+        }
+        ((SeekableByteChannel) channel).position(requiredPosition);
+        findDelimiterBoundsWithMultiLineCheck();
+        buffer = buffer.substring(endOfDelimiterInBuffer);
+        startOfNextRecord = requiredPosition + endOfDelimiterInBuffer;
+        endOfDelimiterInBuffer = 0;
+        startOfDelimiterInBuffer = 0;
+      }
+    }
+
+    private void findDelimiterBoundsWithMultiLineCheck() throws IOException {
+      findDelimiterBounds();
+    }
+
+    /**
+     * Locates the start position and end position of the next delimiter. Will consume the channel
+     * till either EOF or the delimiter bounds are found.
+     *
+     * <p>If {@link ContextualTextIOSource#hasMultilineCSVRecords} is set then the behaviour will
+     * change from the standard read seen in {@link org.apache.beam.sdk.io.TextIO}. The assumption
+     * when {@link ContextualTextIOSource#hasMultilineCSVRecords} is set is that the file is being
+     * read with a single thread.
+     *
+     * <p>This fills the buffer and updates the positions as follows:
+     *
+     * <pre>{@code
+     * ------------------------------------------------------
+     * | element bytes | delimiter bytes | unconsumed bytes |
+     * ------------------------------------------------------
+     * 0            start of          end of              buffer
+     *              delimiter         delimiter           size
+     *              in buffer         in buffer
+     * }</pre>
+     */
+    private void findDelimiterBounds() throws IOException {
+      int bytePositionInBuffer = 0;
+      boolean doubleQuoteClosed = true;
+
+      while (true) {
+        if (!tryToEnsureNumberOfBytesInBuffer(bytePositionInBuffer + 1)) {
+          startOfDelimiterInBuffer = endOfDelimiterInBuffer = bytePositionInBuffer;
+          break;
+        }
+
+        byte currentByte = buffer.byteAt(bytePositionInBuffer);
+        if (hasRFC4180MultiLineColumn) {
+          // Check if we are inside an open Quote
+          if (currentByte == '"') {
+            doubleQuoteClosed = !doubleQuoteClosed;
+          }
+        } else {
+          doubleQuoteClosed = true;
+        }
+
+        if (delimiter == null) {
+          // default delimiter
+          if (currentByte == '\n') {
+            startOfDelimiterInBuffer = bytePositionInBuffer;
+            endOfDelimiterInBuffer = startOfDelimiterInBuffer + 1;
+            if (doubleQuoteClosed) {
+              break;
+            }
+          } else if (currentByte == '\r') {
+            startOfDelimiterInBuffer = bytePositionInBuffer;
+            endOfDelimiterInBuffer = startOfDelimiterInBuffer + 1;
+            if (tryToEnsureNumberOfBytesInBuffer(bytePositionInBuffer + 2)) {
+              currentByte = buffer.byteAt(bytePositionInBuffer + 1);
+              if (currentByte == '\n') {
+                endOfDelimiterInBuffer += 1;
+              }
+            }
+            if (doubleQuoteClosed) {
+              break;
+            }
+          }
+        } else {
+          // when the user defines a delimiter
+          int i = 0;
+          startOfDelimiterInBuffer = endOfDelimiterInBuffer = bytePositionInBuffer;
+          while ((i < delimiter.length) && (currentByte == delimiter[i])) {
+            // read next byte;
+            i++;
+            if (tryToEnsureNumberOfBytesInBuffer(bytePositionInBuffer + i + 1)) {
+              currentByte = buffer.byteAt(bytePositionInBuffer + i);
+            } else {
+              // corner case: delimiter truncate at the end of file
+              startOfDelimiterInBuffer = endOfDelimiterInBuffer = bytePositionInBuffer;
+              break;
+            }
+          }
+          if (i == delimiter.length) {
+            endOfDelimiterInBuffer = bytePositionInBuffer + i;
+            if (doubleQuoteClosed) {
+              break;
+            }
+          }
+        }
+        bytePositionInBuffer += 1;
+      }
+    }
+
+    @Override
+    protected boolean readNextRecord() throws IOException {
+      startOfRecord = startOfNextRecord;
+
+      findDelimiterBoundsWithMultiLineCheck();
+
+      // If we have reached EOF file and consumed all of the buffer then we know
+      // that there are no more records.
+      if (eof && buffer.isEmpty()) {
+        elementIsPresent = false;
+        return false;
+      }
+
+      decodeCurrentElement();
+      startOfNextRecord = startOfRecord + endOfDelimiterInBuffer;
+      return true;
+    }
+
+    /**
+     * Decodes the current element updating the buffer to only contain the unconsumed bytes.
+     *
+     * <p>This invalidates the currently stored {@code startOfDelimiterInBuffer} and {@code
+     * endOfDelimiterInBuffer}.
+     */
+    private void decodeCurrentElement() throws IOException {
+      ByteString dataToDecode = buffer.substring(0, startOfDelimiterInBuffer);
+      // If present, the UTF8 Byte Order Mark (BOM) will be removed.
+      if (startOfRecord == 0 && dataToDecode.startsWith(UTF8_BOM)) {
+        dataToDecode = dataToDecode.substring(UTF8_BOM.size());
+      }
+
+      /////////////////////////////////////////////
+
+      //      Data of the Current Line
+      //      dataToDecode.toStringUtf8();
+
+      // The line num is:
+      Long lineUniqueLineNum = readerlineNum++;
+      // The Complete FileName (with uri if this is a web url eg: temp/abc.txt) is:
+      String fileName = getCurrentSource().getSingleFileMetadata().resourceId().toString();

Review comment:
       Hm, interesting point. I don't know to which extent size is a concern.
   Would the user need to join Uri + FileName and have to worry to know the correct separator for the two?
   Perhaps worth considering something along the lines of Filename and (optionally) FullFilename? 
   

##########
File path: sdks/java/io/contextual-text-io/src/main/java/org/apache/beam/sdk/io/contextualtextio/Range.java
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.contextualtextio;
+
+import com.google.auto.value.AutoValue;
+
+@AutoValue
+public abstract class Range {
+  public abstract Long getRangeNum();

Review comment:
       What is this? starting offset?

##########
File path: sdks/java/io/contextual-text-io/src/main/java/org/apache/beam/sdk/io/contextualtextio/ContextualTextIOSource.java
##########
@@ -259,7 +317,31 @@ private void decodeCurrentElement() throws IOException {
       if (startOfRecord == 0 && dataToDecode.startsWith(UTF8_BOM)) {
         dataToDecode = dataToDecode.substring(UTF8_BOM.size());
       }
-      currentValue = dataToDecode.toStringUtf8();
+
+      /////////////////////////////////////////////
+
+      //      Data of the Current Line
+      //      dataToDecode.toStringUtf8();
+
+      // The line num is:
+      Long lineUniqueLineNum = readerlineNum++;

Review comment:
       the naming is confusing here.
   `recordNum = totalRecordCount++` perhaps?

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
##########
@@ -130,7 +130,7 @@ protected FileBasedSource(
    *
    * @throws IllegalArgumentException if this source is in {@link Mode#FILEPATTERN} mode.
    */
-  protected final MatchResult.Metadata getSingleFileMetadata() {
+  public final MatchResult.Metadata getSingleFileMetadata() {

Review comment:
       SGTM, different commit should be sufficient.

##########
File path: sdks/java/io/contextual-text-io/src/main/java/org/apache/beam/sdk/io/contextualtextio/RecordWithMetadata.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.contextualtextio;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+
+@Internal
+@Experimental(Experimental.Kind.SCHEMAS)
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class RecordWithMetadata {
+  public abstract Range getRange();

Review comment:
       Should this be user-visible (will it be useful?)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org