Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/09/20 14:23:52 UTC

[GitHub] [beam] Amar3tto opened a new pull request, #23305: [CdapIO] Add integration tests for SparkReceiverIO

Amar3tto opened a new pull request, #23305:
URL: https://github.com/apache/beam/pull/23305

   **Please** add a meaningful description for your change here
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/get-started-contributing/#make-the-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Go tests](https://github.com/apache/beam/workflows/Go%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Go+tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   




[GitHub] [beam] Lizzfox commented on pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

Lizzfox commented on PR #23305:
URL: https://github.com/apache/beam/pull/23305#issuecomment-1255192375

   @chamikaramj @aromanenko-dev
   The SparkReceiverIO integration tests PR is ready for review.
   Thank you!




[GitHub] [beam] aromanenko-dev commented on pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

aromanenko-dev commented on PR #23305:
URL: https://github.com/apache/beam/pull/23305#issuecomment-1292107185

   Run Java SparkReceiverIO Performance Test




[GitHub] [beam] Amar3tto commented on pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

Amar3tto commented on PR #23305:
URL: https://github.com/apache/beam/pull/23305#issuecomment-1295077199

   Run Python PreCommit




[GitHub] [beam] Amar3tto commented on a diff in pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

Amar3tto commented on code in PR #23305:
URL: https://github.com/apache/beam/pull/23305#discussion_r993593498


##########
sdks/java/io/sparkreceiver/src/test/java/org/apache/beam/sdk/io/sparkreceiver/SparkReceiverIOIT.java:
##########
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.sparkreceiver;
+
+import static org.apache.beam.sdk.io.synthetic.SyntheticOptions.fromJsonString;
+import static org.junit.Assert.assertEquals;
+
+import com.google.cloud.Timestamp;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.MessageProperties;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.common.IOITHelper;
+import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
+import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.StreamingOptions;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.testutils.NamedTestResult;
+import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
+import org.apache.beam.sdk.testutils.metrics.MetricsReader;
+import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.RabbitMQContainer;
+import org.testcontainers.utility.DockerImageName;
+
+/**
+ * IO Integration test for {@link org.apache.beam.sdk.io.sparkreceiver.SparkReceiverIO}.
+ *
+ * <p>See https://beam.apache.org/documentation/io/testing/#i-o-transform-integration-tests for
+ * more details.
+ *
+ * <p>NOTE: This test sets the retention policy of the messages so that all messages are retained
+ * in the stream and can be read back after writing.
+ */
+@RunWith(JUnit4.class)
+public class SparkReceiverIOIT {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkReceiverIOIT.class);
+
+  private static final String READ_TIME_METRIC_NAME = "read_time";
+
+  private static final String RUN_TIME_METRIC_NAME = "run_time";
+
+  private static final String READ_ELEMENT_METRIC_NAME = "spark_read_element_count";
+
+  private static final String NAMESPACE = SparkReceiverIOIT.class.getName();
+
+  private static final String TEST_ID = UUID.randomUUID().toString();
+
+  private static final String TIMESTAMP = Timestamp.now().toString();
+
+  private static final String TEST_MESSAGE_PREFIX = "Test ";
+
+  private static Options options;
+
+  private static SyntheticSourceOptions sourceOptions;
+
+  private static GenericContainer<?> rabbitMqContainer;
+
+  private static InfluxDBSettings settings;
+
+  private static final ExperimentalOptions sdfPipelineOptions;
+
+  static {
+    sdfPipelineOptions = PipelineOptionsFactory.create().as(ExperimentalOptions.class);
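+    // Do not block on run(): the streaming read pipeline is waited on
+    // explicitly with waitUntilFinish(timeout) in the test itself.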
+    sdfPipelineOptions.as(TestPipelineOptions.class).setBlockOnRun(false);
+  }
+
+  @Rule public TestPipeline readPipeline = TestPipeline.fromOptions(sdfPipelineOptions);
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    options = IOITHelper.readIOTestPipelineOptions(Options.class);
+    sourceOptions = fromJsonString(options.getSourceOptions(), SyntheticSourceOptions.class);
+    options.setRabbitMqBootstrapServerAddress(
+        "amqp://guest:guest@" + options.getRabbitMqBootstrapServerAddress());
+    if (options.isWithTestcontainers()) {
+      setupRabbitMqContainer();
+    } else {
+      settings =
+          InfluxDBSettings.builder()
+              .withHost(options.getInfluxHost())
+              .withDatabase(options.getInfluxDatabase())
+              .withMeasurement(options.getInfluxMeasurement())
+              .get();
+    }
+    clearRabbitMQ();
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    if (rabbitMqContainer != null) {
+      rabbitMqContainer.stop();
+    }
+
+    clearRabbitMQ();
+  }
+
+  private static void setupRabbitMqContainer() {
+    rabbitMqContainer =
+        new RabbitMQContainer(
+                DockerImageName.parse("rabbitmq").withTag(options.getRabbitMqContainerVersion()))
+            .withExposedPorts(5672, 15672);
+    rabbitMqContainer.start();
+    options.setRabbitMqBootstrapServerAddress(
+        getBootstrapServers(
+            rabbitMqContainer.getHost(), rabbitMqContainer.getMappedPort(5672).toString()));
+  }
+
+  private static String getBootstrapServers(String host, String port) {
+    return String.format("amqp://guest:guest@%s:%s", host, port);
+  }
+
+  /** Pipeline options specific for this test. */
+  public interface Options extends IOTestPipelineOptions, StreamingOptions {
+
+    @Description("Options for synthetic source.")
+    @Validation.Required
+    @Default.String("{\"numRecords\": \"500\",\"keySizeBytes\": \"1\",\"valueSizeBytes\": \"90\"}")
+    String getSourceOptions();
+
+    void setSourceOptions(String sourceOptions);
+
+    @Description("RabbitMQ bootstrap server address")
+    @Default.String("amqp://guest:guest@localhost:5672")
+    String getRabbitMqBootstrapServerAddress();
+
+    void setRabbitMqBootstrapServerAddress(String address);
+
+    @Description("RabbitMQ stream")
+    @Default.String("rabbitMqTestStream")
+    String getStreamName();
+
+    void setStreamName(String streamName);
+
+    @Description("Whether to use testcontainers")
+    @Default.Boolean(true)
+    Boolean isWithTestcontainers();
+
+    void setWithTestcontainers(Boolean withTestcontainers);
+
+    @Description("RabbitMQ container version. Use when useTestcontainers is true")
+    @Nullable
+    @Default.String("3.9-alpine")
+    String getRabbitMqContainerVersion();
+
+    void setRabbitMqContainerVersion(String rabbitMqContainerVersion);
+
+    @Description("Time to wait for the events to be processed by the read pipeline (in seconds)")
+    @Default.Integer(50)
+    @Validation.Required
+    Integer getReadTimeout();
+
+    void setReadTimeout(Integer readTimeout);
+  }
+
+  private void writeToRabbitMq(final List<String> messages)
+      throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException, IOException,
+          TimeoutException {
+
+    final ConnectionFactory connectionFactory = new ConnectionFactory();
+    connectionFactory.setUri(options.getRabbitMqBootstrapServerAddress());
+    Map<String, Object> arguments = new HashMap<>();
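+    // "x-queue-type: stream" declares the queue as a RabbitMQ stream,
+    // which retains published messages so they can be read back later.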
+    arguments.put("x-queue-type", "stream");
+
+    try (Connection connection = connectionFactory.newConnection();
+        Channel channel = connection.createChannel()) {
+      channel.queueDeclare(options.getStreamName(), true, false, false, arguments);
+
+      messages.forEach(
+          message -> {
+            try {
+              channel.basicPublish(
+                  "",
+                  options.getStreamName(),
+                  MessageProperties.PERSISTENT_TEXT_PLAIN,
+                  message.getBytes(StandardCharsets.UTF_8));
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          });
+    }
+  }
+
+  private SparkReceiverIO.Read<String> readFromRabbitMqWithOffset() {
+    final ReceiverBuilder<String, RabbitMqReceiverWithOffset> receiverBuilder =
+        new ReceiverBuilder<>(RabbitMqReceiverWithOffset.class)
+            .withConstructorArgs(
+                options.getRabbitMqBootstrapServerAddress(),
+                options.getStreamName(),
+                sourceOptions.numRecords);
+
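+    // Each test message is "Test <n>"; the numeric suffix is parsed out
+    // and used as the record offset.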
+    return SparkReceiverIO.<String>read()
+        .withGetOffsetFn(
+            rabbitMqMessage ->
+                Long.valueOf(rabbitMqMessage.substring(TEST_MESSAGE_PREFIX.length())))
+        .withSparkReceiverBuilder(receiverBuilder);
+  }
+
+  /**
+   * Since streams in RabbitMQ are durable by definition, we have to clean them up after test
+   * execution. The simplest way is to delete the whole stream after test execution.
+   */
+  private static void clearRabbitMQ() {
+    final ConnectionFactory connectionFactory = new ConnectionFactory();
+
+    try {
+      connectionFactory.setUri(options.getRabbitMqBootstrapServerAddress());
+      try (Connection connection = connectionFactory.newConnection();
+          Channel channel = connection.createChannel()) {
+        channel.queueDelete(options.getStreamName());
+      }
+    } catch (URISyntaxException
+        | NoSuchAlgorithmException
+        | KeyManagementException
+        | IOException
+        | TimeoutException e) {
+      LOG.error("Error during RabbitMQ clean up", e);
+    }
+  }
+
+  /** Function for counting processed pipeline elements. */
+  private static class CountingFn extends DoFn<String, Void> {
+
+    private final Counter elementCounter;
+
+    CountingFn(String namespace, String name) {
+      elementCounter = Metrics.counter(namespace, name);
+    }
+
+    @ProcessElement
+    public void processElement() {
+      elementCounter.inc(1L);
+    }
+  }
+
+  private void cancelIfTimeout(PipelineResult readResult, PipelineResult.State readState)
+      throws IOException {
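+    // waitUntilFinish returns null if the pipeline did not reach a terminal
+    // state within the timeout, in which case the pipeline is cancelled.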
+    if (readState == null) {
+      readResult.cancel();
+    }
+  }
+
+  private long readElementMetric(PipelineResult result) {
+    MetricsReader metricsReader = new MetricsReader(result, SparkReceiverIOIT.NAMESPACE);
+    return metricsReader.getCounterMetric(SparkReceiverIOIT.READ_ELEMENT_METRIC_NAME);
+  }
+
+  private Set<NamedTestResult> readMetrics(PipelineResult readResult) {
+    BiFunction<MetricsReader, String, NamedTestResult> supplier =
+        (reader, metricName) -> {
+          long start = reader.getStartTimeMetric(metricName);
+          long end = reader.getEndTimeMetric(metricName);
+          return NamedTestResult.create(TEST_ID, TIMESTAMP, metricName, (end - start) / 1e3);
+        };
+
+    NamedTestResult readTime =
+        supplier.apply(new MetricsReader(readResult, NAMESPACE), READ_TIME_METRIC_NAME);
+    NamedTestResult runTime =
+        NamedTestResult.create(TEST_ID, TIMESTAMP, RUN_TIME_METRIC_NAME, readTime.getValue());
+
+    return ImmutableSet.of(readTime, runTime);
+  }
+
+  @Test
+  public void testSparkReceiverIOReadsInStreamingWithOffset() throws IOException {
+
+    final List<String> messages =

Review Comment:
   We have 600,000 records in the PerformanceTest job. I will run it again to be sure.





[GitHub] [beam] aromanenko-dev commented on pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

aromanenko-dev commented on PR #23305:
URL: https://github.com/apache/beam/pull/23305#issuecomment-1282208925

   Run Python_PVR_Flink PreCommit




[GitHub] [beam] Abacn commented on pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

Abacn commented on PR #23305:
URL: https://github.com/apache/beam/pull/23305#issuecomment-1306235934

   Hi, it looks like the new SparkReceiverIO performance test adds a persistent RabbitMQ server to the io-datastore k8s cluster in the test infrastructure. Both the RabbitMQ and KafkaIO workloads then request ~1 GB of RAM, and the Kafka workload has failed to initialize since Nov 3 (the SparkReceiverIO performance test has also been failing since then).
   
   Can we make the RabbitMQ server deployment transient and tear it down during integration-test cleanup, as the Kafka performance test does? If not, we should bump the machine type for the k8s cluster.
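   
   A minimal sketch of what a transient deployment could look like (assuming Testcontainers and JUnit 4, mirroring what the IT already does when `withTestcontainers` is true; the class name here is hypothetical):
   
   ```java
   import org.junit.AfterClass;
   import org.junit.BeforeClass;
   import org.testcontainers.containers.RabbitMQContainer;
   import org.testcontainers.utility.DockerImageName;
   
   public class TransientRabbitMqIT {
   
     private static RabbitMQContainer rabbitMq;
   
     @BeforeClass
     public static void startBroker() {
       // Start a throwaway broker for this run only, instead of keeping a
       // persistent deployment on the shared io-datastore k8s cluster.
       rabbitMq =
           new RabbitMQContainer(DockerImageName.parse("rabbitmq:3.9-alpine"))
               .withExposedPorts(5672, 15672);
       rabbitMq.start();
       // Tests connect via:
       // "amqp://guest:guest@" + rabbitMq.getHost() + ":" + rabbitMq.getMappedPort(5672)
     }
   
     @AfterClass
     public static void stopBroker() {
       // Tear the broker down during integration-test cleanup so nothing keeps
       // requesting RAM on the cluster between runs.
       if (rabbitMq != null) {
         rabbitMq.stop();
       }
     }
   }
   ```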




[GitHub] [beam] github-actions[bot] commented on pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

github-actions[bot] commented on PR #23305:
URL: https://github.com/apache/beam/pull/23305#issuecomment-1252534443

   Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @apilloud for label java.
   R: @Abacn for label build.
   R: @ahmedabu98 for label io.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review comments).




[GitHub] [beam] Lizzfox commented on pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

Lizzfox commented on PR #23305:
URL: https://github.com/apache/beam/pull/23305#issuecomment-1281937499

   Run Java_Kafka_IO_Direct PreCommit




[GitHub] [beam] Amar3tto commented on a diff in pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

Amar3tto commented on code in PR #23305:
URL: https://github.com/apache/beam/pull/23305#discussion_r989689307


##########
sdks/java/io/sparkreceiver/src/test/java/org/apache/beam/sdk/io/sparkreceiver/RabbitMqReceiverWithOffset.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.sparkreceiver;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.DefaultConsumer;
+import com.rabbitmq.client.Envelope;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.receiver.Receiver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Imitation of a Spark {@link Receiver} for RabbitMQ that implements the {@link HasOffset}
+ * interface. Used to test {@link SparkReceiverIO#read()}.
+ */
+public class RabbitMqReceiverWithOffset extends Receiver<String> implements HasOffset {

Review Comment:
   Changed to package-private



##########
sdks/java/io/sparkreceiver/src/test/java/org/apache/beam/sdk/io/sparkreceiver/SparkReceiverIOIT.java:
##########
+  @Test
+  public void testSparkReceiverIOReadsInStreamingWithOffset() throws IOException {
+
+    final List<String> messages =
+        LongStream.range(0, sourceOptions.numRecords)
+            .mapToObj(number -> TEST_MESSAGE_PREFIX + number)
+            .collect(Collectors.toList());
+
+    try {
+      writeToRabbitMq(messages);
+    } catch (Exception e) {
+      LOG.error("Can not write to rabbit {}", e.getMessage());

Review Comment:
   Fixed
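
   (The comment thread attaches to the `LOG.error` call above. A minimal sketch of one plausible fix, assuming the standard SLF4J two-argument overload; the updated diff is not quoted here, and the wrapper class below is purely illustrative:)
   
   ```java
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   
   class RabbitMqWriteLogging {
     private static final Logger LOG = LoggerFactory.getLogger(RabbitMqWriteLogging.class);
   
     void logWriteFailure(Exception e) {
       // Passing the Throwable itself lets SLF4J log the full stack trace,
       // instead of interpolating only e.getMessage() into the message.
       LOG.error("Can not write to RabbitMQ", e);
     }
   }
   ```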





[GitHub] [beam] Amar3tto commented on pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

Amar3tto commented on PR #23305:
URL: https://github.com/apache/beam/pull/23305#issuecomment-1295009472

   Run Java PreCommit




[GitHub] [beam] Amar3tto commented on pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

Amar3tto commented on PR #23305:
URL: https://github.com/apache/beam/pull/23305#issuecomment-1296601990

   Run Java PreCommit




[GitHub] [beam] aromanenko-dev commented on pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

aromanenko-dev commented on PR #23305:
URL: https://github.com/apache/beam/pull/23305#issuecomment-1281988049

   Run Java SparkReceiverIO Performance Test




[GitHub] [beam] Lizzfox commented on pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

Lizzfox commented on PR #23305:
URL: https://github.com/apache/beam/pull/23305#issuecomment-1281938012

   Run Java PreCommit




[GitHub] [beam] Lizzfox commented on pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

Lizzfox commented on PR #23305:
URL: https://github.com/apache/beam/pull/23305#issuecomment-1253871428

   Run Python PreCommit




[GitHub] [beam] Lizzfox commented on pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

Lizzfox commented on PR #23305:
URL: https://github.com/apache/beam/pull/23305#issuecomment-1276306316

   @aromanenko-dev thanks for the review. All your comments have been addressed. 
   Can we consider this PR ready for merge?




[GitHub] [beam] aromanenko-dev commented on pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

aromanenko-dev commented on PR #23305:
URL: https://github.com/apache/beam/pull/23305#issuecomment-1282207542

   Run Java SparkReceiverIO Performance Test




[GitHub] [beam] Amar3tto commented on a diff in pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

Amar3tto commented on code in PR #23305:
URL: https://github.com/apache/beam/pull/23305#discussion_r989690341


##########
sdks/java/io/sparkreceiver/src/test/java/org/apache/beam/sdk/io/sparkreceiver/SparkReceiverIOIT.java:
##########
+  @Test
+  public void testSparkReceiverIOReadsInStreamingWithOffset() throws IOException {
+
+    final List<String> messages =
+        LongStream.range(0, sourceOptions.numRecords)
+            .mapToObj(number -> TEST_MESSAGE_PREFIX + number)
+            .collect(Collectors.toList());
+
+    try {
+      writeToRabbitMq(messages);
+    } catch (Exception e) {
+      LOG.error("Can not write to rabbit {}", e.getMessage());
+    }
+    LOG.info(sourceOptions.numRecords + " records were successfully written to RabbitMQ");
+
+    // Use streaming pipeline to read RabbitMQ records.
+    readPipeline.getOptions().as(Options.class).setStreaming(true);
+    readPipeline
+        .apply("Read from unbounded RabbitMq", readFromRabbitMqWithOffset())
+        .setCoder(StringUtf8Coder.of())
+        .apply(ParDo.of(new TestOutputDoFn()))
+        .apply("Measure read time", ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC_NAME)))
+        .apply("Counting element", ParDo.of(new CountingFn(NAMESPACE, READ_ELEMENT_METRIC_NAME)));
+
+    TestOutputDoFn.EXPECTED_RECORDS.addAll(messages);
+
+    final PipelineResult readResult = readPipeline.run();
+    final PipelineResult.State readState =
+        readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
+
+    cancelIfTimeout(readResult, readState);
+
+    assertEquals(sourceOptions.numRecords, readElementMetric(readResult));
+
+    if (!options.isWithTestcontainers()) {
+      Set<NamedTestResult> metrics = readMetrics(readResult);
+      IOITMetrics.publishToInflux(TEST_ID, TIMESTAMP, metrics, settings);
+    }
+  }
+
+  /** {@link DoFn} that throws a {@code RuntimeException} if it receives an unexpected element. */
+  private static class TestOutputDoFn extends DoFn<String, String> {
+    private static final Set<String> EXPECTED_RECORDS = new HashSet<>();
+
+    @ProcessElement
+    public void processElement(@Element String element, OutputReceiver<String> outputReceiver) {
+      if (!EXPECTED_RECORDS.contains(element)) {

Review Comment:
   Removed TestOutputDoFn; we discussed this pattern and concluded that it will not work in general.





[GitHub] [beam] Lizzfox commented on pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

Lizzfox commented on PR #23305:
URL: https://github.com/apache/beam/pull/23305#issuecomment-1293453974

   Run Java_Neo4j_IO_Direct PreCommit




[GitHub] [beam] aromanenko-dev commented on pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on PR #23305:
URL: https://github.com/apache/beam/pull/23305#issuecomment-1293418997

   Run Java SparkReceiverIO Performance Test




[GitHub] [beam] aromanenko-dev commented on pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on PR #23305:
URL: https://github.com/apache/beam/pull/23305#issuecomment-1277767259

   Run Seed Job




[GitHub] [beam] aromanenko-dev commented on pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on PR #23305:
URL: https://github.com/apache/beam/pull/23305#issuecomment-1282208183

   Run Java PreCommit




[GitHub] [beam] aromanenko-dev commented on pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on PR #23305:
URL: https://github.com/apache/beam/pull/23305#issuecomment-1280841785

   Run Seed Job




[GitHub] [beam] aromanenko-dev commented on a diff in pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on code in PR #23305:
URL: https://github.com/apache/beam/pull/23305#discussion_r989203309


##########
sdks/java/io/sparkreceiver/src/test/java/org/apache/beam/sdk/io/sparkreceiver/SparkReceiverIOIT.java:
##########
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.sparkreceiver;
+
+import static org.apache.beam.sdk.io.synthetic.SyntheticOptions.fromJsonString;
+import static org.junit.Assert.assertEquals;
+
+import com.google.cloud.Timestamp;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.MessageProperties;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.common.IOITHelper;
+import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
+import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.StreamingOptions;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.testutils.NamedTestResult;
+import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
+import org.apache.beam.sdk.testutils.metrics.MetricsReader;
+import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.RabbitMQContainer;
+import org.testcontainers.utility.DockerImageName;
+
+/**
+ * IO Integration test for {@link org.apache.beam.sdk.io.sparkreceiver.SparkReceiverIO}.
+ *
+ * <p>{@see https://beam.apache.org/documentation/io/testing/#i-o-transform-integration-tests} for
+ * more details.
+ *
+ * <p>NOTE: This test sets the retention policy of the messages so that all messages are retained
+ * in the topic and can be read back after writing.
+ */
+@RunWith(JUnit4.class)
+public class SparkReceiverIOIT {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkReceiverIOIT.class);
+
+  private static final String READ_TIME_METRIC_NAME = "read_time";
+
+  private static final String RUN_TIME_METRIC_NAME = "run_time";
+
+  private static final String READ_ELEMENT_METRIC_NAME = "spark_read_element_count";
+
+  private static final String NAMESPACE = SparkReceiverIOIT.class.getName();
+
+  private static final String TEST_ID = UUID.randomUUID().toString();
+
+  private static final String TIMESTAMP = Timestamp.now().toString();
+
+  private static final String TEST_MESSAGE_PREFIX = "Test ";
+
+  private static Options options;
+
+  private static SyntheticSourceOptions sourceOptions;
+
+  private static GenericContainer<?> rabbitMqContainer;
+
+  private static InfluxDBSettings settings;
+
+  private static final ExperimentalOptions sdfPipelineOptions;
+
+  static {
+    sdfPipelineOptions = PipelineOptionsFactory.create().as(ExperimentalOptions.class);
+    sdfPipelineOptions.as(TestPipelineOptions.class).setBlockOnRun(false);
+  }
+
+  @Rule public TestPipeline readPipeline = TestPipeline.fromOptions(sdfPipelineOptions);
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    options = IOITHelper.readIOTestPipelineOptions(Options.class);
+    sourceOptions = fromJsonString(options.getSourceOptions(), SyntheticSourceOptions.class);
+    options.setRabbitMqBootstrapServerAddress(
+        "amqp://guest:guest@" + options.getRabbitMqBootstrapServerAddress());
+    if (options.isWithTestcontainers()) {
+      setupRabbitMqContainer();
+    } else {
+      settings =
+          InfluxDBSettings.builder()
+              .withHost(options.getInfluxHost())
+              .withDatabase(options.getInfluxDatabase())
+              .withMeasurement(options.getInfluxMeasurement())
+              .get();
+    }
+    clearRabbitMQ();
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    if (rabbitMqContainer != null) {
+      rabbitMqContainer.stop();
+    }
+
+    clearRabbitMQ();
+  }
+
+  private static void setupRabbitMqContainer() {
+    rabbitMqContainer =
+        new RabbitMQContainer(
+                DockerImageName.parse("rabbitmq").withTag(options.getRabbitMqContainerVersion()))
+            .withExposedPorts(5672, 15672);
+    rabbitMqContainer.start();
+    options.setRabbitMqBootstrapServerAddress(
+        getBootstrapServers(
+            rabbitMqContainer.getHost(), rabbitMqContainer.getMappedPort(5672).toString()));
+  }
+
+  private static String getBootstrapServers(String host, String port) {
+    return String.format("amqp://guest:guest@%s:%s", host, port);
+  }
+
+  /** Pipeline options specific for this test. */
+  public interface Options extends IOTestPipelineOptions, StreamingOptions {
+
+    @Description("Options for synthetic source.")
+    @Validation.Required
+    @Default.String("{\"numRecords\": \"500\",\"keySizeBytes\": \"1\",\"valueSizeBytes\": \"90\"}")
+    String getSourceOptions();
+
+    void setSourceOptions(String sourceOptions);
+
+    @Description("RabbitMQ bootstrap server address")
+    @Default.String("amqp://guest:guest@localhost:5672")
+    String getRabbitMqBootstrapServerAddress();
+
+    void setRabbitMqBootstrapServerAddress(String address);
+
+    @Description("RabbitMQ stream")
+    @Default.String("rabbitMqTestStream")
+    String getStreamName();
+
+    void setStreamName(String streamName);
+
+    @Description("Whether to use testcontainers")
+    @Default.Boolean(true)
+    Boolean isWithTestcontainers();
+
+    void setWithTestcontainers(Boolean withTestcontainers);
+
+    @Description("RabbitMQ container version. Use when useTestcontainers is true")
+    @Nullable
+    @Default.String("3.9-alpine")
+    String getRabbitMqContainerVersion();
+
+    void setRabbitMqContainerVersion(String rabbitMqContainerVersion);
+
+    @Description("Time to wait for the events to be processed by the read pipeline (in seconds)")
+    @Default.Integer(50)
+    @Validation.Required
+    Integer getReadTimeout();
+
+    void setReadTimeout(Integer readTimeout);
+  }
+
+  private void writeToRabbitMq(final List<String> messages)
+      throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException, IOException,
+          TimeoutException {
+
+    final ConnectionFactory connectionFactory = new ConnectionFactory();
+    connectionFactory.setUri(options.getRabbitMqBootstrapServerAddress());
+    Map<String, Object> arguments = new HashMap<>();
+    arguments.put("x-queue-type", "stream");
+
+    try (Connection connection = connectionFactory.newConnection();
+        Channel channel = connection.createChannel()) {
+      channel.queueDeclare(options.getStreamName(), true, false, false, arguments);
+
+      messages.forEach(
+          message -> {
+            try {
+              channel.basicPublish(
+                  "",
+                  options.getStreamName(),
+                  MessageProperties.PERSISTENT_TEXT_PLAIN,
+                  message.getBytes(StandardCharsets.UTF_8));
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          });
+    }
+  }
+
+  private SparkReceiverIO.Read<String> readFromRabbitMqWithOffset() {
+    final ReceiverBuilder<String, RabbitMqReceiverWithOffset> receiverBuilder =
+        new ReceiverBuilder<>(RabbitMqReceiverWithOffset.class)
+            .withConstructorArgs(
+                options.getRabbitMqBootstrapServerAddress(),
+                options.getStreamName(),
+                sourceOptions.numRecords);
+
+    return SparkReceiverIO.<String>read()
+        .withGetOffsetFn(
+            rabbitMqMessage ->
+                Long.valueOf(rabbitMqMessage.substring(TEST_MESSAGE_PREFIX.length())))
+        .withSparkReceiverBuilder(receiverBuilder);
+  }
+
+  /**
+   * Since streams in RabbitMQ are durable by definition, we have to clean them up after test
+   * execution. The simplest way is to delete the whole stream after test execution.
+   */
+  private static void clearRabbitMQ() {
+    final ConnectionFactory connectionFactory = new ConnectionFactory();
+
+    try {
+      connectionFactory.setUri(options.getRabbitMqBootstrapServerAddress());
+      try (Connection connection = connectionFactory.newConnection();
+          Channel channel = connection.createChannel()) {
+        channel.queueDelete(options.getStreamName());
+      }
+    } catch (URISyntaxException
+        | NoSuchAlgorithmException
+        | KeyManagementException
+        | IOException
+        | TimeoutException e) {
+      LOG.error("Error during RabbitMQ clean up", e);
+    }
+  }
+
+  /** Function for counting processed pipeline elements. */
+  private static class CountingFn extends DoFn<String, Void> {
+
+    private final Counter elementCounter;
+
+    CountingFn(String namespace, String name) {
+      elementCounter = Metrics.counter(namespace, name);
+    }
+
+    @ProcessElement
+    public void processElement() {
+      elementCounter.inc(1L);
+    }
+  }
+
+  private void cancelIfTimeout(PipelineResult readResult, PipelineResult.State readState)
+      throws IOException {
+    if (readState == null) {
+      readResult.cancel();
+    }
+  }
+
+  private long readElementMetric(PipelineResult result) {
+    MetricsReader metricsReader = new MetricsReader(result, SparkReceiverIOIT.NAMESPACE);
+    return metricsReader.getCounterMetric(SparkReceiverIOIT.READ_ELEMENT_METRIC_NAME);
+  }
+
+  private Set<NamedTestResult> readMetrics(PipelineResult readResult) {
+    BiFunction<MetricsReader, String, NamedTestResult> supplier =
+        (reader, metricName) -> {
+          long start = reader.getStartTimeMetric(metricName);
+          long end = reader.getEndTimeMetric(metricName);
+          return NamedTestResult.create(TEST_ID, TIMESTAMP, metricName, (end - start) / 1e3);
+        };
+
+    NamedTestResult readTime =
+        supplier.apply(new MetricsReader(readResult, NAMESPACE), READ_TIME_METRIC_NAME);
+    NamedTestResult runTime =
+        NamedTestResult.create(TEST_ID, TIMESTAMP, RUN_TIME_METRIC_NAME, readTime.getValue());
+
+    return ImmutableSet.of(readTime, runTime);
+  }
+
+  @Test
+  public void testSparkReceiverIOReadsInStreamingWithOffset() throws IOException {
+
+    final List<String> messages =

Review Comment:
   Can we expect an OOM exception here for a large number of records?
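
   For illustration (the method and constant names here are hypothetical, not
   from the PR), publishing in fixed-size chunks would keep only one chunk in
   memory at a time while reusing the existing writeToRabbitMq helper:

      private static final int BATCH_SIZE = 10_000; // assumed chunk size

      private void writeToRabbitMqInBatches(long numRecords) throws Exception {
        for (long start = 0; start < numRecords; start += BATCH_SIZE) {
          long end = Math.min(start + BATCH_SIZE, numRecords);
          // Materialize only BATCH_SIZE messages per iteration.
          List<String> batch =
              LongStream.range(start, end)
                  .mapToObj(n -> TEST_MESSAGE_PREFIX + n)
                  .collect(Collectors.toList());
          writeToRabbitMq(batch);
        }
      }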



##########
.test-infra/jenkins/job_PerformanceTests_SparkReceiverIO_IT.groovy:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import CommonJobProperties as common
+import Kubernetes
+import InfluxDBCredentialsHelper
+
+String jobName = "beam_PerformanceTests_SparkReceiver_IO"
+
+/**
+ * This job runs the SparkReceiver IO performance tests.
 It runs on a RabbitMQ cluster that is built by applying the folder .test-infra/kubernetes/rabbit,
 in an existing kubernetes cluster (DEFAULT_CLUSTER in Kubernetes.groovy).
 The services created to run this test are:
 Pods: 1 RabbitMQ pod.
 Services: 1 broker
 When the performance tests finish, all resources are cleaned up by a postBuild step in Kubernetes.groovy
+ */
+job(jobName) {
+  common.setTopLevelMainJobProperties(delegate, 'master', 120)
+  common.setAutoJob(delegate, 'H H/6 * * *')
+  common.enablePhraseTriggeringFromPullRequest(
+      delegate,
+      'Java SparkReceiverIO Performance Test',
+      'Run Java SparkReceiverIO Performance Test')
+  InfluxDBCredentialsHelper.useCredentials(delegate)
+
+  String namespace = common.getKubernetesNamespace(jobName)
+  String kubeconfig = common.getKubeconfigLocationForNamespace(namespace)
+  Kubernetes k8s = Kubernetes.create(delegate, kubeconfig, namespace)
+
+  k8s.apply(common.makePathAbsolute("src/.test-infra/kubernetes/rabbit/rabbitmq.yaml"))
+  String rabbitMqHostName = "LOAD_BALANCER_IP"
+  k8s.loadBalancerIP("rabbit", rabbitMqHostName)
+
+  Map pipelineOptions = [
+    tempRoot                      : 'gs://temp-storage-for-perf-tests',
+    project                       : 'apache-beam-testing',
+    runner                        : 'DataflowRunner',
+    sourceOptions                 : """
+                                     {
+                                       "numRecords": "600000",
+                                       "keySizeBytes": "1",
+                                       "valueSizeBytes": "90"
+                                     }
+                                   """.trim().replaceAll("\\s", ""),
+    bigQueryDataset               : 'beam_performance',
+    bigQueryTable                 : 'sparkreceiverioit_results',
+    influxMeasurement             : 'sparkreceiverioit_results',
+    influxDatabase                : InfluxDBCredentialsHelper.InfluxDBDatabaseName,
+    influxHost                    : InfluxDBCredentialsHelper.InfluxDBHostUrl,
+    rabbitMqBootstrapServerAddress: 'amqp://guest:guest@' + rabbitMqHostName + ':5672',
+    streamName                    : 'rabbitMqTestStream',
+    readTimeout                   : '900',
+    numWorkers                    : '5',
+    autoscalingAlgorithm          : 'NONE'
+  ]
+
+  steps {
+    gradle {
+      rootBuildScriptDir(common.checkoutDir)
+      common.setGradleSwitches(delegate)
+      switches("--info")
+      switches("-DintegrationTestPipelineOptions=\'${common.joinPipelineOptions(pipelineOptions)}\'")
+      switches("-DintegrationTestRunner=dataflow")
+      tasks(":sdks:java:io:jdbc:integrationTest --tests org.apache.beam.sdk.io.sparkreceiver.SparkReceiverIOIT.testSparkReceiverIOReadsInStreamingWithOffset\"")

Review Comment:
   nit: typo `:sdks:java:io:jdbc` -> `:sdks:java:io:sparkreceiver`



##########
sdks/java/io/sparkreceiver/src/test/java/org/apache/beam/sdk/io/sparkreceiver/SparkReceiverIOIT.java:
##########
@@ -0,0 +1,373 @@
+  /** {@link DoFn} that throws {@code RuntimeException} if receives unexpected element. */
+  private static class TestOutputDoFn extends DoFn<String, String> {
+    private static final Set<String> EXPECTED_RECORDS = new HashSet<>();
+
+    @ProcessElement
+    public void processElement(@Element String element, OutputReceiver<String> outputReceiver) {
+      if (!EXPECTED_RECORDS.contains(element)) {

Review Comment:
   Will it always receive `false` since `EXPECTED_RECORDS` is always empty?
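
   For context, a sketch of the timing: EXPECTED_RECORDS.addAll(messages) runs
   in the submitting JVM after pipeline construction, so locally the set is
   non-empty by run time, but a static field is not serialized with the DoFn;
   on a distributed runner the worker-side copy stays empty and every element
   would throw. One illustrative alternative (not the PR's code, assuming the
   test's existing java.util imports) is a serializable instance field that
   travels with the DoFn, at the cost of shipping the whole set to workers:

      private static class TestOutputDoFn extends DoFn<String, String> {
        // Instance state is serialized with the DoFn, unlike static state.
        private final HashSet<String> expectedRecords;

        TestOutputDoFn(List<String> expectedRecords) {
          this.expectedRecords = new HashSet<>(expectedRecords);
        }

        @ProcessElement
        public void processElement(@Element String element, OutputReceiver<String> out) {
          if (!expectedRecords.contains(element)) {
            throw new RuntimeException("Unexpected element: " + element);
          }
          out.output(element);
        }
      }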



##########
sdks/java/io/sparkreceiver/src/test/java/org/apache/beam/sdk/io/sparkreceiver/SparkReceiverIOIT.java:
##########
@@ -0,0 +1,373 @@
+  @Test
+  public void testSparkReceiverIOReadsInStreamingWithOffset() throws IOException {
+
+    final List<String> messages =
+        LongStream.range(0, sourceOptions.numRecords)
+            .mapToObj(number -> TEST_MESSAGE_PREFIX + number)
+            .collect(Collectors.toList());
+
+    try {
+      writeToRabbitMq(messages);
+    } catch (Exception e) {
+      LOG.error("Can not write to rabbit {}", e.getMessage());

Review Comment:
   Shouldn't we fail the test here and not continue?
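
   A minimal sketch of that change (illustrative, not from the PR): rethrow the
   failure as an assertion error so the broken setup surfaces immediately rather
   than as a later mismatch in the read-count assertion:

      try {
        writeToRabbitMq(messages);
      } catch (Exception e) {
        // Fail fast: without the records in RabbitMQ the read assertion
        // below cannot pass anyway.
        throw new AssertionError("Could not write test records to RabbitMQ", e);
      }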



##########
buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy:
##########
@@ -721,6 +721,7 @@ class BeamModulePlugin implements Plugin<Project> {
         testcontainers_mysql                        : "org.testcontainers:mysql:$testcontainers_version",
         testcontainers_gcloud                       : "org.testcontainers:gcloud:$testcontainers_version",
         vendored_grpc_1_48_1                        : "org.apache.beam:beam-vendor-grpc-1_48_1:0.1",
+        testcontainers_rabbitmq                     : "org.testcontainers:rabbitmq:$testcontainers_version",

Review Comment:
   nit: Could you move it one line up to keep the library names in sorted order?



##########
sdks/java/io/sparkreceiver/src/test/java/org/apache/beam/sdk/io/sparkreceiver/RabbitMqReceiverWithOffset.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.sparkreceiver;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.DefaultConsumer;
+import com.rabbitmq.client.Envelope;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.receiver.Receiver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Imitation of Spark {@link Receiver} for RabbitMQ that implements {@link HasOffset} interface.
+ * Used to test {@link SparkReceiverIO#read()}.
+ */
+public class RabbitMqReceiverWithOffset extends Receiver<String> implements HasOffset {

Review Comment:
   Should it be `public`?





[GitHub] [beam] aromanenko-dev commented on pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on PR #23305:
URL: https://github.com/apache/beam/pull/23305#issuecomment-1293628930

   Run Java_Spark3_Versions PreCommit




[GitHub] [beam] Amar3tto commented on pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

Posted by GitBox <gi...@apache.org>.
Amar3tto commented on PR #23305:
URL: https://github.com/apache/beam/pull/23305#issuecomment-1281160930

   Run Java SparkReceiverIO Performance Test




[GitHub] [beam] Amar3tto commented on pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

Posted by GitBox <gi...@apache.org>.
Amar3tto commented on PR #23305:
URL: https://github.com/apache/beam/pull/23305#issuecomment-1281825523

   Retest this please




[GitHub] [beam] Lizzfox commented on pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

Posted by GitBox <gi...@apache.org>.
Lizzfox commented on PR #23305:
URL: https://github.com/apache/beam/pull/23305#issuecomment-1281938487

   Run Java SparkReceiverIO Performance Test




[GitHub] [beam] Lizzfox commented on pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

Posted by GitBox <gi...@apache.org>.
Lizzfox commented on PR #23305:
URL: https://github.com/apache/beam/pull/23305#issuecomment-1284267655

   Run Java_Debezium_IO_Direct PreCommit




[GitHub] [beam] aromanenko-dev commented on pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on PR #23305:
URL: https://github.com/apache/beam/pull/23305#issuecomment-1280998155

   Run Java SparkReceiverIO Performance Test




[GitHub] [beam] aromanenko-dev commented on pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on PR #23305:
URL: https://github.com/apache/beam/pull/23305#issuecomment-1282501032

   Unfortunately, the job `SparkReceiverIO Performance Test` is still failing.




[GitHub] [beam] Lizzfox commented on pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

Posted by GitBox <gi...@apache.org>.
Lizzfox commented on PR #23305:
URL: https://github.com/apache/beam/pull/23305#issuecomment-1281938198

   Run Go PreCommit




[GitHub] [beam] Lizzfox commented on pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

Posted by GitBox <gi...@apache.org>.
Lizzfox commented on PR #23305:
URL: https://github.com/apache/beam/pull/23305#issuecomment-1283840642

   @aromanenko-dev The `.groovy` file has been modified, so the Seed Job must be run first.
   Thanks




[GitHub] [beam] aromanenko-dev commented on pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on PR #23305:
URL: https://github.com/apache/beam/pull/23305#issuecomment-1256261764

   @Lizzfox Thanks, I'll take a look




[GitHub] [beam] aromanenko-dev commented on pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on PR #23305:
URL: https://github.com/apache/beam/pull/23305#issuecomment-1270199109

   Run Seed Job




[GitHub] [beam] aromanenko-dev commented on a diff in pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on code in PR #23305:
URL: https://github.com/apache/beam/pull/23305#discussion_r989223915


##########
sdks/java/io/sparkreceiver/src/test/java/org/apache/beam/sdk/io/sparkreceiver/SparkReceiverIOIT.java:
##########
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.sparkreceiver;
+
+import static org.apache.beam.sdk.io.synthetic.SyntheticOptions.fromJsonString;
+import static org.junit.Assert.assertEquals;
+
+import com.google.cloud.Timestamp;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.MessageProperties;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.common.IOITHelper;
+import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
+import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.StreamingOptions;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.testutils.NamedTestResult;
+import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
+import org.apache.beam.sdk.testutils.metrics.MetricsReader;
+import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
+import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.RabbitMQContainer;
+import org.testcontainers.utility.DockerImageName;
+
+/**
+ * IO Integration test for {@link org.apache.beam.sdk.io.sparkreceiver.SparkReceiverIO}.
+ *
+ * <p>See https://beam.apache.org/documentation/io/testing/#i-o-transform-integration-tests for
+ * more details.
+ *
+ * <p>NOTE: This test sets the retention policy of the messages so that all messages are retained
+ * in the stream and can be read back after writing.
+ */
+@RunWith(JUnit4.class)
+public class SparkReceiverIOIT {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkReceiverIOIT.class);
+
+  private static final String READ_TIME_METRIC_NAME = "read_time";
+
+  private static final String RUN_TIME_METRIC_NAME = "run_time";
+
+  private static final String READ_ELEMENT_METRIC_NAME = "spark_read_element_count";
+
+  private static final String NAMESPACE = SparkReceiverIOIT.class.getName();
+
+  private static final String TEST_ID = UUID.randomUUID().toString();
+
+  private static final String TIMESTAMP = Timestamp.now().toString();
+
+  private static final String TEST_MESSAGE_PREFIX = "Test ";
+
+  private static Options options;
+
+  private static SyntheticSourceOptions sourceOptions;
+
+  private static GenericContainer<?> rabbitMqContainer;
+
+  private static InfluxDBSettings settings;
+
+  private static final ExperimentalOptions sdfPipelineOptions;
+
+  static {
+    sdfPipelineOptions = PipelineOptionsFactory.create().as(ExperimentalOptions.class);
+    sdfPipelineOptions.as(TestPipelineOptions.class).setBlockOnRun(false);
+  }
+
+  @Rule public TestPipeline readPipeline = TestPipeline.fromOptions(sdfPipelineOptions);
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    options = IOITHelper.readIOTestPipelineOptions(Options.class);
+    sourceOptions = fromJsonString(options.getSourceOptions(), SyntheticSourceOptions.class);
+    options.setRabbitMqBootstrapServerAddress(
+        "amqp://guest:guest@" + options.getRabbitMqBootstrapServerAddress());
+    if (options.isWithTestcontainers()) {
+      setupRabbitMqContainer();
+    } else {
+      settings =
+          InfluxDBSettings.builder()
+              .withHost(options.getInfluxHost())
+              .withDatabase(options.getInfluxDatabase())
+              .withMeasurement(options.getInfluxMeasurement())
+              .get();
+    }
+    clearRabbitMQ();
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    if (rabbitMqContainer != null) {
+      rabbitMqContainer.stop();
+    }
+
+    clearRabbitMQ();
+  }
+
+  private static void setupRabbitMqContainer() {
+    rabbitMqContainer =
+        new RabbitMQContainer(
+                DockerImageName.parse("rabbitmq").withTag(options.getRabbitMqContainerVersion()))
+            .withExposedPorts(5672, 15672);
+    rabbitMqContainer.start();
+    options.setRabbitMqBootstrapServerAddress(
+        getBootstrapServers(
+            rabbitMqContainer.getHost(), rabbitMqContainer.getMappedPort(5672).toString()));
+  }
+
+  private static String getBootstrapServers(String host, String port) {
+    return String.format("amqp://guest:guest@%s:%s", host, port);
+  }
+
+  /** Pipeline options specific for this test. */
+  public interface Options extends IOTestPipelineOptions, StreamingOptions {
+
+    @Description("Options for synthetic source.")
+    @Validation.Required
+    @Default.String("{\"numRecords\": \"500\",\"keySizeBytes\": \"1\",\"valueSizeBytes\": \"90\"}")
+    String getSourceOptions();
+
+    void setSourceOptions(String sourceOptions);
+
+    @Description("RabbitMQ bootstrap server address")
+    @Default.String("amqp://guest:guest@localhost:5672")
+    String getRabbitMqBootstrapServerAddress();
+
+    void setRabbitMqBootstrapServerAddress(String address);
+
+    @Description("RabbitMQ stream")
+    @Default.String("rabbitMqTestStream")
+    String getStreamName();
+
+    void setStreamName(String streamName);
+
+    @Description("Whether to use testcontainers")
+    @Default.Boolean(true)
+    Boolean isWithTestcontainers();
+
+    void setWithTestcontainers(Boolean withTestcontainers);
+
+    @Description("RabbitMQ container version. Use when useTestcontainers is true")
+    @Nullable
+    @Default.String("3.9-alpine")
+    String getRabbitMqContainerVersion();
+
+    void setRabbitMqContainerVersion(String rabbitMqContainerVersion);
+
+    @Description("Time to wait for the events to be processed by the read pipeline (in seconds)")
+    @Default.Integer(50)
+    @Validation.Required
+    Integer getReadTimeout();
+
+    void setReadTimeout(Integer readTimeout);
+  }
+
+  private void writeToRabbitMq(final List<String> messages)
+      throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException, IOException,
+          TimeoutException {
+
+    final ConnectionFactory connectionFactory = new ConnectionFactory();
+    connectionFactory.setUri(options.getRabbitMqBootstrapServerAddress());
+    Map<String, Object> arguments = new HashMap<>();
+    arguments.put("x-queue-type", "stream");
+
+    try (Connection connection = connectionFactory.newConnection();
+        Channel channel = connection.createChannel()) {
+      channel.queueDeclare(options.getStreamName(), true, false, false, arguments);
+
+      messages.forEach(
+          message -> {
+            try {
+              channel.basicPublish(
+                  "",
+                  options.getStreamName(),
+                  MessageProperties.PERSISTENT_TEXT_PLAIN,
+                  message.getBytes(StandardCharsets.UTF_8));
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          });
+    }
+  }
+
+  private SparkReceiverIO.Read<String> readFromRabbitMqWithOffset() {
+    final ReceiverBuilder<String, RabbitMqReceiverWithOffset> receiverBuilder =
+        new ReceiverBuilder<>(RabbitMqReceiverWithOffset.class)
+            .withConstructorArgs(
+                options.getRabbitMqBootstrapServerAddress(),
+                options.getStreamName(),
+                sourceOptions.numRecords);
+
+    return SparkReceiverIO.<String>read()
+        .withGetOffsetFn(
+            rabbitMqMessage ->
+                Long.valueOf(rabbitMqMessage.substring(TEST_MESSAGE_PREFIX.length())))
+        .withSparkReceiverBuilder(receiverBuilder);
+  }
+
+  /**
+   * Since streams in RabbitMQ are durable by definition, we have to clean them up after test
+   * execution. The simplest way is to delete the whole stream after test execution.
+   */
+  private static void clearRabbitMQ() {
+    final ConnectionFactory connectionFactory = new ConnectionFactory();
+
+    try {
+      connectionFactory.setUri(options.getRabbitMqBootstrapServerAddress());
+      try (Connection connection = connectionFactory.newConnection();
+          Channel channel = connection.createChannel()) {
+        channel.queueDelete(options.getStreamName());
+      }
+    } catch (URISyntaxException
+        | NoSuchAlgorithmException
+        | KeyManagementException
+        | IOException
+        | TimeoutException e) {
+      LOG.error("Error during RabbitMQ clean up", e);
+    }
+  }
+
+  /** Function for counting processed pipeline elements. */
+  private static class CountingFn extends DoFn<String, Void> {
+
+    private final Counter elementCounter;
+
+    CountingFn(String namespace, String name) {
+      elementCounter = Metrics.counter(namespace, name);
+    }
+
+    @ProcessElement
+    public void processElement() {
+      elementCounter.inc(1L);
+    }
+  }
+
+  private void cancelIfTimeout(PipelineResult readResult, PipelineResult.State readState)
+      throws IOException {
+    if (readState == null) {
+      readResult.cancel();
+    }
+  }
+
+  private long readElementMetric(PipelineResult result) {
+    MetricsReader metricsReader = new MetricsReader(result, SparkReceiverIOIT.NAMESPACE);
+    return metricsReader.getCounterMetric(SparkReceiverIOIT.READ_ELEMENT_METRIC_NAME);
+  }
+
+  private Set<NamedTestResult> readMetrics(PipelineResult readResult) {
+    BiFunction<MetricsReader, String, NamedTestResult> supplier =
+        (reader, metricName) -> {
+          long start = reader.getStartTimeMetric(metricName);
+          long end = reader.getEndTimeMetric(metricName);
+          return NamedTestResult.create(TEST_ID, TIMESTAMP, metricName, (end - start) / 1e3);
+        };
+
+    NamedTestResult readTime =
+        supplier.apply(new MetricsReader(readResult, NAMESPACE), READ_TIME_METRIC_NAME);
+    NamedTestResult runTime =
+        NamedTestResult.create(TEST_ID, TIMESTAMP, RUN_TIME_METRIC_NAME, readTime.getValue());
+
+    return ImmutableSet.of(readTime, runTime);
+  }
+
+  @Test
+  public void testSparkReceiverIOReadsInStreamingWithOffset() throws IOException {
+
+    final List<String> messages =
+        LongStream.range(0, sourceOptions.numRecords)
+            .mapToObj(number -> TEST_MESSAGE_PREFIX + number)
+            .collect(Collectors.toList());
+
+    try {
+      writeToRabbitMq(messages);
+    } catch (Exception e) {
+      LOG.error("Can not write to rabbit {}", e.getMessage());
+    }
+    LOG.info(sourceOptions.numRecords + " records were successfully written to RabbitMQ");
+
+    // Use streaming pipeline to read RabbitMQ records.
+    readPipeline.getOptions().as(Options.class).setStreaming(true);
+    readPipeline
+        .apply("Read from unbounded RabbitMq", readFromRabbitMqWithOffset())
+        .setCoder(StringUtf8Coder.of())
+        .apply(ParDo.of(new TestOutputDoFn()))
+        .apply("Measure read time", ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC_NAME)))
+        .apply("Counting element", ParDo.of(new CountingFn(NAMESPACE, READ_ELEMENT_METRIC_NAME)));
+
+    TestOutputDoFn.EXPECTED_RECORDS.addAll(messages);
+
+    final PipelineResult readResult = readPipeline.run();
+    final PipelineResult.State readState =
+        readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
+
+    cancelIfTimeout(readResult, readState);
+
+    assertEquals(sourceOptions.numRecords, readElementMetric(readResult));
+
+    if (!options.isWithTestcontainers()) {
+      Set<NamedTestResult> metrics = readMetrics(readResult);
+      IOITMetrics.publishToInflux(TEST_ID, TIMESTAMP, metrics, settings);
+    }
+  }
+
+  /** {@link DoFn} that throws {@code RuntimeException} if receives unexpected element. */
+  private static class TestOutputDoFn extends DoFn<String, String> {
+    private static final Set<String> EXPECTED_RECORDS = new HashSet<>();
+
+    @ProcessElement
+    public void processElement(@Element String element, OutputReceiver<String> outputReceiver) {
+      if (!EXPECTED_RECORDS.contains(element)) {

Review Comment:
   Will it always return `false` since `EXPECTED_RECORDS` is always empty?
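
   A minimal sketch of one way around that pitfall, if this is indeed the issue: static fields are not serialized with a `DoFn`, so on remote workers the set would be empty. Carrying the expected records in an instance field keeps them with the serialized instance (the class name and wiring below are illustrative, not the PR's actual fix):

   ```java
   import java.util.HashSet;
   import org.apache.beam.sdk.transforms.DoFn;

   // Sketch: hold the expected records in an instance field so they are
   // serialized together with the DoFn and visible on remote workers.
   class TestOutputDoFnWithRecords extends DoFn<String, String> {
     private final HashSet<String> expectedRecords; // HashSet is Serializable

     TestOutputDoFnWithRecords(HashSet<String> expectedRecords) {
       this.expectedRecords = expectedRecords;
     }

     @ProcessElement
     public void processElement(@Element String element, OutputReceiver<String> out) {
       if (!expectedRecords.contains(element)) {
         throw new RuntimeException("Unexpected record: " + element);
       }
       out.output(element);
     }
   }
   ```

   The pipeline would then apply `ParDo.of(new TestOutputDoFnWithRecords(new HashSet<>(messages)))` instead of mutating a static set after pipeline construction.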





[GitHub] [beam] Amar3tto commented on pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

Posted by GitBox <gi...@apache.org>.
Amar3tto commented on PR #23305:
URL: https://github.com/apache/beam/pull/23305#issuecomment-1271234021

   Run Python PreCommit




[GitHub] [beam] Amar3tto commented on a diff in pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

Posted by GitBox <gi...@apache.org>.
Amar3tto commented on code in PR #23305:
URL: https://github.com/apache/beam/pull/23305#discussion_r989689183


##########
buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy:
##########
@@ -721,6 +721,7 @@ class BeamModulePlugin implements Plugin<Project> {
         testcontainers_mysql                        : "org.testcontainers:mysql:$testcontainers_version",
         testcontainers_gcloud                       : "org.testcontainers:gcloud:$testcontainers_version",
         vendored_grpc_1_48_1                        : "org.apache.beam:beam-vendor-grpc-1_48_1:0.1",
+        testcontainers_rabbitmq                     : "org.testcontainers:rabbitmq:$testcontainers_version",

Review Comment:
   Done
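
   (If the nit here was about keeping the dependency map alphabetized, the reordered entries would presumably read as in the sketch below; this is an assumption, since the merged hunk is not quoted.)

   ```groovy
   // Sketch (assumed fix): testcontainers_rabbitmq grouped alphabetically with
   // the other testcontainers entries instead of trailing the map.
   testcontainers_gcloud                       : "org.testcontainers:gcloud:$testcontainers_version",
   testcontainers_mysql                        : "org.testcontainers:mysql:$testcontainers_version",
   testcontainers_rabbitmq                     : "org.testcontainers:rabbitmq:$testcontainers_version",
   vendored_grpc_1_48_1                        : "org.apache.beam:beam-vendor-grpc-1_48_1:0.1",
   ```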



##########
.test-infra/jenkins/job_PerformanceTests_SparkReceiverIO_IT.groovy:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import CommonJobProperties as common
+import Kubernetes
+import InfluxDBCredentialsHelper
+
+String jobName = "beam_PerformanceTests_SparkReceiver_IO"
+
+/**
+ * This job runs the SparkReceiver IO performance tests.
+ It runs against a RabbitMQ cluster that is built by applying the folder .test-infra/kubernetes/rabbit
+ to an existing Kubernetes cluster (DEFAULT_CLUSTER in Kubernetes.groovy).
+ The resources created to run this test are:
+ Pods: 1 RabbitMQ pod.
+ Services: 1 broker.
+ When the performance tests finish, all resources are cleaned up by a postBuild step in Kubernetes.groovy.
+ */
+job(jobName) {
+  common.setTopLevelMainJobProperties(delegate, 'master', 120)
+  common.setAutoJob(delegate, 'H H/6 * * *')
+  common.enablePhraseTriggeringFromPullRequest(
+      delegate,
+      'Java SparkReceiverIO Performance Test',
+      'Run Java SparkReceiverIO Performance Test')
+  InfluxDBCredentialsHelper.useCredentials(delegate)
+
+  String namespace = common.getKubernetesNamespace(jobName)
+  String kubeconfig = common.getKubeconfigLocationForNamespace(namespace)
+  Kubernetes k8s = Kubernetes.create(delegate, kubeconfig, namespace)
+
+  k8s.apply(common.makePathAbsolute("src/.test-infra/kubernetes/rabbit/rabbitmq.yaml"))
+  String rabbitMqHostName = "LOAD_BALANCER_IP"
+  k8s.loadBalancerIP("rabbit", rabbitMqHostName)
+
+  Map pipelineOptions = [
+    tempRoot                      : 'gs://temp-storage-for-perf-tests',
+    project                       : 'apache-beam-testing',
+    runner                        : 'DataflowRunner',
+    sourceOptions                 : """
+                                     {
+                                       "numRecords": "600000",
+                                       "keySizeBytes": "1",
+                                       "valueSizeBytes": "90"
+                                     }
+                                   """.trim().replaceAll("\\s", ""),
+    bigQueryDataset               : 'beam_performance',
+    bigQueryTable                 : 'sparkreceiverioit_results',
+    influxMeasurement             : 'sparkreceiverioit_results',
+    influxDatabase                : InfluxDBCredentialsHelper.InfluxDBDatabaseName,
+    influxHost                    : InfluxDBCredentialsHelper.InfluxDBHostUrl,
+    rabbitMqBootstrapServerAddress: 'amqp://guest:guest@' + rabbitMqHostName + ':5672',
+    streamName                    : 'rabbitMqTestStream',
+    readTimeout                   : '900',
+    numWorkers                    : '5',
+    autoscalingAlgorithm          : 'NONE'
+  ]
+
+  steps {
+    gradle {
+      rootBuildScriptDir(common.checkoutDir)
+      common.setGradleSwitches(delegate)
+      switches("--info")
+      switches("-DintegrationTestPipelineOptions=\'${common.joinPipelineOptions(pipelineOptions)}\'")
+      switches("-DintegrationTestRunner=dataflow")
+      tasks(":sdks:java:io:jdbc:integrationTest --tests org.apache.beam.sdk.io.sparkreceiver.SparkReceiverIOIT.testSparkReceiverIOReadsInStreamingWithOffset\"")

Review Comment:
   Fixed
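
   For reference, a plausible corrected invocation (a sketch only, assuming the sparkreceiver module path; the merged line is not quoted here) targets the right module and drops the stray trailing quote:

   ```groovy
   // Sketch of the assumed fix: run the test from the sparkreceiver module
   // rather than jdbc, and without the dangling \" at the end of the line.
   tasks(":sdks:java:io:sparkreceiver:integrationTest --tests org.apache.beam.sdk.io.sparkreceiver.SparkReceiverIOIT.testSparkReceiverIOReadsInStreamingWithOffset")
   ```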





[GitHub] [beam] aromanenko-dev commented on pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on PR #23305:
URL: https://github.com/apache/beam/pull/23305#issuecomment-1282208622

   Run Java_Spark3_Versions PreCommit




[GitHub] [beam] aromanenko-dev commented on pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on PR #23305:
URL: https://github.com/apache/beam/pull/23305#issuecomment-1283899841

   Run Java SparkReceiverIO Performance Test




[GitHub] [beam] Lizzfox commented on pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

Posted by GitBox <gi...@apache.org>.
Lizzfox commented on PR #23305:
URL: https://github.com/apache/beam/pull/23305#issuecomment-1281937749

   Run Java_Examples_Dataflow PreCommit




[GitHub] [beam] aromanenko-dev commented on pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on PR #23305:
URL: https://github.com/apache/beam/pull/23305#issuecomment-1285153593

   Run Java SparkReceiverIO Performance Test




[GitHub] [beam] aromanenko-dev commented on pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on PR #23305:
URL: https://github.com/apache/beam/pull/23305#issuecomment-1293657428

   Run Python PreCommit




[GitHub] [beam] Amar3tto commented on pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

Posted by GitBox <gi...@apache.org>.
Amar3tto commented on PR #23305:
URL: https://github.com/apache/beam/pull/23305#issuecomment-1281824553

   Run Java SparkReceiverIO Performance Test




[GitHub] [beam] codecov[bot] commented on pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

Posted by GitBox <gi...@apache.org>.
codecov[bot] commented on PR #23305:
URL: https://github.com/apache/beam/pull/23305#issuecomment-1252495650

   # [Codecov](https://codecov.io/gh/apache/beam/pull/23305?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#23305](https://codecov.io/gh/apache/beam/pull/23305?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (d1ba717) into [master](https://codecov.io/gh/apache/beam/commit/d578e3df7c963e57f251fb27739fbc1d3811e722?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (d578e3d) will **increase** coverage by `0.00%`.
   > The diff coverage is `n/a`.
   
   ```diff
   @@           Coverage Diff           @@
   ##           master   #23305   +/-   ##
   =======================================
     Coverage   73.41%   73.41%           
   =======================================
     Files         718      718           
     Lines       95620    95620           
   =======================================
   + Hits        70201    70202    +1     
   + Misses      24108    24107    -1     
     Partials     1311     1311           
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | python | `83.18% <ø> (+<0.01%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/23305?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...ks/python/apache\_beam/runners/worker/sdk\_worker.py](https://codecov.io/gh/apache/beam/pull/23305/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvc2RrX3dvcmtlci5weQ==) | `89.09% <0.00%> (+0.15%)` | :arrow_up: |
   
   




[GitHub] [beam] Lizzfox commented on pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

Posted by GitBox <gi...@apache.org>.
Lizzfox commented on PR #23305:
URL: https://github.com/apache/beam/pull/23305#issuecomment-1293453682

   Run Go PreCommit




[GitHub] [beam] Lizzfox commented on pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

Posted by GitBox <gi...@apache.org>.
Lizzfox commented on PR #23305:
URL: https://github.com/apache/beam/pull/23305#issuecomment-1293681799

   Run Java_Neo4j_IO_Direct PreCommit




[GitHub] [beam] aromanenko-dev merged pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

Posted by GitBox <gi...@apache.org>.
aromanenko-dev merged PR #23305:
URL: https://github.com/apache/beam/pull/23305




[GitHub] [beam] Amar3tto commented on a diff in pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

Posted by GitBox <gi...@apache.org>.
Amar3tto commented on code in PR #23305:
URL: https://github.com/apache/beam/pull/23305#discussion_r995390258


##########
sdks/java/io/sparkreceiver/src/test/java/org/apache/beam/sdk/io/sparkreceiver/SparkReceiverIOIT.java:
##########
@@ -0,0 +1,373 @@
+  @Test
+  public void testSparkReceiverIOReadsInStreamingWithOffset() throws IOException {
+
+    final List<String> messages =

Review Comment:
   When I ran the test with 600,000 records, there was no OOM exception.
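
   That is plausible back-of-envelope: the messages are short strings (the "Test " prefix plus an index), so 600,000 of them plus object overhead come to a few tens of megabytes, well within a test JVM heap. If the record count ever grew enough to matter, a lower-memory variant (a hypothetical sketch, not the PR's code) could publish straight from the stream inside writeToRabbitMq instead of materializing the List first:

   ```java
   // Sketch (assumes channel, options, sourceOptions and TEST_MESSAGE_PREFIX
   // in scope, as in the quoted test): publish each record as it is generated.
   LongStream.range(0, sourceOptions.numRecords)
       .mapToObj(number -> TEST_MESSAGE_PREFIX + number)
       .forEach(
           message -> {
             try {
               channel.basicPublish(
                   "",
                   options.getStreamName(),
                   MessageProperties.PERSISTENT_TEXT_PLAIN,
                   message.getBytes(StandardCharsets.UTF_8));
             } catch (IOException e) {
               throw new RuntimeException(e);
             }
           });
   ```

   The trade-off is that the test would then need another way to populate EXPECTED_RECORDS, which is why materializing the List is the simpler choice at this scale.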





[GitHub] [beam] aromanenko-dev commented on pull request #23305: [CdapIO] Add integration tests for SparkReceiverIO

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on PR #23305:
URL: https://github.com/apache/beam/pull/23305#issuecomment-1278654184

   Retest this please

