Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/08/17 16:38:42 UTC

[GitHub] [beam] aromanenko-dev commented on a diff in pull request #17828: [BEAM-14378] [CdapIO] SparkReceiverIO Read via SDF

aromanenko-dev commented on code in PR #17828:
URL: https://github.com/apache/beam/pull/17828#discussion_r948150634


##########
sdks/java/io/sparkreceiver/src/main/java/org/apache/beam/sdk/io/sparkreceiver/SparkReceiverIO.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.sparkreceiver;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.spark.streaming.receiver.Receiver;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Streaming sources for Spark {@link Receiver}. */

Review Comment:
   Please add more details about this connector and an example of how to use it.
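   
   For instance, a short usage sketch based on the builder methods in this PR (`MyReceiver` is a placeholder for any `Receiver` implementing `HasOffset`):
   ```
   ReceiverBuilder<String, MyReceiver> receiverBuilder =
       new ReceiverBuilder<>(MyReceiver.class).withConstructorArgs();
   
   PCollection<String> records =
       pipeline.apply(
           SparkReceiverIO.<String>read()
               .withValueClass(String.class)
               .withSparkReceiverBuilder(receiverBuilder)
               .withGetOffsetFn(Long::valueOf)
               .withWatermarkFn(Instant::parse));
   ```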



##########
sdks/java/io/sparkreceiver/src/test/java/org/apache/beam/sdk/io/sparkreceiver/SparkReceiverIOTest.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.sparkreceiver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import java.util.List;
+import org.apache.beam.runners.direct.DirectOptions;
+import org.apache.beam.runners.direct.DirectRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Test class for {@link SparkReceiverIO}. */
+@RunWith(JUnit4.class)
+public class SparkReceiverIOTest {
+
+  @Test
+  public void testReadBuildsCorrectly() {
+    ReceiverBuilder<String, CustomReceiverWithOffset> receiverBuilder =
+        new ReceiverBuilder<>(CustomReceiverWithOffset.class).withConstructorArgs();
+    SerializableFunction<String, Long> offsetFn = Long::valueOf;
+    SerializableFunction<String, Instant> watermarkFn = Instant::parse;
+
+    SparkReceiverIO.Read<String> read =
+        SparkReceiverIO.<String>read()
+            .withValueClass(String.class)
+            .withGetOffsetFn(offsetFn)
+            .withWatermarkFn(watermarkFn)
+            .withSparkReceiverBuilder(receiverBuilder);
+
+    assertEquals(offsetFn, read.getGetOffsetFn());
+    assertEquals(receiverBuilder, read.getSparkReceiverBuilder());
+    assertEquals(String.class, read.getValueClass());
+  }
+
+  @Test
+  public void testReadObjectCreationFailsIfReceiverBuilderIsNull() {
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> SparkReceiverIO.<String>read().withSparkReceiverBuilder(null));
+  }
+
+  @Test
+  public void testReadObjectCreationFailsIfGetOffsetFnIsNull() {
+    assertThrows(
+        IllegalArgumentException.class, () -> SparkReceiverIO.<String>read().withGetOffsetFn(null));
+  }
+
+  @Test
+  public void testReadObjectCreationFailsIfWatermarkFnIsNull() {
+    assertThrows(
+        IllegalArgumentException.class, () -> SparkReceiverIO.<String>read().withWatermarkFn(null));
+  }
+
+  @Test
+  public void testReadObjectCreationFailsIfValueClassIsNull() {
+    assertThrows(
+        IllegalArgumentException.class, () -> SparkReceiverIO.<String>read().withValueClass(null));
+  }
+
+  @Test
+  public void testReadValidationFailsMissingReceiverBuilder() {
+    SparkReceiverIO.Read<String> read = SparkReceiverIO.read();
+    assertThrows(IllegalStateException.class, read::validateTransform);
+  }
+
+  @Test
+  public void testReadValidationFailsMissingSparkConsumer() {
+    ReceiverBuilder<String, CustomReceiverWithOffset> receiverBuilder =
+        new ReceiverBuilder<>(CustomReceiverWithOffset.class).withConstructorArgs();
+    SparkReceiverIO.Read<String> read =
+        SparkReceiverIO.<String>read().withSparkReceiverBuilder(receiverBuilder);
+    assertThrows(IllegalStateException.class, read::validateTransform);
+  }
+
+  @Test
+  public void testReadFromCustomReceiverWithOffset() {
+    DirectOptions options = PipelineOptionsFactory.as(DirectOptions.class);
+    options.setBlockOnRun(false);
+    options.setRunner(DirectRunner.class);

Review Comment:
   No need to specify a runner: unit tests run on the DirectRunner by default.
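   
   A minimal sketch of the simplification (assuming `DirectOptions` is still wanted for `setBlockOnRun`):
   ```
   DirectOptions options = PipelineOptionsFactory.as(DirectOptions.class);
   options.setBlockOnRun(false);
   // no setRunner() call: unit tests run on DirectRunner by default
   Pipeline p = Pipeline.create(options);
   ```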



##########
sdks/java/io/sparkreceiver/src/test/java/org/apache/beam/sdk/io/sparkreceiver/SparkReceiverIOTest.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.sparkreceiver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import java.util.List;
+import org.apache.beam.runners.direct.DirectOptions;
+import org.apache.beam.runners.direct.DirectRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Test class for {@link SparkReceiverIO}. */
+@RunWith(JUnit4.class)
+public class SparkReceiverIOTest {
+
+  @Test
+  public void testReadBuildsCorrectly() {
+    ReceiverBuilder<String, CustomReceiverWithOffset> receiverBuilder =
+        new ReceiverBuilder<>(CustomReceiverWithOffset.class).withConstructorArgs();
+    SerializableFunction<String, Long> offsetFn = Long::valueOf;
+    SerializableFunction<String, Instant> watermarkFn = Instant::parse;
+
+    SparkReceiverIO.Read<String> read =
+        SparkReceiverIO.<String>read()
+            .withValueClass(String.class)
+            .withGetOffsetFn(offsetFn)
+            .withWatermarkFn(watermarkFn)
+            .withSparkReceiverBuilder(receiverBuilder);
+
+    assertEquals(offsetFn, read.getGetOffsetFn());
+    assertEquals(receiverBuilder, read.getSparkReceiverBuilder());
+    assertEquals(String.class, read.getValueClass());
+  }
+
+  @Test
+  public void testReadObjectCreationFailsIfReceiverBuilderIsNull() {
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> SparkReceiverIO.<String>read().withSparkReceiverBuilder(null));
+  }
+
+  @Test
+  public void testReadObjectCreationFailsIfGetOffsetFnIsNull() {
+    assertThrows(
+        IllegalArgumentException.class, () -> SparkReceiverIO.<String>read().withGetOffsetFn(null));
+  }
+
+  @Test
+  public void testReadObjectCreationFailsIfWatermarkFnIsNull() {
+    assertThrows(
+        IllegalArgumentException.class, () -> SparkReceiverIO.<String>read().withWatermarkFn(null));
+  }
+
+  @Test
+  public void testReadObjectCreationFailsIfValueClassIsNull() {
+    assertThrows(
+        IllegalArgumentException.class, () -> SparkReceiverIO.<String>read().withValueClass(null));
+  }
+
+  @Test
+  public void testReadValidationFailsMissingReceiverBuilder() {
+    SparkReceiverIO.Read<String> read = SparkReceiverIO.read();
+    assertThrows(IllegalStateException.class, read::validateTransform);
+  }
+
+  @Test
+  public void testReadValidationFailsMissingSparkConsumer() {
+    ReceiverBuilder<String, CustomReceiverWithOffset> receiverBuilder =
+        new ReceiverBuilder<>(CustomReceiverWithOffset.class).withConstructorArgs();
+    SparkReceiverIO.Read<String> read =
+        SparkReceiverIO.<String>read().withSparkReceiverBuilder(receiverBuilder);
+    assertThrows(IllegalStateException.class, read::validateTransform);
+  }
+
+  @Test
+  public void testReadFromCustomReceiverWithOffset() {
+    DirectOptions options = PipelineOptionsFactory.as(DirectOptions.class);
+    options.setBlockOnRun(false);
+    options.setRunner(DirectRunner.class);
+    Pipeline p = Pipeline.create(options);

Review Comment:
   Use `TestPipeline` for tests (see examples for other IO connectors).
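   
   For example, with the usual JUnit rule (a sketch; the rest of the test body stays as it is):
   ```
   @Rule public final transient TestPipeline p = TestPipeline.create();
   
   @Test
   public void testReadFromCustomReceiverWithOffset() {
     // ... build `reader` as before ...
     p.apply(reader).setCoder(StringUtf8Coder.of()).apply(ParDo.of(new TestOutputDoFn()));
     p.run().waitUntilFinish(Duration.standardSeconds(15));
   }
   ```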



##########
sdks/java/io/sparkreceiver/src/main/java/org/apache/beam/sdk/io/sparkreceiver/SparkReceiverIO.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.sparkreceiver;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.spark.streaming.receiver.Receiver;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Streaming sources for Spark {@link Receiver}. */
+public class SparkReceiverIO {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkReceiverIO.class);
+
+  public static <V> Read<V> read() {
+    return new AutoValue_SparkReceiverIO_Read.Builder<V>().build();
+  }
+
+  /** A {@link PTransform} to read from Spark {@link Receiver}. */
+  @AutoValue
+  @AutoValue.CopyAnnotations
+  public abstract static class Read<V> extends PTransform<PBegin, PCollection<V>> {
+
+    abstract @Nullable ReceiverBuilder<V, ? extends Receiver<V>> getSparkReceiverBuilder();
+
+    abstract @Nullable Class<V> getValueClass();
+
+    abstract @Nullable SerializableFunction<V, Long> getGetOffsetFn();
+
+    abstract @Nullable SerializableFunction<V, Instant> getWatermarkFn();
+
+    abstract Builder<V> toBuilder();
+
+    @Experimental(Experimental.Kind.PORTABILITY)

Review Comment:
   Does this connector support portability? If not, `Experimental.Kind.PORTABILITY` seems like the wrong `Kind` here.
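   
   If not, the kind used by most other IO connectors might be more appropriate (a suggestion, assuming portability isn't specifically targeted here):
   ```
   @Experimental(Experimental.Kind.SOURCE_SINK)
   ```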



##########
sdks/java/io/sparkreceiver/src/test/java/org/apache/beam/sdk/io/sparkreceiver/SparkReceiverIOTest.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.sparkreceiver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import java.util.List;
+import org.apache.beam.runners.direct.DirectOptions;
+import org.apache.beam.runners.direct.DirectRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Test class for {@link SparkReceiverIO}. */
+@RunWith(JUnit4.class)
+public class SparkReceiverIOTest {
+
+  @Test
+  public void testReadBuildsCorrectly() {
+    ReceiverBuilder<String, CustomReceiverWithOffset> receiverBuilder =
+        new ReceiverBuilder<>(CustomReceiverWithOffset.class).withConstructorArgs();
+    SerializableFunction<String, Long> offsetFn = Long::valueOf;
+    SerializableFunction<String, Instant> watermarkFn = Instant::parse;
+
+    SparkReceiverIO.Read<String> read =
+        SparkReceiverIO.<String>read()
+            .withValueClass(String.class)
+            .withGetOffsetFn(offsetFn)
+            .withWatermarkFn(watermarkFn)
+            .withSparkReceiverBuilder(receiverBuilder);
+
+    assertEquals(offsetFn, read.getGetOffsetFn());
+    assertEquals(receiverBuilder, read.getSparkReceiverBuilder());
+    assertEquals(String.class, read.getValueClass());
+  }
+
+  @Test
+  public void testReadObjectCreationFailsIfReceiverBuilderIsNull() {
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> SparkReceiverIO.<String>read().withSparkReceiverBuilder(null));
+  }
+
+  @Test
+  public void testReadObjectCreationFailsIfGetOffsetFnIsNull() {
+    assertThrows(
+        IllegalArgumentException.class, () -> SparkReceiverIO.<String>read().withGetOffsetFn(null));
+  }
+
+  @Test
+  public void testReadObjectCreationFailsIfWatermarkFnIsNull() {
+    assertThrows(
+        IllegalArgumentException.class, () -> SparkReceiverIO.<String>read().withWatermarkFn(null));
+  }
+
+  @Test
+  public void testReadObjectCreationFailsIfValueClassIsNull() {
+    assertThrows(
+        IllegalArgumentException.class, () -> SparkReceiverIO.<String>read().withValueClass(null));
+  }
+
+  @Test
+  public void testReadValidationFailsMissingReceiverBuilder() {
+    SparkReceiverIO.Read<String> read = SparkReceiverIO.read();
+    assertThrows(IllegalStateException.class, read::validateTransform);
+  }
+
+  @Test
+  public void testReadValidationFailsMissingSparkConsumer() {
+    ReceiverBuilder<String, CustomReceiverWithOffset> receiverBuilder =
+        new ReceiverBuilder<>(CustomReceiverWithOffset.class).withConstructorArgs();
+    SparkReceiverIO.Read<String> read =
+        SparkReceiverIO.<String>read().withSparkReceiverBuilder(receiverBuilder);
+    assertThrows(IllegalStateException.class, read::validateTransform);
+  }
+
+  @Test
+  public void testReadFromCustomReceiverWithOffset() {
+    DirectOptions options = PipelineOptionsFactory.as(DirectOptions.class);
+    options.setBlockOnRun(false);
+    options.setRunner(DirectRunner.class);
+    Pipeline p = Pipeline.create(options);
+
+    ReceiverBuilder<String, CustomReceiverWithOffset> receiverBuilder =
+        new ReceiverBuilder<>(CustomReceiverWithOffset.class).withConstructorArgs();
+    SparkReceiverIO.Read<String> reader =
+        SparkReceiverIO.<String>read()
+            .withValueClass(String.class)
+            .withGetOffsetFn(Long::valueOf)
+            .withWatermarkFn(Instant::parse)
+            .withSparkReceiverBuilder(receiverBuilder);
+
+    List<String> storedRecords = CustomReceiverWithOffset.getStoredRecords();
+    List<String> outputRecords = TestOutputDoFn.getRecords();
+    outputRecords.clear();
+
+    p.apply(reader).setCoder(StringUtf8Coder.of()).apply(ParDo.of(new TestOutputDoFn()));
+    p.run().waitUntilFinish(Duration.standardSeconds(15));
+
+    assertEquals(outputRecords, storedRecords);

Review Comment:
   Use 
   ```
   PCollection<String> output = p.apply(...)...
   
   PAssert.that(output).containsInAnyOrder(expectedRecords);
   ```
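   
   A fuller sketch in this test's context (a suggestion; `expectedRecords` stands for the records that `CustomReceiverWithOffset` is expected to produce):
   ```
   PCollection<String> output = p.apply(reader).setCoder(StringUtf8Coder.of());
   
   PAssert.that(output).containsInAnyOrder(expectedRecords);
   p.run().waitUntilFinish(Duration.standardSeconds(15));
   ```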



##########
sdks/java/io/sparkreceiver/src/main/java/org/apache/beam/sdk/io/sparkreceiver/ReadFromSparkReceiverWithOffsetDoFn.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.sparkreceiver;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.spark.SparkConf;
+import org.apache.spark.streaming.receiver.Receiver;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A SplittableDoFn which reads from {@link Receiver} that implements {@link HasOffset}. By default,
+ * a {@link WatermarkEstimators.Manual} watermark estimator is used to track watermark.
+ *
+ * <p>Initial range The initial range is {@code [0, Long.MAX_VALUE)}
+ *
+ * <p>Resume Processing Every time the sparkConsumer.hasRecords() returns false, {@link
+ * ReadFromSparkReceiverWithOffsetDoFn} will move to process the next element.
+ */
+@UnboundedPerElement
+public class ReadFromSparkReceiverWithOffsetDoFn<V> extends DoFn<byte[], V> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ReadFromSparkReceiverWithOffsetDoFn.class);
+
+  /** Constant waiting time after the {@link Receiver} starts. Required to prepare for polling */
+  private static final int START_POLL_TIMEOUT_MS = 1000;
+
+  private final SerializableFunction<Instant, WatermarkEstimator<Instant>>
+      createWatermarkEstimatorFn;
+  private final SerializableFunction<V, Long> getOffsetFn;
+  private final SerializableFunction<V, Instant> getWatermarkFn;
+  private final ReceiverBuilder<V, ? extends Receiver<V>> sparkReceiverBuilder;
+
+  public ReadFromSparkReceiverWithOffsetDoFn(SparkReceiverIO.Read<V> transform) {
+    createWatermarkEstimatorFn = WatermarkEstimators.Manual::new;
+
+    ReceiverBuilder<V, ? extends Receiver<V>> sparkReceiverBuilder =
+        transform.getSparkReceiverBuilder();
+    checkStateNotNull(sparkReceiverBuilder, "Spark Receiver Builder can't be null!");
+    this.sparkReceiverBuilder = sparkReceiverBuilder;
+
+    SerializableFunction<V, Long> getOffsetFn = transform.getGetOffsetFn();
+    checkStateNotNull(getOffsetFn, "Get offset fn can't be null!");
+    this.getOffsetFn = getOffsetFn;
+
+    SerializableFunction<V, Instant> getWatermarkFn = transform.getWatermarkFn();
+    checkStateNotNull(getWatermarkFn, "Watermark fn can't be null!");
+    this.getWatermarkFn = getWatermarkFn;
+  }
+
+  @GetInitialRestriction
+  public OffsetRange initialRestriction(@Element byte[] element) {
+    return new OffsetRange(0, Long.MAX_VALUE);
+  }
+
+  @GetInitialWatermarkEstimatorState
+  public Instant getInitialWatermarkEstimatorState(@Timestamp Instant currentElementTimestamp) {
+    return currentElementTimestamp;
+  }
+
+  @NewWatermarkEstimator
+  public WatermarkEstimator<Instant> newWatermarkEstimator(
+      @WatermarkEstimatorState Instant watermarkEstimatorState) {
+    return createWatermarkEstimatorFn.apply(ensureTimestampWithinBounds(watermarkEstimatorState));
+  }
+
+  @GetSize
+  public double getSize(@Element byte[] element, @Restriction OffsetRange offsetRange) {
+    return restrictionTracker(element, offsetRange).getProgress().getWorkRemaining();
+    // Before processing elements, we don't have a good estimated size of records and offset gap.
+  }
+
+  @NewTracker
+  public OffsetRangeTracker restrictionTracker(
+      @Element byte[] element, @Restriction OffsetRange restriction) {
+    return new OffsetRangeTracker(restriction);
+  }
+
+  @GetRestrictionCoder
+  public Coder<OffsetRange> restrictionCoder() {
+    return new OffsetRange.Coder();
+  }
+
+  @Setup
+  public void setup() throws Exception {}

Review Comment:
   No need to declare the empty `@Setup`/`@Teardown` methods; they can be removed entirely, since these DoFn lifecycle methods are optional.



##########
sdks/java/io/sparkreceiver/src/main/java/org/apache/beam/sdk/io/sparkreceiver/SparkConsumer.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.sparkreceiver;
+
+import java.io.Serializable;
+import org.apache.spark.streaming.receiver.Receiver;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * Interface for start/stop reading from some Spark {@link Receiver} into some place and poll from
+ * it.
+ */
+public interface SparkConsumer<V> extends Serializable {

Review Comment:
   Do we need this `public`? If the interface is only used within this package, package-private visibility would be enough.
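   
   For example (a sketch, assuming no implementations outside this package; the method set is taken from `SparkConsumerWithOffset`):
   ```
   interface SparkConsumer<V> extends Serializable {
     boolean hasRecords();
     @Nullable V poll();
     void start(Receiver<V> sparkReceiver);
     void stop();
   }
   ```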



##########
sdks/java/io/sparkreceiver/src/main/java/org/apache/beam/sdk/io/sparkreceiver/ReadFromSparkReceiverWithOffsetDoFn.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.sparkreceiver;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.spark.SparkConf;
+import org.apache.spark.streaming.receiver.Receiver;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A SplittableDoFn which reads from {@link Receiver} that implements {@link HasOffset}. By default,
+ * a {@link WatermarkEstimators.Manual} watermark estimator is used to track watermark.
+ *
+ * <p>Initial range The initial range is {@code [0, Long.MAX_VALUE)}
+ *
+ * <p>Resume Processing Every time the sparkConsumer.hasRecords() returns false, {@link
+ * ReadFromSparkReceiverWithOffsetDoFn} will move to process the next element.
+ */
+@UnboundedPerElement
+public class ReadFromSparkReceiverWithOffsetDoFn<V> extends DoFn<byte[], V> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ReadFromSparkReceiverWithOffsetDoFn.class);
+
+  /** Constant waiting time after the {@link Receiver} starts. Required to prepare for polling */
+  private static final int START_POLL_TIMEOUT_MS = 1000;
+
+  private final SerializableFunction<Instant, WatermarkEstimator<Instant>>
+      createWatermarkEstimatorFn;
+  private final SerializableFunction<V, Long> getOffsetFn;
+  private final SerializableFunction<V, Instant> getWatermarkFn;
+  private final ReceiverBuilder<V, ? extends Receiver<V>> sparkReceiverBuilder;
+
+  public ReadFromSparkReceiverWithOffsetDoFn(SparkReceiverIO.Read<V> transform) {
+    createWatermarkEstimatorFn = WatermarkEstimators.Manual::new;
+
+    ReceiverBuilder<V, ? extends Receiver<V>> sparkReceiverBuilder =
+        transform.getSparkReceiverBuilder();
+    checkStateNotNull(sparkReceiverBuilder, "Spark Receiver Builder can't be null!");
+    this.sparkReceiverBuilder = sparkReceiverBuilder;
+
+    SerializableFunction<V, Long> getOffsetFn = transform.getGetOffsetFn();
+    checkStateNotNull(getOffsetFn, "Get offset fn can't be null!");
+    this.getOffsetFn = getOffsetFn;
+
+    SerializableFunction<V, Instant> getWatermarkFn = transform.getWatermarkFn();
+    checkStateNotNull(getWatermarkFn, "Watermark fn can't be null!");
+    this.getWatermarkFn = getWatermarkFn;
+  }
+
+  @GetInitialRestriction
+  public OffsetRange initialRestriction(@Element byte[] element) {
+    return new OffsetRange(0, Long.MAX_VALUE);
+  }
+
+  @GetInitialWatermarkEstimatorState
+  public Instant getInitialWatermarkEstimatorState(@Timestamp Instant currentElementTimestamp) {
+    return currentElementTimestamp;
+  }
+
+  @NewWatermarkEstimator
+  public WatermarkEstimator<Instant> newWatermarkEstimator(
+      @WatermarkEstimatorState Instant watermarkEstimatorState) {
+    return createWatermarkEstimatorFn.apply(ensureTimestampWithinBounds(watermarkEstimatorState));
+  }
+
+  @GetSize
+  public double getSize(@Element byte[] element, @Restriction OffsetRange offsetRange) {
+    return restrictionTracker(element, offsetRange).getProgress().getWorkRemaining();
+    // Before processing elements, we don't have a good estimated size of records and offset gap.
+  }
+
+  @NewTracker
+  public OffsetRangeTracker restrictionTracker(
+      @Element byte[] element, @Restriction OffsetRange restriction) {
+    return new OffsetRangeTracker(restriction);
+  }
+
+  @GetRestrictionCoder
+  public Coder<OffsetRange> restrictionCoder() {
+    return new OffsetRange.Coder();
+  }
+
+  @Setup
+  public void setup() throws Exception {}
+
+  @Teardown
+  public void teardown() throws Exception {}
+
+  @SuppressWarnings("unchecked")

Review Comment:
   Better to avoid `@SuppressWarnings` if possible.
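   
   For instance, the unchecked cast could be avoided with a checked cast if a `Class<V>` token were threaded through from `Read#getValueClass()` (a hypothetical sketch):
   ```
   private final Class<V> valueClass; // hypothetical field, passed in via the constructor
   
   // inside the WrappedSupervisor callback:
   V record = valueClass.cast(objects[0]);
   recordsQueue.offer(record);
   ```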



##########
sdks/java/io/sparkreceiver/src/main/java/org/apache/beam/sdk/io/sparkreceiver/ReadFromSparkReceiverWithOffsetDoFn.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.sparkreceiver;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.spark.SparkConf;
+import org.apache.spark.streaming.receiver.Receiver;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A SplittableDoFn which reads from {@link Receiver} that implements {@link HasOffset}. By default,
+ * a {@link WatermarkEstimators.Manual} watermark estimator is used to track watermark.
+ *
+ * <p>Initial range The initial range is {@code [0, Long.MAX_VALUE)}
+ *
+ * <p>Resume Processing Every time the sparkConsumer.hasRecords() returns false, {@link
+ * ReadFromSparkReceiverWithOffsetDoFn} will move to process the next element.
+ */
+@UnboundedPerElement
+public class ReadFromSparkReceiverWithOffsetDoFn<V> extends DoFn<byte[], V> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ReadFromSparkReceiverWithOffsetDoFn.class);
+
+  /** Constant waiting time after the {@link Receiver} starts. Required to prepare for polling */
+  private static final int START_POLL_TIMEOUT_MS = 1000;
+
+  private final SerializableFunction<Instant, WatermarkEstimator<Instant>>
+      createWatermarkEstimatorFn;
+  private final SerializableFunction<V, Long> getOffsetFn;
+  private final SerializableFunction<V, Instant> getWatermarkFn;
+  private final ReceiverBuilder<V, ? extends Receiver<V>> sparkReceiverBuilder;
+
+  public ReadFromSparkReceiverWithOffsetDoFn(SparkReceiverIO.Read<V> transform) {
+    createWatermarkEstimatorFn = WatermarkEstimators.Manual::new;
+
+    ReceiverBuilder<V, ? extends Receiver<V>> sparkReceiverBuilder =
+        transform.getSparkReceiverBuilder();
+    checkStateNotNull(sparkReceiverBuilder, "Spark Receiver Builder can't be null!");
+    this.sparkReceiverBuilder = sparkReceiverBuilder;
+
+    SerializableFunction<V, Long> getOffsetFn = transform.getGetOffsetFn();
+    checkStateNotNull(getOffsetFn, "Get offset fn can't be null!");
+    this.getOffsetFn = getOffsetFn;
+
+    SerializableFunction<V, Instant> getWatermarkFn = transform.getWatermarkFn();
+    checkStateNotNull(getWatermarkFn, "Watermark fn can't be null!");
+    this.getWatermarkFn = getWatermarkFn;
+  }
+
+  @GetInitialRestriction
+  public OffsetRange initialRestriction(@Element byte[] element) {
+    return new OffsetRange(0, Long.MAX_VALUE);
+  }
+
+  @GetInitialWatermarkEstimatorState
+  public Instant getInitialWatermarkEstimatorState(@Timestamp Instant currentElementTimestamp) {
+    return currentElementTimestamp;
+  }
+
+  @NewWatermarkEstimator
+  public WatermarkEstimator<Instant> newWatermarkEstimator(
+      @WatermarkEstimatorState Instant watermarkEstimatorState) {
+    return createWatermarkEstimatorFn.apply(ensureTimestampWithinBounds(watermarkEstimatorState));
+  }
+
+  @GetSize
+  public double getSize(@Element byte[] element, @Restriction OffsetRange offsetRange) {
+    return restrictionTracker(element, offsetRange).getProgress().getWorkRemaining();
+    // Before processing elements, we don't have a good estimated size of records and offset gap.
+  }
+
+  @NewTracker
+  public OffsetRangeTracker restrictionTracker(
+      @Element byte[] element, @Restriction OffsetRange restriction) {
+    return new OffsetRangeTracker(restriction);
+  }
+
+  @GetRestrictionCoder
+  public Coder<OffsetRange> restrictionCoder() {
+    return new OffsetRange.Coder();
+  }
+
+  @Setup
+  public void setup() throws Exception {}
+
+  @Teardown
+  public void teardown() throws Exception {}
+
+  @SuppressWarnings("unchecked")
+  private static class SparkConsumerWithOffset<V> implements SparkConsumer<V> {
+    private final Queue<V> recordsQueue;
+    private @Nullable Receiver<V> sparkReceiver;
+    private final Long startOffset;
+
+    public SparkConsumerWithOffset(Long startOffset) {
+      this.startOffset = startOffset;
+      this.recordsQueue = new ConcurrentLinkedQueue<>();
+    }
+
+    @Override
+    public boolean hasRecords() {
+      return !recordsQueue.isEmpty();
+    }
+
+    @Override
+    public @Nullable V poll() {
+      return recordsQueue.poll();
+    }
+
+    @Override
+    public void start(Receiver<V> sparkReceiver) {
+      this.sparkReceiver = sparkReceiver;
+      try {
+        new WrappedSupervisor(
+            sparkReceiver,
+            new SparkConf(),
+            objects -> {
+              V record = (V) objects[0];
+              recordsQueue.offer(record);
+              return null;
+            });
+      } catch (Exception e) {
+        LOG.error("Can not init Spark Receiver!", e);
+      }
+      ((HasOffset) sparkReceiver).setStartOffset(startOffset);
+      sparkReceiver.supervisor().startReceiver();
+      try {
+        TimeUnit.MILLISECONDS.sleep(START_POLL_TIMEOUT_MS);
+      } catch (InterruptedException e) {
+        LOG.error("Interrupted", e);
+      }
+    }
+
+    @Override
+    public void stop() {
+      if (sparkReceiver != null) {
+        sparkReceiver.stop("Stopped");

Review Comment:
   nit: a more descriptive stop message, e.g. `SparkReceiver is stopped.`
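   
   Applied here, that would be:
   ```
   sparkReceiver.stop("SparkReceiver is stopped.");
   ```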



##########
sdks/java/io/sparkreceiver/src/main/java/org/apache/beam/sdk/io/sparkreceiver/ReadFromSparkReceiverWithOffsetDoFn.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.sparkreceiver;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.spark.SparkConf;
+import org.apache.spark.streaming.receiver.Receiver;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A SplittableDoFn which reads from {@link Receiver} that implements {@link HasOffset}. By default,
+ * a {@link WatermarkEstimators.Manual} watermark estimator is used to track watermark.
+ *
+ * <p>Initial range The initial range is {@code [0, Long.MAX_VALUE)}
+ *
+ * <p>Resume Processing Every time the sparkConsumer.hasRecords() returns false, {@link
+ * ReadFromSparkReceiverWithOffsetDoFn} will move to process the next element.
+ */
+@UnboundedPerElement
+public class ReadFromSparkReceiverWithOffsetDoFn<V> extends DoFn<byte[], V> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ReadFromSparkReceiverWithOffsetDoFn.class);
+
+  /** Constant waiting time after the {@link Receiver} starts. Required to prepare for polling */
+  private static final int START_POLL_TIMEOUT_MS = 1000;
+
+  private final SerializableFunction<Instant, WatermarkEstimator<Instant>>
+      createWatermarkEstimatorFn;
+  private final SerializableFunction<V, Long> getOffsetFn;
+  private final SerializableFunction<V, Instant> getWatermarkFn;
+  private final ReceiverBuilder<V, ? extends Receiver<V>> sparkReceiverBuilder;
+
+  public ReadFromSparkReceiverWithOffsetDoFn(SparkReceiverIO.Read<V> transform) {
+    createWatermarkEstimatorFn = WatermarkEstimators.Manual::new;
+
+    ReceiverBuilder<V, ? extends Receiver<V>> sparkReceiverBuilder =
+        transform.getSparkReceiverBuilder();
+    checkStateNotNull(sparkReceiverBuilder, "Spark Receiver Builder can't be null!");
+    this.sparkReceiverBuilder = sparkReceiverBuilder;
+
+    SerializableFunction<V, Long> getOffsetFn = transform.getGetOffsetFn();
+    checkStateNotNull(getOffsetFn, "Get offset fn can't be null!");
+    this.getOffsetFn = getOffsetFn;
+
+    SerializableFunction<V, Instant> getWatermarkFn = transform.getWatermarkFn();
+    checkStateNotNull(getWatermarkFn, "Watermark fn can't be null!");
+    this.getWatermarkFn = getWatermarkFn;
+  }
+
+  @GetInitialRestriction
+  public OffsetRange initialRestriction(@Element byte[] element) {
+    return new OffsetRange(0, Long.MAX_VALUE);
+  }
+
+  @GetInitialWatermarkEstimatorState
+  public Instant getInitialWatermarkEstimatorState(@Timestamp Instant currentElementTimestamp) {
+    return currentElementTimestamp;
+  }
+
+  @NewWatermarkEstimator
+  public WatermarkEstimator<Instant> newWatermarkEstimator(
+      @WatermarkEstimatorState Instant watermarkEstimatorState) {
+    return createWatermarkEstimatorFn.apply(ensureTimestampWithinBounds(watermarkEstimatorState));
+  }
+
+  @GetSize
+  public double getSize(@Element byte[] element, @Restriction OffsetRange offsetRange) {
+    return restrictionTracker(element, offsetRange).getProgress().getWorkRemaining();
+    // Before processing elements, we don't have a good estimated size of records and offset gap.
+  }
+
+  @NewTracker
+  public OffsetRangeTracker restrictionTracker(
+      @Element byte[] element, @Restriction OffsetRange restriction) {
+    return new OffsetRangeTracker(restriction);
+  }
+
+  @GetRestrictionCoder
+  public Coder<OffsetRange> restrictionCoder() {
+    return new OffsetRange.Coder();
+  }
+
+  @Setup
+  public void setup() throws Exception {}
+
+  @Teardown
+  public void teardown() throws Exception {}
+
+  @SuppressWarnings("unchecked")
+  private static class SparkConsumerWithOffset<V> implements SparkConsumer<V> {
+    private final Queue<V> recordsQueue;
+    private @Nullable Receiver<V> sparkReceiver;
+    private final Long startOffset;
+
+    public SparkConsumerWithOffset(Long startOffset) {
+      this.startOffset = startOffset;
+      this.recordsQueue = new ConcurrentLinkedQueue<>();
+    }
+
+    @Override
+    public boolean hasRecords() {
+      return !recordsQueue.isEmpty();
+    }
+
+    @Override
+    public @Nullable V poll() {
+      return recordsQueue.poll();
+    }
+
+    @Override
+    public void start(Receiver<V> sparkReceiver) {
+      this.sparkReceiver = sparkReceiver;
+      try {
+        new WrappedSupervisor(
+            sparkReceiver,
+            new SparkConf(),
+            objects -> {
+              V record = (V) objects[0];
+              recordsQueue.offer(record);
+              return null;
+            });
+      } catch (Exception e) {
+        LOG.error("Can not init Spark Receiver!", e);
+      }
+      ((HasOffset) sparkReceiver).setStartOffset(startOffset);
+      sparkReceiver.supervisor().startReceiver();
+      try {
+        TimeUnit.MILLISECONDS.sleep(START_POLL_TIMEOUT_MS);
+      } catch (InterruptedException e) {
+        LOG.error("Interrupted", e);

Review Comment:
   Please add a more detailed error message, like `SparkReceiver was interrupted because of <>`.
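   
   For example (one possible wording; restoring the interrupt flag is also generally good practice):
   ```
   } catch (InterruptedException e) {
     Thread.currentThread().interrupt();
     LOG.error("SparkReceiver was interrupted while waiting to start polling", e);
   }
   ```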



##########
sdks/java/io/sparkreceiver/src/main/java/org/apache/beam/sdk/io/sparkreceiver/ReadFromSparkReceiverWithOffsetDoFn.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.sparkreceiver;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.spark.SparkConf;
+import org.apache.spark.streaming.receiver.Receiver;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A SplittableDoFn which reads from {@link Receiver} that implements {@link HasOffset}. By default,
+ * a {@link WatermarkEstimators.Manual} watermark estimator is used to track watermark.
+ *
+ * <p>Initial range The initial range is {@code [0, Long.MAX_VALUE)}
+ *
+ * <p>Resume Processing Every time the sparkConsumer.hasRecords() returns false, {@link
+ * ReadFromSparkReceiverWithOffsetDoFn} will move to process the next element.
+ */
+@UnboundedPerElement
+public class ReadFromSparkReceiverWithOffsetDoFn<V> extends DoFn<byte[], V> {

Review Comment:
   Do we need this class and its methods to be `public`?
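   
   If it is only instantiated from `SparkReceiverIO`, the class itself could be package-private (a sketch; the annotated DoFn methods may still have visibility requirements from Beam's reflection-based invokers):
   ```
   @UnboundedPerElement
   class ReadFromSparkReceiverWithOffsetDoFn<V> extends DoFn<byte[], V> {
   ```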



##########
sdks/java/io/sparkreceiver/src/main/java/org/apache/beam/sdk/io/sparkreceiver/ReadFromSparkReceiverWithOffsetDoFn.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.sparkreceiver;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.spark.SparkConf;
+import org.apache.spark.streaming.receiver.Receiver;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A SplittableDoFn which reads from {@link Receiver} that implements {@link HasOffset}. By default,
+ * a {@link WatermarkEstimators.Manual} watermark estimator is used to track watermark.
+ *
+ * <p>Initial range The initial range is {@code [0, Long.MAX_VALUE)}
+ *
+ * <p>Resume Processing Every time the sparkConsumer.hasRecords() returns false, {@link
+ * ReadFromSparkReceiverWithOffsetDoFn} will move to process the next element.
+ */
+@UnboundedPerElement
+public class ReadFromSparkReceiverWithOffsetDoFn<V> extends DoFn<byte[], V> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ReadFromSparkReceiverWithOffsetDoFn.class);
+
+  /** Constant waiting time after the {@link Receiver} starts. Required to prepare for polling */
+  private static final int START_POLL_TIMEOUT_MS = 1000;
+
+  private final SerializableFunction<Instant, WatermarkEstimator<Instant>>
+      createWatermarkEstimatorFn;
+  private final SerializableFunction<V, Long> getOffsetFn;
+  private final SerializableFunction<V, Instant> getWatermarkFn;
+  private final ReceiverBuilder<V, ? extends Receiver<V>> sparkReceiverBuilder;
+
+  public ReadFromSparkReceiverWithOffsetDoFn(SparkReceiverIO.Read<V> transform) {
+    createWatermarkEstimatorFn = WatermarkEstimators.Manual::new;
+
+    ReceiverBuilder<V, ? extends Receiver<V>> sparkReceiverBuilder =
+        transform.getSparkReceiverBuilder();
+    checkStateNotNull(sparkReceiverBuilder, "Spark Receiver Builder can't be null!");
+    this.sparkReceiverBuilder = sparkReceiverBuilder;
+
+    SerializableFunction<V, Long> getOffsetFn = transform.getGetOffsetFn();
+    checkStateNotNull(getOffsetFn, "Get offset fn can't be null!");
+    this.getOffsetFn = getOffsetFn;
+
+    SerializableFunction<V, Instant> getWatermarkFn = transform.getWatermarkFn();
+    checkStateNotNull(getWatermarkFn, "Watermark fn can't be null!");
+    this.getWatermarkFn = getWatermarkFn;
+  }
+
+  @GetInitialRestriction
+  public OffsetRange initialRestriction(@Element byte[] element) {
+    return new OffsetRange(0, Long.MAX_VALUE);
+  }
+
+  @GetInitialWatermarkEstimatorState
+  public Instant getInitialWatermarkEstimatorState(@Timestamp Instant currentElementTimestamp) {
+    return currentElementTimestamp;
+  }
+
+  @NewWatermarkEstimator
+  public WatermarkEstimator<Instant> newWatermarkEstimator(
+      @WatermarkEstimatorState Instant watermarkEstimatorState) {
+    return createWatermarkEstimatorFn.apply(ensureTimestampWithinBounds(watermarkEstimatorState));
+  }
+
+  @GetSize
+  public double getSize(@Element byte[] element, @Restriction OffsetRange offsetRange) {
+    return restrictionTracker(element, offsetRange).getProgress().getWorkRemaining();
+    // Before processing elements, we don't have a good estimated size of records and offset gap.
+  }
+
+  @NewTracker
+  public OffsetRangeTracker restrictionTracker(
+      @Element byte[] element, @Restriction OffsetRange restriction) {
+    return new OffsetRangeTracker(restriction);
+  }
+
+  @GetRestrictionCoder
+  public Coder<OffsetRange> restrictionCoder() {
+    return new OffsetRange.Coder();
+  }
+
+  @Setup
+  public void setup() throws Exception {}
+
+  @Teardown
+  public void teardown() throws Exception {}
+
+  @SuppressWarnings("unchecked")
+  private static class SparkConsumerWithOffset<V> implements SparkConsumer<V> {
+    private final Queue<V> recordsQueue;
+    private @Nullable Receiver<V> sparkReceiver;
+    private final Long startOffset;
+
+    public SparkConsumerWithOffset(Long startOffset) {

Review Comment:
   Please restrict method visibility where possible.
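   
   For example, the constructor of this private nested class doesn't need to be `public`:
   ```
   // default (package-private) visibility is enough here
   SparkConsumerWithOffset(Long startOffset) {
     this.startOffset = startOffset;
     this.recordsQueue = new ConcurrentLinkedQueue<>();
   }
   ```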



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org