You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ki...@apache.org on 2022/03/25 19:43:08 UTC

[beam] branch master updated: [BEAM-8218] PulsarIO Connector

This is an automated email from the ASF dual-hosted git repository.

kileysok pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 0459245  [BEAM-8218] PulsarIO Connector
     new 5502c6a  Merge pull request #16634 from MarcoRob/BEAM-8218
0459245 is described below

commit 045924595638c11129d789ddcced30218d93fc2a
Author: Marco Robles <ma...@Marroble-MacBookPro.local>
AuthorDate: Wed Jul 21 16:10:01 2021 -0500

    [BEAM-8218] PulsarIO Connector
    
    setting gradle config and initial pulsar integration testing io
    
    Change gradle for testing work
    
    add files
    
    modify config to be able to access beam imports
    
    files created, basic sdf implementation
    
    remove pulsarread file, unnecessary
    
    change pulsarrecord and fix tracker datatype in pulsario
    
    remove unnecessary files
    
    fix readme
    
    change offset to timestamp for restriction
    
    PTransform builder for read added, move sdf implementation, renamed pulsarsource
    
    Watermark estimator for Pulsar IO Reader SDF
    
    changes suggested as PR
    
    remove pulsar record
    
    Writer for pulsario
    
    change generic types to byte[]
    
    remove line break
    
    add pulsarmessage to implement coder change readfrompulsarSDF with new output and some extra changes
    
    add missing bracket
    
    fix errors
    
    modify SDF to set limit/end for SDF using msgid or timestamp
    
    add basic test for Pulsar IT
    
    adding unit tests for ReadFromPulsarDoFn
    
    fix unit tests for sdf when all the tests are run
    
    general changes for reader, writer and IO connector
    
    add a custom coder for sdf output
    
    unit tests and IT tests for pulsarIO
    
    remove code that enables test in main class
    
    fix unit test in order to not to inject test code into main class
    
    Updating gradle file with "implementation" method
    
    modify files to fix javaprecommit errors
    
    fix errors by checker frameworks
---
 sdks/java/io/pulsar/build.gradle                   |  47 ++++
 .../org/apache/beam/sdk/io/pulsar/PulsarIO.java    | 194 ++++++++++++++++
 .../apache/beam/sdk/io/pulsar/PulsarIOUtils.java   |  44 ++++
 .../apache/beam/sdk/io/pulsar/PulsarMessage.java   |  56 +++++
 .../beam/sdk/io/pulsar/PulsarMessageCoder.java     |  50 +++++
 .../beam/sdk/io/pulsar/PulsarSourceDescriptor.java |  63 ++++++
 .../beam/sdk/io/pulsar/ReadFromPulsarDoFn.java     | 250 +++++++++++++++++++++
 .../beam/sdk/io/pulsar/WriteToPulsarDoFn.java      |  56 +++++
 .../apache/beam/sdk/io/pulsar/package-info.java    |  19 ++
 .../org/apache/beam/sdk/io/pulsar/FakeMessage.java | 161 +++++++++++++
 .../beam/sdk/io/pulsar/FakePulsarClient.java       | 238 ++++++++++++++++++++
 .../beam/sdk/io/pulsar/FakePulsarReader.java       | 171 ++++++++++++++
 .../apache/beam/sdk/io/pulsar/PulsarIOTest.java    | 241 ++++++++++++++++++++
 .../beam/sdk/io/pulsar/ReadFromPulsarDoFnTest.java | 180 +++++++++++++++
 settings.gradle.kts                                |   3 +-
 15 files changed, 1771 insertions(+), 2 deletions(-)

diff --git a/sdks/java/io/pulsar/build.gradle b/sdks/java/io/pulsar/build.gradle
new file mode 100644
index 0000000..9ee14b0
--- /dev/null
+++ b/sdks/java/io/pulsar/build.gradle
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+plugins { id 'org.apache.beam.module' }
+applyJavaNature(automaticModuleName: 'org.apache.beam.sdk.io.pulsar')
+
+description = "Apache Beam :: SDKs :: Java :: IO :: Pulsar"
+ext.summary = "IO to read and write to Pulsar"
+
+// Single Pulsar release shared by every org.apache.pulsar coordinate declared below.
+def pulsar_version = '2.8.2'
+
+
+dependencies {
+    implementation library.java.vendored_guava_26_0_jre
+    implementation library.java.slf4j_api
+    implementation library.java.joda_time
+
+    // Pulsar client and admin artifacts, all pinned to pulsar_version.
+    implementation "org.apache.pulsar:pulsar-client:$pulsar_version"
+    implementation "org.apache.pulsar:pulsar-client-admin:$pulsar_version"
+    // NOTE(review): the permit* entries presumably silence the dependency-analysis check
+    // because the code compiles against the *-api artifacts pulled in transitively - confirm.
+    permitUnusedDeclared "org.apache.pulsar:pulsar-client:$pulsar_version"
+    permitUnusedDeclared "org.apache.pulsar:pulsar-client-admin:$pulsar_version"
+    permitUsedUndeclared "org.apache.pulsar:pulsar-client-api:$pulsar_version"
+    permitUsedUndeclared "org.apache.pulsar:pulsar-client-admin-api:$pulsar_version"
+
+    implementation project(path: ":sdks:java:core", configuration: "shadow")
+    // Test-only dependencies: JUnit Jupiter, the direct runner, Testcontainers' Pulsar module
+    // and AssertJ.
+    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.7.0'
+    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.7.0'
+    testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
+    testImplementation "org.testcontainers:pulsar:1.15.3"
+    testImplementation "org.assertj:assertj-core:2.9.1"
+
+}
diff --git a/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarIO.java b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarIO.java
new file mode 100644
index 0000000..6d0f0a0
--- /dev/null
+++ b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarIO.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.pulsar;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+
+@SuppressWarnings({"rawtypes", "nullness"})
+public class PulsarIO {
+
+  /** Static class, prevent instantiation. */
+  private PulsarIO() {}
+
+  public static Read read() {
+    return new AutoValue_PulsarIO_Read.Builder()
+        .setPulsarClient(PulsarIOUtils.PULSAR_CLIENT_SERIALIZABLE_FUNCTION)
+        .build();
+  }
+
+  @AutoValue
+  @SuppressWarnings({"rawtypes"})
+  public abstract static class Read extends PTransform<PBegin, PCollection<PulsarMessage>> {
+
+    abstract @Nullable String getClientUrl();
+
+    abstract @Nullable String getAdminUrl();
+
+    abstract @Nullable String getTopic();
+
+    abstract @Nullable Long getStartTimestamp();
+
+    abstract @Nullable Long getEndTimestamp();
+
+    abstract @Nullable MessageId getEndMessageId();
+
+    abstract @Nullable SerializableFunction<Message<byte[]>, Instant> getExtractOutputTimestampFn();
+
+    abstract SerializableFunction<String, PulsarClient> getPulsarClient();
+
+    abstract Builder builder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setClientUrl(String url);
+
+      abstract Builder setAdminUrl(String url);
+
+      abstract Builder setTopic(String topic);
+
+      abstract Builder setStartTimestamp(Long timestamp);
+
+      abstract Builder setEndTimestamp(Long timestamp);
+
+      abstract Builder setEndMessageId(MessageId msgId);
+
+      abstract Builder setExtractOutputTimestampFn(
+          SerializableFunction<Message<byte[]>, Instant> fn);
+
+      abstract Builder setPulsarClient(SerializableFunction<String, PulsarClient> fn);
+
+      abstract Read build();
+    }
+
+    public Read withAdminUrl(String url) {
+      return builder().setAdminUrl(url).build();
+    }
+
+    public Read withClientUrl(String url) {
+      return builder().setClientUrl(url).build();
+    }
+
+    public Read withTopic(String topic) {
+      return builder().setTopic(topic).build();
+    }
+
+    public Read withStartTimestamp(Long timestamp) {
+      return builder().setStartTimestamp(timestamp).build();
+    }
+
+    public Read withEndTimestamp(Long timestamp) {
+      return builder().setEndTimestamp(timestamp).build();
+    }
+
+    public Read withEndMessageId(MessageId msgId) {
+      return builder().setEndMessageId(msgId).build();
+    }
+
+    public Read withExtractOutputTimestampFn(SerializableFunction<Message<byte[]>, Instant> fn) {
+      return builder().setExtractOutputTimestampFn(fn).build();
+    }
+
+    public Read withPublishTime() {
+      return withExtractOutputTimestampFn(ExtractOutputTimestampFn.usePublishTime());
+    }
+
+    public Read withProcessingTime() {
+      return withExtractOutputTimestampFn(ExtractOutputTimestampFn.useProcessingTime());
+    }
+
+    public Read withPulsarClient(SerializableFunction<String, PulsarClient> pulsarClientFn) {
+      return builder().setPulsarClient(pulsarClientFn).build();
+    }
+
+    @Override
+    public PCollection<PulsarMessage> expand(PBegin input) {
+      return input
+          .apply(
+              Create.of(
+                  PulsarSourceDescriptor.of(
+                      getTopic(),
+                      getStartTimestamp(),
+                      getEndTimestamp(),
+                      getEndMessageId(),
+                      getClientUrl(),
+                      getAdminUrl())))
+          .apply(ParDo.of(new ReadFromPulsarDoFn(this)))
+          .setCoder(PulsarMessageCoder.of());
+    }
+  }
+
+  public static Write write() {
+    return new AutoValue_PulsarIO_Write.Builder().build();
+  }
+
+  @AutoValue
+  @SuppressWarnings({"rawtypes"})
+  public abstract static class Write extends PTransform<PCollection<byte[]>, PDone> {
+
+    abstract @Nullable String getTopic();
+
+    abstract String getClientUrl();
+
+    abstract Builder builder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setTopic(String topic);
+
+      abstract Builder setClientUrl(String clientUrl);
+
+      abstract Write build();
+    }
+
+    public Write withTopic(String topic) {
+      return builder().setTopic(topic).build();
+    }
+
+    public Write withClientUrl(String clientUrl) {
+      return builder().setClientUrl(clientUrl).build();
+    }
+
+    @Override
+    public PDone expand(PCollection<byte[]> input) {
+      input.apply(ParDo.of(new WriteToPulsarDoFn(this)));
+      return PDone.in(input.getPipeline());
+    }
+  }
+
+  static class ExtractOutputTimestampFn {
+    public static SerializableFunction<Message<byte[]>, Instant> useProcessingTime() {
+      return record -> Instant.now();
+    }
+
+    public static SerializableFunction<Message<byte[]>, Instant> usePublishTime() {
+      return record -> new Instant(record.getPublishTime());
+    }
+  }
+}
diff --git a/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarIOUtils.java b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarIOUtils.java
new file mode 100644
index 0000000..53bc8e4
--- /dev/null
+++ b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarIOUtils.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.pulsar;
+
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Package-private constants and factories shared by the Pulsar IO transforms. */
+final class PulsarIOUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PulsarIOUtils.class);
+
+  /** Local Pulsar HTTP (admin) endpoint. */
+  public static final String SERVICE_HTTP_URL = "http://localhost:8080";
+
+  /** Local Pulsar broker service URL. */
+  public static final String SERVICE_URL = "pulsar://localhost:6650";
+
+  /**
+   * Default client factory: builds a {@link PulsarClient} for the given service URL and rethrows
+   * any {@link PulsarClientException} wrapped in a {@link RuntimeException}.
+   */
+  static final SerializableFunction<String, PulsarClient> PULSAR_CLIENT_SERIALIZABLE_FUNCTION =
+      new SerializableFunction<String, PulsarClient>() {
+        @Override
+        public PulsarClient apply(String input) {
+          try {
+            return PulsarClient.builder().serviceUrl(input).build();
+          } catch (PulsarClientException e) {
+            // Hand the exception itself to the logger so the stack trace is kept, not only
+            // the message text.
+            LOG.error("Unable to create Pulsar client for service URL {}", input, e);
+            throw new RuntimeException(e);
+          }
+        }
+      };
+
+  /** Utility class, prevent instantiation. */
+  private PulsarIOUtils() {}
+}
diff --git a/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarMessage.java b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarMessage.java
new file mode 100644
index 0000000..34fa989
--- /dev/null
+++ b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarMessage.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.pulsar;
+
+/**
+ * A message read from Pulsar. Each instance carries the name of the topic the message came from,
+ * its publish timestamp and, optionally, the underlying message record for direct access.
+ *
+ * <p>The topic and publish timestamp are fixed at construction time; only the message record can
+ * be attached or replaced afterwards.
+ */
+@SuppressWarnings("initialization.fields.uninitialized")
+public class PulsarMessage {
+  private final String topic;
+  private final Long publishTimestamp;
+  private Object messageRecord;
+
+  /**
+   * Creates a message that also carries its underlying record.
+   *
+   * @param topic name of the topic the message was read from
+   * @param publishTimestamp publish timestamp of the message
+   * @param messageRecord the underlying message record
+   */
+  public PulsarMessage(String topic, Long publishTimestamp, Object messageRecord) {
+    this.topic = topic;
+    this.publishTimestamp = publishTimestamp;
+    this.messageRecord = messageRecord;
+  }
+
+  /**
+   * Creates a message without a record; {@link #getMessageRecord()} returns {@code null} until
+   * {@link #setMessageRecord(Object)} is called.
+   *
+   * @param topic name of the topic the message was read from
+   * @param publishTimestamp publish timestamp of the message
+   */
+  public PulsarMessage(String topic, Long publishTimestamp) {
+    this.topic = topic;
+    this.publishTimestamp = publishTimestamp;
+  }
+
+  /** Returns the name of the topic the message was read from. */
+  public String getTopic() {
+    return topic;
+  }
+
+  /** Returns the publish timestamp of the message. */
+  public Long getPublishTimestamp() {
+    return publishTimestamp;
+  }
+
+  /** Attaches or replaces the underlying message record. */
+  public void setMessageRecord(Object messageRecord) {
+    this.messageRecord = messageRecord;
+  }
+
+  /** Returns the underlying message record, or {@code null} when none has been set. */
+  public Object getMessageRecord() {
+    return messageRecord;
+  }
+}
diff --git a/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarMessageCoder.java b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarMessageCoder.java
new file mode 100644
index 0000000..2f3bed5
--- /dev/null
+++ b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarMessageCoder.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.pulsar;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+
+/**
+ * {@link CustomCoder} for {@link PulsarMessage}.
+ *
+ * <p>The encoded form consists of the topic followed by the publish timestamp. The message record
+ * is not written, so a decoded {@link PulsarMessage} is created without one.
+ */
+public class PulsarMessageCoder extends CustomCoder<PulsarMessage> {
+
+  private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
+  private static final VarLongCoder longCoder = VarLongCoder.of();
+
+  public PulsarMessageCoder() {}
+
+  /** Returns a new coder instance. */
+  public static PulsarMessageCoder of() {
+    return new PulsarMessageCoder();
+  }
+
+  /** Writes the topic, then the publish timestamp, of {@code value} to {@code outStream}. */
+  @Override
+  public void encode(PulsarMessage value, OutputStream outStream)
+      throws CoderException, IOException {
+    final String topic = value.getTopic();
+    final Long publishTimestamp = value.getPublishTimestamp();
+    stringCoder.encode(topic, outStream);
+    longCoder.encode(publishTimestamp, outStream);
+  }
+
+  /** Reads a topic and a publish timestamp, in that order, and rebuilds the message. */
+  @Override
+  public PulsarMessage decode(InputStream inStream) throws CoderException, IOException {
+    final String topic = stringCoder.decode(inStream);
+    final Long publishTimestamp = longCoder.decode(inStream);
+    return new PulsarMessage(topic, publishTimestamp);
+  }
+}
diff --git a/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarSourceDescriptor.java b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarSourceDescriptor.java
new file mode 100644
index 0000000..427d37d
--- /dev/null
+++ b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarSourceDescriptor.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.pulsar;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldName;
+import org.apache.pulsar.client.api.MessageId;
+
+/**
+ * Serializable description of a Pulsar source: the topic, an optional start/end range, an optional
+ * end message id, and the client and admin URLs.
+ *
+ * <p>Only the start offset, end offset and end message id are marked {@code @Nullable}; the topic
+ * and both URLs carry no such annotation.
+ */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class PulsarSourceDescriptor implements Serializable {
+
+  /** Topic to read from. */
+  @SchemaFieldName("topic")
+  abstract String getTopic();
+
+  /** Start of the range, populated from {@code startOffsetTimestamp} in {@link #of}; may be null. */
+  @SchemaFieldName("start_offset")
+  @Nullable
+  abstract Long getStartOffset();
+
+  /** End of the range, populated from {@code endOffsetTimestamp} in {@link #of}; may be null. */
+  @SchemaFieldName("end_offset")
+  @Nullable
+  abstract Long getEndOffset();
+
+  /** Id of the end message; may be null. */
+  @SchemaFieldName("end_messageid")
+  @Nullable
+  abstract MessageId getEndMessageId();
+
+  /** URL for the Pulsar client. */
+  @SchemaFieldName("client_url")
+  abstract String getClientUrl();
+
+  /** URL for the Pulsar admin endpoint. */
+  @SchemaFieldName("admin_url")
+  abstract String getAdminUrl();
+
+  /**
+   * Creates a descriptor from its six properties, passed through unchanged in declaration order.
+   *
+   * @param topic topic to read from
+   * @param startOffsetTimestamp value exposed by {@link #getStartOffset()}
+   * @param endOffsetTimestamp value exposed by {@link #getEndOffset()}
+   * @param endMessageId value exposed by {@link #getEndMessageId()}
+   * @param clientUrl value exposed by {@link #getClientUrl()}
+   * @param adminUrl value exposed by {@link #getAdminUrl()}
+   */
+  public static PulsarSourceDescriptor of(
+      String topic,
+      Long startOffsetTimestamp,
+      Long endOffsetTimestamp,
+      MessageId endMessageId,
+      String clientUrl,
+      String adminUrl) {
+    return new AutoValue_PulsarSourceDescriptor(
+        topic, startOffsetTimestamp, endOffsetTimestamp, endMessageId, clientUrl, adminUrl);
+  }
+}
diff --git a/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java
new file mode 100644
index 0000000..fc881f3
--- /dev/null
+++ b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.pulsar;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.ReaderBuilder;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Splittable {@link DoFn} that reads the topic named by a {@link PulsarSourceDescriptor} and emits
+ * one {@link PulsarMessage} per message read.
+ *
+ * <p>The restriction is an {@link OffsetRange} whose bounds are compared against message publish
+ * times: each message's publish time is claimed on the tracker before the message is emitted.
+ */
+@DoFn.UnboundedPerElement
+@SuppressWarnings({"rawtypes", "nullness"})
+public class ReadFromPulsarDoFn extends DoFn<PulsarSourceDescriptor, PulsarMessage> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ReadFromPulsarDoFn.class);
+  private SerializableFunction<String, PulsarClient> pulsarClientSerializableFunction;
+  // Connections are opened in @Setup; they are never meant to travel with the serialized DoFn.
+  private transient PulsarClient client;
+  private transient PulsarAdmin admin;
+  private String clientUrl;
+  private String adminUrl;
+
+  private final SerializableFunction<Message<byte[]>, Instant> extractOutputTimestampFn;
+
+  /**
+   * Copies the connection settings and timestamp policy out of the given read transform.
+   *
+   * @param transform the configured {@link PulsarIO.Read}; when it carries no output-timestamp
+   *     function, the message publish time is used
+   */
+  public ReadFromPulsarDoFn(PulsarIO.Read transform) {
+    SerializableFunction<Message<byte[]>, Instant> timestampFn =
+        transform.getExtractOutputTimestampFn();
+    // The function is optional on the transform; without a fallback the first message read
+    // would fail with a NullPointerException.
+    this.extractOutputTimestampFn =
+        timestampFn != null ? timestampFn : PulsarIO.ExtractOutputTimestampFn.usePublishTime();
+    this.clientUrl = transform.getClientUrl();
+    this.adminUrl = transform.getAdminUrl();
+    this.pulsarClientSerializableFunction = transform.getPulsarClient();
+  }
+
+  /**
+   * Opens the Pulsar client and admin connections, falling back to the localhost defaults from
+   * {@link PulsarIOUtils} for any URL that was not configured.
+   */
+  @Setup
+  public void initPulsarClients() throws Exception {
+    if (this.clientUrl == null) {
+      this.clientUrl = PulsarIOUtils.SERVICE_URL;
+    }
+    if (this.adminUrl == null) {
+      this.adminUrl = PulsarIOUtils.SERVICE_HTTP_URL;
+    }
+
+    if (this.client == null) {
+      this.client = pulsarClientSerializableFunction.apply(this.clientUrl);
+      // A custom factory may return null; build a plain client in that case.
+      if (this.client == null) {
+        this.client = PulsarClient.builder().serviceUrl(clientUrl).build();
+      }
+    }
+
+    if (this.admin == null) {
+      this.admin =
+          PulsarAdmin.builder()
+              .serviceHttpUrl(adminUrl)
+              .tlsTrustCertsFilePath(null)
+              .allowTlsInsecureConnection(false)
+              .build();
+    }
+  }
+
+  /**
+   * Closes whichever connections were opened. The admin connection is closed even when closing
+   * the client throws, and a setup that failed part-way no longer causes a NullPointerException.
+   */
+  @Teardown
+  public void teardown() throws Exception {
+    try {
+      if (this.client != null) {
+        this.client.close();
+      }
+    } finally {
+      this.client = null;
+      if (this.admin != null) {
+        this.admin.close();
+        this.admin = null;
+      }
+    }
+  }
+
+  /**
+   * Builds the initial restriction from the descriptor's start and end offsets, defaulting to
+   * {@code [0, Long.MAX_VALUE)} for whichever bound is absent.
+   */
+  @GetInitialRestriction
+  public OffsetRange getInitialRestriction(@Element PulsarSourceDescriptor pulsarSource) {
+    long startTimestamp = 0L;
+    long endTimestamp = Long.MAX_VALUE;
+
+    if (pulsarSource.getStartOffset() != null) {
+      startTimestamp = pulsarSource.getStartOffset();
+    }
+
+    if (pulsarSource.getEndOffset() != null) {
+      endTimestamp = pulsarSource.getEndOffset();
+    }
+
+    return new OffsetRange(startTimestamp, endTimestamp);
+  }
+
+  /**
+   * Returns the work remaining as reported by a fresh tracker for the given restriction.
+   *
+   * <p>It may define a DoFn.GetSize method or ensure that the RestrictionTracker implements
+   * RestrictionTracker.HasProgress. Poor auto-scaling of workers and/or splitting may result if
+   * size or progress is an inaccurate representation of work. See DoFn.GetSize and
+   * RestrictionTracker.HasProgress for further details.
+   */
+  @GetSize
+  public double getSize(
+      @Element PulsarSourceDescriptor pulsarSource, @Restriction OffsetRange range) {
+    // TODO improve getSize estimate, check pulsar stats to improve the size estimate
+    // https://pulsar.apache.org/docs/en/admin-api-topics/#get-stats
+    double estimateRecords =
+        restrictionTracker(pulsarSource, range).getProgress().getWorkRemaining();
+    return estimateRecords;
+  }
+
+  /** Creates a reader on {@code topicPartition} positioned at the earliest available message. */
+  public Reader<byte[]> newReader(PulsarClient client, String topicPartition)
+      throws PulsarClientException {
+    ReaderBuilder<byte[]> builder =
+        client.newReader().topic(topicPartition).startMessageId(MessageId.earliest);
+    return builder.create();
+  }
+
+  @GetRestrictionCoder
+  public Coder<OffsetRange> getRestrictionCoder() {
+    return new OffsetRange.Coder();
+  }
+
+  /**
+   * Reads messages from the descriptor's topic and emits them until the topic ends, the tracker
+   * refuses a claim, or the configured end message id is reached.
+   *
+   * @return {@code stop()} when reading is finished, {@code resume()} when the reader returned no
+   *     message
+   */
+  @ProcessElement
+  public ProcessContinuation processElement(
+      @Element PulsarSourceDescriptor pulsarSourceDescriptor,
+      RestrictionTracker<OffsetRange, Long> tracker,
+      WatermarkEstimator watermarkEstimator,
+      OutputReceiver<PulsarMessage> output)
+      throws IOException {
+    long startTimestamp = tracker.currentRestriction().getFrom();
+    String topicDescriptor = pulsarSourceDescriptor.getTopic();
+    MessageId endMessageId = pulsarSourceDescriptor.getEndMessageId();
+    // try-with-resources closes the reader on every exit path, so no explicit close() calls
+    // are needed before returning.
+    try (Reader<byte[]> reader = newReader(this.client, topicDescriptor)) {
+      if (startTimestamp > 0) {
+        reader.seek(startTimestamp);
+      }
+      while (true) {
+        if (reader.hasReachedEndOfTopic()) {
+          return ProcessContinuation.stop();
+        }
+        Message<byte[]> message = reader.readNext();
+        if (message == null) {
+          return ProcessContinuation.resume();
+        }
+        Long currentTimestamp = message.getPublishTime();
+        // If tracker.tryClaim() returns true the SDF must do the work; otherwise it must leave
+        // processElement() without doing any associated work or claiming more.
+        // NOTE(review): publish times are claimed as offsets; messages sharing a publish time
+        // may not be claimable twice - confirm against the tracker's contract.
+        if (!tracker.tryClaim(currentTimestamp)) {
+          return ProcessContinuation.stop();
+        }
+        // The end message itself is not emitted: reading stops as soon as it is seen.
+        if (endMessageId != null && message.getMessageId().compareTo(endMessageId) == 0) {
+          return ProcessContinuation.stop();
+        }
+        PulsarMessage pulsarMessage =
+            new PulsarMessage(message.getTopicName(), message.getPublishTime(), message);
+        Instant outputTimestamp = extractOutputTimestampFn.apply(message);
+        output.outputWithTimestamp(pulsarMessage, outputTimestamp);
+      }
+    }
+  }
+
+  /** Seeds the watermark estimator state with the timestamp of the input element. */
+  @GetInitialWatermarkEstimatorState
+  public Instant getInitialWatermarkEstimatorState(@Timestamp Instant currentElementTimestamp) {
+    return currentElementTimestamp;
+  }
+
+  /** Creates a monotonically increasing estimator starting at the (clamped) saved state. */
+  @NewWatermarkEstimator
+  public WatermarkEstimator<Instant> newWatermarkEstimator(
+      @WatermarkEstimatorState Instant watermarkEstimatorState) {
+    return new WatermarkEstimators.MonotonicallyIncreasing(
+        ensureTimestampWithinBounds(watermarkEstimatorState));
+  }
+
+  /**
+   * Returns a plain {@link OffsetRangeTracker} when the restriction has an explicit end, and a
+   * {@link GrowableOffsetRangeTracker} whose end is estimated from the topic's latest message
+   * when the end is {@code Long.MAX_VALUE}.
+   */
+  @NewTracker
+  public OffsetRangeTracker restrictionTracker(
+      @Element PulsarSourceDescriptor pulsarSource, @Restriction OffsetRange restriction) {
+    if (restriction.getTo() < Long.MAX_VALUE) {
+      return new OffsetRangeTracker(restriction);
+    }
+
+    PulsarLatestOffsetEstimator offsetEstimator =
+        new PulsarLatestOffsetEstimator(this.admin, pulsarSource.getTopic());
+    return new GrowableOffsetRangeTracker(restriction.getFrom(), offsetEstimator);
+  }
+
+  /**
+   * Estimates the end of a growing range as the publish time of the message returned by the admin
+   * API for position "latest"; the lookup result is cached for one second.
+   */
+  private static class PulsarLatestOffsetEstimator
+      implements GrowableOffsetRangeTracker.RangeEndEstimator {
+
+    private final Supplier<Message<byte[]>> memoizedBacklog;
+
+    private PulsarLatestOffsetEstimator(PulsarAdmin admin, String topic) {
+      this.memoizedBacklog =
+          Suppliers.memoizeWithExpiration(
+              () -> {
+                try {
+                  Message<byte[]> lastMsg = admin.topics().examineMessage(topic, "latest", 1);
+                  return lastMsg;
+                } catch (PulsarAdminException e) {
+                  // Log the exception itself so the stack trace is kept.
+                  LOG.error("Unable to examine the latest message of topic {}", topic, e);
+                  throw new RuntimeException(e);
+                }
+              },
+              1,
+              TimeUnit.SECONDS);
+    }
+
+    @Override
+    public long estimate() {
+      Message<byte[]> msg = memoizedBacklog.get();
+      if (msg == null) {
+        // No message to derive an end from: report that no estimate is available.
+        return Long.MIN_VALUE;
+      }
+      return msg.getPublishTime();
+    }
+  }
+
+  /** Clamps {@code timestamp} into the range of timestamps a {@link BoundedWindow} supports. */
+  private static Instant ensureTimestampWithinBounds(Instant timestamp) {
+    if (timestamp.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
+      timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
+    } else if (timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
+      timestamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
+    }
+    return timestamp;
+  }
+}
diff --git a/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/WriteToPulsarDoFn.java b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/WriteToPulsarDoFn.java
new file mode 100644
index 0000000..9659940
--- /dev/null
+++ b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/WriteToPulsarDoFn.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.pulsar;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+/**
+ * A {@link DoFn} that publishes every input {@code byte[]} element to a Pulsar topic.
+ *
+ * <p>The Pulsar client and producer are created once per DoFn instance in {@link #setup()} and
+ * released in {@link #teardown()}. Each element is sent synchronously with {@link
+ * Producer#send(Object)}.
+ *
+ * <p>This is a regular (non-splittable) DoFn, so it deliberately carries no {@code
+ * @UnboundedPerElement} annotation: that annotation only describes splittable DoFns.
+ */
+@SuppressWarnings({"rawtypes", "nullness"})
+public class WriteToPulsarDoFn extends DoFn<byte[], Void> {
+
+  // Runtime handles created in setup(); they are not part of the serialized DoFn state.
+  private transient Producer<byte[]> producer;
+  private transient PulsarClient client;
+
+  // Configuration captured from the transform at construction time.
+  private final String clientUrl;
+  private final String topic;
+
+  /**
+   * Captures the service URL and destination topic from the given write transform.
+   *
+   * @param transform the configured {@code PulsarIO.Write} transform
+   */
+  WriteToPulsarDoFn(PulsarIO.Write transform) {
+    this.clientUrl = transform.getClientUrl();
+    this.topic = transform.getTopic();
+  }
+
+  /**
+   * Connects to Pulsar and creates an LZ4-compressing producer for the configured topic.
+   *
+   * @throws PulsarClientException if the client or the producer cannot be created
+   */
+  @Setup
+  public void setup() throws PulsarClientException {
+    client = PulsarClient.builder().serviceUrl(clientUrl).build();
+    producer = client.newProducer().topic(topic).compressionType(CompressionType.LZ4).create();
+  }
+
+  /**
+   * Sends one element to the topic, blocking until the send call returns.
+   *
+   * @param messageToSend the payload to publish
+   * @throws Exception if the producer fails to send the message
+   */
+  @ProcessElement
+  public void processElement(@Element byte[] messageToSend) throws Exception {
+    producer.send(messageToSend);
+  }
+
+  /**
+   * Closes the producer and then the client.
+   *
+   * <p>Both handles are null-checked because {@link #setup()} may have failed part-way, and the
+   * client is closed in a {@code finally} block so that a failure while closing the producer does
+   * not leak the client connection.
+   *
+   * @throws PulsarClientException if closing the producer or the client fails
+   */
+  @Teardown
+  public void teardown() throws PulsarClientException {
+    try {
+      if (producer != null) {
+        producer.close();
+      }
+    } finally {
+      if (client != null) {
+        client.close();
+      }
+    }
+  }
+}
diff --git a/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/package-info.java b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/package-info.java
new file mode 100644
index 0000000..8b7ab10
--- /dev/null
+++ b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/** Transforms for reading and writing from Apache Pulsar. */
+package org.apache.beam.sdk.io.pulsar;
diff --git a/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/FakeMessage.java b/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/FakeMessage.java
new file mode 100644
index 0000000..9cdc4af
--- /dev/null
+++ b/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/FakeMessage.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.pulsar;
+
+import java.util.Map;
+import java.util.Optional;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.internal.DefaultImplementation;
+import org.apache.pulsar.common.api.EncryptionContext;
+
+/**
+ * Minimal {@link Message} test double.
+ *
+ * <p>Only the topic name, the publish time and the message id (built from ledger id, entry id and
+ * partition index) carry real values; every other accessor returns a fixed placeholder.
+ */
+public class FakeMessage implements Message<byte[]> {
+
+  private final String topic;
+  private final long ledgerId;
+  private final long entryId;
+  private final int partitionIndex;
+  private final long timestamp;
+
+  /**
+   * Creates a fake message.
+   *
+   * @param topic value reported by {@link #getTopicName()}
+   * @param timestamp value reported by {@link #getPublishTime()}
+   * @param ledgerId ledger id component of the message id
+   * @param entryId entry id component of the message id
+   * @param partitionIndex partition index component of the message id
+   */
+  public FakeMessage(
+      String topic, long timestamp, long ledgerId, long entryId, int partitionIndex) {
+    this.topic = topic;
+    this.timestamp = timestamp;
+    this.ledgerId = ledgerId;
+    this.entryId = entryId;
+    this.partitionIndex = partitionIndex;
+  }
+
+  // ---- Accessors backed by constructor arguments ----
+
+  /** Builds a new message id from the stored ledger id, entry id and partition index. */
+  @Override
+  public MessageId getMessageId() {
+    return DefaultImplementation.newMessageId(ledgerId, entryId, partitionIndex);
+  }
+
+  @Override
+  public long getPublishTime() {
+    return timestamp;
+  }
+
+  @Override
+  public String getTopicName() {
+    return topic;
+  }
+
+  // ---- Placeholder accessors: fixed values only ----
+
+  @Override
+  public Map<String, String> getProperties() {
+    return null;
+  }
+
+  @Override
+  public boolean hasProperty(String name) {
+    return false;
+  }
+
+  @Override
+  public String getProperty(String name) {
+    return null;
+  }
+
+  @Override
+  public byte[] getData() {
+    return new byte[0];
+  }
+
+  @Override
+  public int size() {
+    return 0;
+  }
+
+  @Override
+  public byte[] getValue() {
+    return null;
+  }
+
+  @Override
+  public long getEventTime() {
+    return 0;
+  }
+
+  @Override
+  public long getSequenceId() {
+    return 0;
+  }
+
+  @Override
+  public String getProducerName() {
+    return null;
+  }
+
+  @Override
+  public boolean hasKey() {
+    return false;
+  }
+
+  @Override
+  public String getKey() {
+    return null;
+  }
+
+  @Override
+  public boolean hasBase64EncodedKey() {
+    return false;
+  }
+
+  @Override
+  public byte[] getKeyBytes() {
+    return new byte[0];
+  }
+
+  @Override
+  public boolean hasOrderingKey() {
+    return false;
+  }
+
+  @Override
+  public byte[] getOrderingKey() {
+    return new byte[0];
+  }
+
+  @Override
+  public Optional<EncryptionContext> getEncryptionCtx() {
+    return Optional.empty();
+  }
+
+  @Override
+  public int getRedeliveryCount() {
+    return 0;
+  }
+
+  @Override
+  public byte[] getSchemaVersion() {
+    return new byte[0];
+  }
+
+  @Override
+  public boolean isReplicated() {
+    return false;
+  }
+
+  @Override
+  public String getReplicatedFrom() {
+    return null;
+  }
+
+  /** No-op: the fake holds no releasable resources. */
+  @Override
+  public void release() {}
+}
diff --git a/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/FakePulsarClient.java b/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/FakePulsarClient.java
new file mode 100644
index 0000000..4639d84
--- /dev/null
+++ b/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/FakePulsarClient.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.pulsar;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Range;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.ReaderBuilder;
+import org.apache.pulsar.client.api.ReaderListener;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.transaction.TransactionBuilder;
+
+/**
+ * {@link PulsarClient} test double whose only functional feature is {@link #newReader()}, which
+ * hands out a {@link MockReaderBuilder} wired to a caller-supplied (or lazily created fake)
+ * {@link Reader}. All producer, consumer and lifecycle methods are inert stubs.
+ */
+@SuppressWarnings({"rawtypes"})
+public class FakePulsarClient implements PulsarClient {
+
+  private MockReaderBuilder readerBuilder;
+
+  /**
+   * Creates a client whose reader builder returns {@code reader} from {@code create()}.
+   *
+   * @param reader the reader to hand out
+   */
+  public FakePulsarClient(Reader<byte[]> reader) {
+    setReader(reader);
+  }
+
+  // ---- Reader wiring ----
+
+  /** Replaces the reader that the builder will return, creating the builder if necessary. */
+  public void setReader(Reader<byte[]> reader) {
+    initReaderBuilder();
+    this.readerBuilder.setReader(reader);
+  }
+
+  /** Creates the shared reader builder on first use; later calls leave it untouched. */
+  public void initReaderBuilder() {
+    if (readerBuilder != null) {
+      return;
+    }
+    readerBuilder = new MockReaderBuilder();
+  }
+
+  /** Returns the single shared {@link MockReaderBuilder}. */
+  @Override
+  public ReaderBuilder<byte[]> newReader() {
+    initReaderBuilder();
+    return readerBuilder;
+  }
+
+  @Override
+  public <T> ReaderBuilder<T> newReader(Schema<T> schema) {
+    return null;
+  }
+
+  // ---- Unsupported factories: always null ----
+
+  @Override
+  public ProducerBuilder<byte[]> newProducer() {
+    return null;
+  }
+
+  @Override
+  public <T> ProducerBuilder<T> newProducer(Schema<T> schema) {
+    return null;
+  }
+
+  @Override
+  public ConsumerBuilder<byte[]> newConsumer() {
+    return null;
+  }
+
+  @Override
+  public <T> ConsumerBuilder<T> newConsumer(Schema<T> schema) {
+    return null;
+  }
+
+  @Override
+  public TransactionBuilder newTransaction() throws PulsarClientException {
+    return null;
+  }
+
+  @Override
+  public CompletableFuture<List<String>> getPartitionsForTopic(String topic) {
+    return null;
+  }
+
+  // ---- Connection / lifecycle stubs: no-ops ----
+
+  @Override
+  public void updateServiceUrl(String serviceUrl) throws PulsarClientException {}
+
+  /** No-op; not part of the {@link PulsarClient} interface. */
+  public void serviceUrl(String serviceUrl) {}
+
+  @Override
+  public void close() throws PulsarClientException {}
+
+  @Override
+  public CompletableFuture<Void> closeAsync() {
+    return null;
+  }
+
+  @Override
+  public void shutdown() throws PulsarClientException {}
+
+  @Override
+  public boolean isClosed() {
+    return false;
+  }
+
+  /**
+   * {@link ReaderBuilder} test double. Only {@link #topic(String)}, {@link
+   * #startMessageId(MessageId)} and {@link #create()} are usable in a fluent chain; every other
+   * configuration method returns {@code null}.
+   */
+  static class MockReaderBuilder implements ReaderBuilder<byte[]> {
+
+    // Size of the FakePulsarReader created when no reader was injected.
+    private final int numberOfMessages = 100;
+    private String topic;
+    private Reader<byte[]> reader;
+
+    public MockReaderBuilder() {}
+
+    /** Injects the reader that {@link #create()} will return. */
+    public void setReader(Reader<byte[]> reader) {
+      this.reader = reader;
+    }
+
+    /**
+     * Returns the injected reader, or creates (and remembers) a {@link FakePulsarReader} for the
+     * configured topic when none was injected.
+     */
+    @Override
+    public Reader<byte[]> create() throws PulsarClientException {
+      if (reader == null) {
+        reader = new FakePulsarReader(topic, numberOfMessages);
+      }
+      return reader;
+    }
+
+    /** Records the topic name used for a lazily created fake reader. */
+    @Override
+    public ReaderBuilder<byte[]> topic(String topicName) {
+      topic = topicName;
+      return this;
+    }
+
+    /** Accepted but ignored; returns this builder so the call can be chained. */
+    @Override
+    public ReaderBuilder<byte[]> startMessageId(MessageId startMessageId) {
+      return this;
+    }
+
+    // ---- Unsupported builder options: always null ----
+
+    @Override
+    public CompletableFuture<Reader<byte[]>> createAsync() {
+      return null;
+    }
+
+    @Override
+    public ReaderBuilder<byte[]> clone() {
+      return null;
+    }
+
+    @Override
+    public ReaderBuilder<byte[]> startMessageFromRollbackDuration(
+        long rollbackDuration, TimeUnit timeunit) {
+      return null;
+    }
+
+    @Override
+    public ReaderBuilder<byte[]> startMessageIdInclusive() {
+      return null;
+    }
+
+    @Override
+    public ReaderBuilder<byte[]> readerListener(ReaderListener readerListener) {
+      return null;
+    }
+
+    @Override
+    public ReaderBuilder<byte[]> cryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
+      return null;
+    }
+
+    @Override
+    public ReaderBuilder<byte[]> defaultCryptoKeyReader(String privateKey) {
+      return null;
+    }
+
+    @Override
+    public ReaderBuilder<byte[]> defaultCryptoKeyReader(Map privateKeys) {
+      return null;
+    }
+
+    @Override
+    public ReaderBuilder<byte[]> cryptoFailureAction(ConsumerCryptoFailureAction action) {
+      return null;
+    }
+
+    @Override
+    public ReaderBuilder<byte[]> receiverQueueSize(int receiverQueueSize) {
+      return null;
+    }
+
+    @Override
+    public ReaderBuilder<byte[]> readerName(String readerName) {
+      return null;
+    }
+
+    @Override
+    public ReaderBuilder<byte[]> subscriptionRolePrefix(String subscriptionRolePrefix) {
+      return null;
+    }
+
+    @Override
+    public ReaderBuilder<byte[]> subscriptionName(String subscriptionName) {
+      return null;
+    }
+
+    @Override
+    public ReaderBuilder<byte[]> readCompacted(boolean readCompacted) {
+      return null;
+    }
+
+    @Override
+    public ReaderBuilder<byte[]> keyHashRange(Range... ranges) {
+      return null;
+    }
+
+    @Override
+    public ReaderBuilder<byte[]> topics(List topicNames) {
+      return null;
+    }
+
+    @Override
+    public ReaderBuilder<byte[]> loadConf(Map config) {
+      return null;
+    }
+  }
+}
diff --git a/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/FakePulsarReader.java b/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/FakePulsarReader.java
new file mode 100644
index 0000000..834fd04
--- /dev/null
+++ b/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/FakePulsarReader.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.pulsar;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * In-memory {@link Reader} test double that serves a generated list of {@link FakeMessage}s.
+ *
+ * <p>Messages are created with publish times one second apart, starting at the moment {@link
+ * #setMock(String, int)} runs. {@link #readNext()} walks the list with an internal cursor and
+ * {@link #seek(long)} repositions that cursor by exact publish time. The asynchronous and
+ * timeout-based methods are inert stubs.
+ */
+public class FakePulsarReader implements Reader<byte[]> {
+
+  private String topic;
+  private final List<FakeMessage> fakeMessages = new ArrayList<>();
+  // Index of the message the next readNext() call will return.
+  private int currentMsg;
+  private long startTimestamp;
+  private long endTimestamp;
+  private boolean reachedEndOfTopic;
+  private final int numberOfMessages;
+
+  /**
+   * Creates a reader pre-populated with {@code numberOfMessages} fake messages.
+   *
+   * @param topic topic name reported by the reader and its messages
+   * @param numberOfMessages how many messages to generate, also used by {@link #reset()}
+   */
+  public FakePulsarReader(String topic, int numberOfMessages) {
+    this.numberOfMessages = numberOfMessages;
+    this.setMock(topic, numberOfMessages);
+  }
+
+  /** Sets the value returned by {@link #hasReachedEndOfTopic()}. */
+  public void setReachedEndOfTopic(boolean hasReachedEnd) {
+    this.reachedEndOfTopic = hasReachedEnd;
+  }
+
+  /**
+   * Appends {@code numberOfMessages} generated messages and rewinds the cursor to the first
+   * message.
+   *
+   * <p>The start timestamp is the publish time of the first generated message and the end
+   * timestamp that of the last one, whatever the requested count is (a single message therefore
+   * sets both). When {@code numberOfMessages} is not positive, nothing is generated and the
+   * timestamps are left unchanged.
+   *
+   * @param topic topic name to report
+   * @param numberOfMessages how many messages to generate
+   */
+  public void setMock(String topic, int numberOfMessages) {
+    this.topic = topic;
+    // Read the clock once so consecutive messages are exactly one second apart.
+    Instant firstPublishTime = Instant.now();
+    int lastIndex = numberOfMessages - 1;
+    for (int i = 0; i < numberOfMessages; i++) {
+      long timestamp = firstPublishTime.plus(Duration.standardSeconds(i)).getMillis();
+      if (i == 0) {
+        startTimestamp = timestamp;
+      }
+      if (i == lastIndex) {
+        endTimestamp = timestamp;
+      }
+      fakeMessages.add(new FakeMessage(topic, timestamp, i, i, i));
+    }
+    currentMsg = 0;
+  }
+
+  /** Clears the end-of-topic flag and regenerates the original number of messages. */
+  public void reset() {
+    this.reachedEndOfTopic = false;
+    this.currentMsg = 0;
+    emptyMockRecords();
+    setMock(topic, numberOfMessages);
+  }
+
+  /** Removes every generated message; the cursor position is left as it is. */
+  public void emptyMockRecords() {
+    this.fakeMessages.clear();
+  }
+
+  /** Returns the publish time of the first message produced by the last generation. */
+  public long getStartTimestamp() {
+    return this.startTimestamp;
+  }
+
+  /** Returns the publish time of the last message produced by the last generation. */
+  public long getEndTimestamp() {
+    return this.endTimestamp;
+  }
+
+  @Override
+  public String getTopic() {
+    return this.topic;
+  }
+
+  /**
+   * Returns the message under the cursor and advances the cursor.
+   *
+   * @return the next message, or {@code null} when there are no (more) messages to serve
+   */
+  @Override
+  public Message<byte[]> readNext() throws PulsarClientException {
+    // Check the bound before indexing: once every message has been served (or the list has been
+    // emptied) there is nothing to return, and indexing would throw IndexOutOfBoundsException.
+    if (currentMsg >= fakeMessages.size()) {
+      return null;
+    }
+    return fakeMessages.get(currentMsg++);
+  }
+
+  @Override
+  public Message<byte[]> readNext(int timeout, TimeUnit unit) throws PulsarClientException {
+    return null;
+  }
+
+  @Override
+  public CompletableFuture<Message<byte[]>> readNextAsync() {
+    return null;
+  }
+
+  @Override
+  public CompletableFuture<Void> closeAsync() {
+    return null;
+  }
+
+  @Override
+  public boolean hasReachedEndOfTopic() {
+    return this.reachedEndOfTopic;
+  }
+
+  @Override
+  public boolean hasMessageAvailable() throws PulsarClientException {
+    return false;
+  }
+
+  @Override
+  public CompletableFuture<Boolean> hasMessageAvailableAsync() {
+    return null;
+  }
+
+  @Override
+  public boolean isConnected() {
+    return false;
+  }
+
+  /** No-op: seeking by message id is not simulated. */
+  @Override
+  public void seek(MessageId messageId) throws PulsarClientException {}
+
+  /**
+   * Moves the cursor to the first message whose publish time equals {@code timestamp} exactly.
+   * When no message matches, the cursor is left where it was.
+   */
+  @Override
+  public void seek(long timestamp) throws PulsarClientException {
+    for (int i = 0; i < fakeMessages.size(); i++) {
+      if (timestamp == fakeMessages.get(i).getPublishTime()) {
+        currentMsg = i;
+        break;
+      }
+    }
+  }
+
+  @Override
+  public CompletableFuture<Void> seekAsync(MessageId messageId) {
+    return null;
+  }
+
+  @Override
+  public CompletableFuture<Void> seekAsync(long timestamp) {
+    return null;
+  }
+
+  @Override
+  public CompletableFuture<Void> seekAsync(Function<String, Object> function) {
+    return null;
+  }
+
+  /** No-op: function-based seeking is not simulated. */
+  @Override
+  public void seek(Function<String, Object> function) throws PulsarClientException {}
+
+  /** No-op: the fake holds no resources. */
+  @Override
+  public void close() throws IOException {}
+}
diff --git a/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/PulsarIOTest.java b/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/PulsarIOTest.java
new file mode 100644
index 0000000..eeb6a5d
--- /dev/null
+++ b/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/PulsarIOTest.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.pulsar;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.MetricNameFilter;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.PulsarContainer;
+import org.testcontainers.utility.DockerImageName;
+
+/**
+ * Container-backed tests for {@link PulsarIO}: a sanity check of the Pulsar container itself, a
+ * read test that counts records through a Beam metric, and a write test that consumes what the
+ * pipeline published.
+ *
+ * <p>One {@link PulsarClient} is shared by all tests; it is (re)opened on demand and closed only
+ * after the last test, so individual tests must not close it. Test failures are never swallowed:
+ * every test method lets exceptions propagate to JUnit.
+ */
+@RunWith(JUnit4.class)
+public class PulsarIOTest {
+
+  private static final String TOPIC = "PULSAR_IO_TEST";
+  /** Seconds to wait for a single message before concluding that none is coming. */
+  private static final int RECEIVE_TIMEOUT_SECONDS = 5;
+
+  protected static PulsarContainer pulsarContainer;
+  protected static PulsarClient client;
+
+  // Publish-time bounds recorded by produceMessages() and used as the read range.
+  private long endExpectedTime = 0;
+  private long startTime = 0;
+
+  private static final Logger LOG = LoggerFactory.getLogger(PulsarIOTest.class);
+
+  @Rule public final transient TestPipeline testPipeline = TestPipeline.create();
+
+  /**
+   * Subscribes to the test topic and returns every message that arrives until the topic stays
+   * silent for {@link #RECEIVE_TIMEOUT_SECONDS}.
+   *
+   * <p>NOTE(review): a brand-new subscription presumably starts at the latest position, so this
+   * only sees messages published after the subscription "receiveMockMessageFn" first existed.
+   *
+   * @return the received messages, in arrival order
+   * @throws PulsarClientException if subscribing, receiving or closing fails
+   */
+  public List<Message<byte[]>> receiveMessages() throws PulsarClientException {
+    try (Consumer<byte[]> consumer =
+        openClient()
+            .newConsumer()
+            .topic(TOPIC)
+            .subscriptionName("receiveMockMessageFn")
+            .subscribe()) {
+      return drainMessages(consumer);
+    }
+  }
+
+  /** Receives and acknowledges messages until a receive call times out. */
+  private static List<Message<byte[]>> drainMessages(Consumer<byte[]> consumer)
+      throws PulsarClientException {
+    List<Message<byte[]>> messages = new ArrayList<>();
+    while (true) {
+      // receive(timeout) returns null when nothing arrived in time, which ends the drain.
+      Message<byte[]> msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+      if (msg == null) {
+        break;
+      }
+      messages.add(msg);
+      try {
+        consumer.acknowledge(msg);
+      } catch (PulsarClientException e) {
+        LOG.warn("Could not acknowledge message {}", msg.getMessageId(), e);
+        consumer.negativeAcknowledge(msg);
+      }
+    }
+    LOG.info("Drained {} messages from topic {}", messages.size(), TOPIC);
+    return messages;
+  }
+
+  /**
+   * Publishes 101 messages to the test topic and reads each one back to learn its publish time.
+   *
+   * <p>The first 100 messages are returned as the expected pipeline input; the publish time of
+   * the first one becomes {@code startTime}. The 101st message is a sentinel whose publish time
+   * becomes {@code endExpectedTime}, the end of the read range.
+   *
+   * @return the 100 messages a bounded read of the recorded range is expected to yield
+   * @throws PulsarClientException if producing, consuming or closing fails
+   * @throws IllegalStateException if a published message cannot be read back in time
+   */
+  public List<PulsarMessage> produceMessages() throws PulsarClientException {
+    PulsarClient pulsarClient = openClient();
+    int numElements = 101;
+    List<PulsarMessage> inputs = new ArrayList<>();
+    try (Producer<byte[]> producer = pulsarClient.newProducer().topic(TOPIC).create();
+        Consumer<byte[]> consumer =
+            pulsarClient
+                .newConsumer()
+                .topic(TOPIC)
+                .subscriptionName("produceMockMessageFn")
+                .subscribe()) {
+      for (int i = 0; i < numElements; i++) {
+        String msg = ("PULSAR_TEST_READFROMSIMPLETOPIC_" + i);
+        producer.send(msg.getBytes(StandardCharsets.UTF_8));
+        Message<byte[]> message = awaitMessage(consumer.receiveAsync());
+        if (i >= 100) {
+          endExpectedTime = message.getPublishTime();
+        } else {
+          inputs.add(new PulsarMessage(message.getTopicName(), message.getPublishTime(), message));
+          if (i == 0) {
+            startTime = message.getPublishTime();
+          }
+        }
+      }
+    }
+    return inputs;
+  }
+
+  /**
+   * Waits for an asynchronously received message and fails fast instead of returning {@code
+   * null}, so a missing message cannot silently shrink the expected input.
+   */
+  private static Message<byte[]> awaitMessage(CompletableFuture<Message<byte[]>> future) {
+    try {
+      return future.get(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag for whoever owns this thread.
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException("Interrupted while waiting for a message on " + TOPIC, e);
+    } catch (ExecutionException | TimeoutException e) {
+      throw new IllegalStateException("No message could be received from " + TOPIC, e);
+    }
+  }
+
+  private static PulsarClient initClient() throws PulsarClientException {
+    return PulsarClient.builder().serviceUrl(pulsarContainer.getPulsarBrokerUrl()).build();
+  }
+
+  /** Returns the shared client, creating a new one when it is missing or already closed. */
+  private static PulsarClient openClient() throws PulsarClientException {
+    if (client == null || client.isClosed()) {
+      client = initClient();
+    }
+    return client;
+  }
+
+  private static void setupPulsarContainer() {
+    pulsarContainer = new PulsarContainer(DockerImageName.parse("apachepulsar/pulsar:2.9.0"));
+    pulsarContainer.withCommand("bin/pulsar", "standalone");
+    pulsarContainer.start();
+  }
+
+  @BeforeClass
+  public static void setup() throws PulsarClientException {
+    setupPulsarContainer();
+    client = initClient();
+  }
+
+  /** Closes the shared client, then stops the container even if closing the client failed. */
+  @AfterClass
+  public static void afterClass() throws PulsarClientException {
+    try {
+      if (client != null && !client.isClosed()) {
+        client.close();
+      }
+    } finally {
+      if (pulsarContainer != null) {
+        pulsarContainer.stop();
+      }
+    }
+  }
+
+  /** Sanity check: a message sent to the container can be consumed again unchanged. */
+  @Test
+  public void testPulsarFunctionality() throws Exception {
+    PulsarClient pulsarClient = openClient();
+    try (Consumer<byte[]> consumer =
+            pulsarClient.newConsumer().topic(TOPIC).subscriptionName("PulsarIO_IT").subscribe();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(TOPIC).create()) {
+      String messageTxt = "testing pulsar functionality";
+      producer.send(messageTxt.getBytes(StandardCharsets.UTF_8));
+      CompletableFuture<Message<byte[]>> future = consumer.receiveAsync();
+      Message<byte[]> message = future.get(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+      assertEquals(messageTxt, new String(message.getData(), StandardCharsets.UTF_8));
+      // The shared client stays open here; afterClass() closes it once all tests have run.
+    }
+  }
+
+  /** Reads the recorded publish-time range and checks the record count via a Beam counter. */
+  @Test
+  public void testReadFromSimpleTopic() throws Exception {
+    List<PulsarMessage> inputsMock = produceMessages();
+    PulsarIO.Read reader =
+        PulsarIO.read()
+            .withClientUrl(pulsarContainer.getPulsarBrokerUrl())
+            .withAdminUrl(pulsarContainer.getHttpServiceUrl())
+            .withTopic(TOPIC)
+            .withStartTimestamp(startTime)
+            .withEndTimestamp(endExpectedTime)
+            .withPublishTime();
+    testPipeline.apply(reader).apply(ParDo.of(new PulsarRecordsMetric()));
+
+    PipelineResult pipelineResult = testPipeline.run();
+    MetricQueryResults metrics =
+        pipelineResult
+            .metrics()
+            .queryMetrics(
+                MetricsFilter.builder()
+                    .addNameFilter(
+                        MetricNameFilter.named(
+                            PulsarIOTest.class.getName(), "PulsarRecordsCounter"))
+                    .build());
+    long recordsCount = 0;
+    for (MetricResult<Long> metric : metrics.getCounters()) {
+      if (metric
+          .getName()
+          .toString()
+          .equals("org.apache.beam.sdk.io.pulsar.PulsarIOTest:PulsarRecordsCounter")) {
+        recordsCount = metric.getAttempted();
+        break;
+      }
+    }
+    assertEquals(inputsMock.size(), (int) recordsCount);
+  }
+
+  /** Writes 100 payloads through the pipeline and checks that exactly those were published. */
+  @Test
+  public void testWriteFromTopic() throws Exception {
+    PulsarIO.Write writer =
+        PulsarIO.write().withClientUrl(pulsarContainer.getPulsarBrokerUrl()).withTopic(TOPIC);
+    int numberOfMessages = 100;
+    List<byte[]> messages = new ArrayList<>();
+    List<String> expectedTexts = new ArrayList<>();
+    for (int i = 0; i < numberOfMessages; i++) {
+      String text = "PULSAR_WRITER_TEST_" + i;
+      expectedTexts.add(text);
+      messages.add(text.getBytes(StandardCharsets.UTF_8));
+    }
+
+    // Subscribe before the pipeline runs so the subscription already exists when the messages
+    // are published; messages already on the shared topic from other tests are then skipped.
+    try (Consumer<byte[]> consumer =
+        openClient()
+            .newConsumer()
+            .topic(TOPIC)
+            .subscriptionName("receiveMockMessageFn")
+            .subscribe()) {
+      testPipeline.apply(Create.of(messages)).apply(writer);
+      testPipeline.run();
+
+      // Drain once and reuse the result; draining twice would consume the topic two times over.
+      List<String> receivedTexts = new ArrayList<>();
+      for (Message<byte[]> received : drainMessages(consumer)) {
+        receivedTexts.add(new String(received.getValue(), StandardCharsets.UTF_8));
+      }
+
+      assertEquals(numberOfMessages, receivedTexts.size());
+      // The pipeline does not promise to write elements in creation order, so compare contents
+      // rather than positions.
+      assertTrue(receivedTexts.containsAll(expectedTexts));
+    }
+  }
+
+  /** Pass-through DoFn that increments "PulsarRecordsCounter" once per element. */
+  public static class PulsarRecordsMetric extends DoFn<PulsarMessage, PulsarMessage> {
+    private final Counter counter =
+        Metrics.counter(PulsarIOTest.class.getName(), "PulsarRecordsCounter");
+
+    @ProcessElement
+    public void processElement(ProcessContext context) {
+      counter.inc();
+      context.output(context.element());
+    }
+  }
+}
diff --git a/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFnTest.java b/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFnTest.java
new file mode 100644
index 0000000..273a191
--- /dev/null
+++ b/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFnTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.pulsar;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.internal.DefaultImplementation;
+import org.checkerframework.checker.initialization.qual.Initialized;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link ReadFromPulsarDoFn}. The DoFn under test is built from a {@link PulsarIO.Read}
+ * whose client factory always returns a {@link FakePulsarClient} backed by a {@link
+ * FakePulsarReader} holding {@link #NUMBEROFMESSAGES} messages.
+ */
+@RunWith(JUnit4.class)
+public class ReadFromPulsarDoFnTest {
+
+  public static final String SERVICE_URL = "pulsar://localhost:6650";
+  public static final String ADMIN_URL = "http://localhost:8080";
+  public static final String TOPIC = "PULSARIO_READFROMPULSAR_TEST";
+  public static final int NUMBEROFMESSAGES = 100;
+
+  // The fakes are declared before the DoFn so that every collaborator is already initialized when
+  // readSourceDescriptor() wires the client factory into the transform.
+  public FakePulsarReader fakePulsarReader = new FakePulsarReader(TOPIC, NUMBEROFMESSAGES);
+  private final FakePulsarClient fakePulsarClient = new FakePulsarClient(fakePulsarReader);
+  private final ReadFromPulsarDoFn dofnInstance = new ReadFromPulsarDoFn(readSourceDescriptor());
+
+  /**
+   * Builds the read transform used to construct the DoFn under test.
+   *
+   * @return a {@link PulsarIO.Read} configured with the test URLs and topic, publish-time
+   *     timestamps, and a client factory that ignores its argument and returns the fake client
+   */
+  private PulsarIO.Read readSourceDescriptor() {
+    return PulsarIO.read()
+        .withClientUrl(SERVICE_URL)
+        .withTopic(TOPIC)
+        .withAdminUrl(ADMIN_URL)
+        .withPublishTime()
+        .withPulsarClient(
+            new SerializableFunction<String, PulsarClient>() {
+              @Override
+              public PulsarClient apply(String input) {
+                return fakePulsarClient;
+              }
+            });
+  }
+
+  /** Initializes the DoFn's Pulsar clients and resets the fake reader before each test. */
+  @Before
+  public void setup() throws Exception {
+    dofnInstance.initPulsarClients();
+    fakePulsarReader.reset();
+  }
+
+  /** A descriptor with start offset 0 and no end yields the range {@code [0, Long.MAX_VALUE)}. */
+  @Test
+  public void testInitialRestrictionWhenHasStartOffset() throws Exception {
+    long expectedStartOffset = 0;
+    OffsetRange result =
+        dofnInstance.getInitialRestriction(
+            PulsarSourceDescriptor.of(
+                TOPIC, expectedStartOffset, null, null, SERVICE_URL, ADMIN_URL));
+    assertEquals(new OffsetRange(expectedStartOffset, Long.MAX_VALUE), result);
+  }
+
+  /** A descriptor starting at the current time and with no end is open-ended from that start. */
+  @Test
+  public void testInitialRestrictionWithConsumerPosition() throws Exception {
+    long expectedStartOffset = Instant.now().getMillis();
+    OffsetRange result =
+        dofnInstance.getInitialRestriction(
+            PulsarSourceDescriptor.of(
+                TOPIC, expectedStartOffset, null, null, SERVICE_URL, ADMIN_URL));
+    assertEquals(new OffsetRange(expectedStartOffset, Long.MAX_VALUE), result);
+  }
+
+  /** A descriptor with both a start and an end timestamp yields exactly that range. */
+  @Test
+  public void testInitialRestrictionWithConsumerEndPosition() throws Exception {
+    long startOffset = fakePulsarReader.getStartTimestamp();
+    long endOffset = fakePulsarReader.getEndTimestamp();
+    OffsetRange result =
+        dofnInstance.getInitialRestriction(
+            PulsarSourceDescriptor.of(TOPIC, startOffset, endOffset, null, SERVICE_URL, ADMIN_URL));
+    assertEquals(new OffsetRange(startOffset, endOffset), result);
+  }
+
+  /**
+   * Processing the range between the fake reader's start and end timestamps stops and emits all
+   * messages but one; per the expectation's name, the message at the last offset is not counted.
+   */
+  @Test
+  public void testProcessElement() throws Exception {
+    MockOutputReceiver receiver = new MockOutputReceiver();
+    long startOffset = fakePulsarReader.getStartTimestamp();
+    long endOffset = fakePulsarReader.getEndTimestamp();
+    OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(startOffset, endOffset));
+    PulsarSourceDescriptor descriptor =
+        PulsarSourceDescriptor.of(TOPIC, startOffset, endOffset, null, SERVICE_URL, ADMIN_URL);
+    DoFn.ProcessContinuation result =
+        dofnInstance.processElement(descriptor, tracker, null, (DoFn.OutputReceiver) receiver);
+    int expectedResultWithoutCountingLastOffset = NUMBEROFMESSAGES - 1;
+    assertEquals(DoFn.ProcessContinuation.stop(), result);
+    assertEquals(expectedResultWithoutCountingLastOffset, receiver.getOutputs().size());
+  }
+
+  /** With an end message id of (50, 50, 50), processing stops after emitting 50 messages. */
+  @Test
+  public void testProcessElementWhenEndMessageIdIsDefined() throws Exception {
+    MockOutputReceiver receiver = new MockOutputReceiver();
+    OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(0L, Long.MAX_VALUE));
+    MessageId endMessageId = DefaultImplementation.newMessageId(50L, 50L, 50);
+    DoFn.ProcessContinuation result =
+        dofnInstance.processElement(
+            PulsarSourceDescriptor.of(TOPIC, null, null, endMessageId, SERVICE_URL, ADMIN_URL),
+            tracker,
+            null,
+            (DoFn.OutputReceiver) receiver);
+    assertEquals(DoFn.ProcessContinuation.stop(), result);
+    assertEquals(50, receiver.getOutputs().size());
+  }
+
+  /** When the fake reader holds no records, processing asks to resume and emits nothing. */
+  @Test
+  public void testProcessElementWithEmptyRecords() throws Exception {
+    MockOutputReceiver receiver = new MockOutputReceiver();
+    fakePulsarReader.emptyMockRecords();
+    OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(0L, Long.MAX_VALUE));
+    DoFn.ProcessContinuation result =
+        dofnInstance.processElement(
+            PulsarSourceDescriptor.of(TOPIC, null, null, null, SERVICE_URL, ADMIN_URL),
+            tracker,
+            null,
+            (DoFn.OutputReceiver) receiver);
+    assertEquals(DoFn.ProcessContinuation.resume(), result);
+    assertTrue(receiver.getOutputs().isEmpty());
+  }
+
+  /** When the fake reader reports that the end of the topic was reached, processing stops. */
+  @Test
+  public void testProcessElementWhenHasReachedEndTopic() throws Exception {
+    MockOutputReceiver receiver = new MockOutputReceiver();
+    fakePulsarReader.setReachedEndOfTopic(true);
+    OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(0L, Long.MAX_VALUE));
+    DoFn.ProcessContinuation result =
+        dofnInstance.processElement(
+            PulsarSourceDescriptor.of(TOPIC, null, null, null, SERVICE_URL, ADMIN_URL),
+            tracker,
+            null,
+            (DoFn.OutputReceiver) receiver);
+    assertEquals(DoFn.ProcessContinuation.stop(), result);
+  }
+
+  /** {@link DoFn.OutputReceiver} that collects every emitted element for later inspection. */
+  private static class MockOutputReceiver implements DoFn.OutputReceiver<PulsarMessage> {
+
+    private final List<PulsarMessage> records = new ArrayList<>();
+
+    @Override
+    public void output(PulsarMessage output) {
+      // Recorded as well, so that elements emitted without an explicit timestamp are visible to
+      // the size and emptiness assertions instead of being dropped.
+      records.add(output);
+    }
+
+    @Override
+    public void outputWithTimestamp(
+        PulsarMessage output, @UnknownKeyFor @NonNull @Initialized Instant timestamp) {
+      records.add(output);
+    }
+
+    /** Returns the elements received so far, in emission order. */
+    public List<PulsarMessage> getOutputs() {
+      return records;
+    }
+  }
+}
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 55a0c85..4368a16 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -184,6 +184,7 @@ include(":sdks:java:io:mongodb")
 include(":sdks:java:io:mqtt")
 include(":sdks:java:io:neo4j")
 include(":sdks:java:io:parquet")
+include(":sdks:java:io:pulsar")
 include(":sdks:java:io:rabbitmq")
 include(":sdks:java:io:redis")
 include(":sdks:java:io:solr")
@@ -251,5 +252,3 @@ include("beam-test-jenkins")
 project(":beam-test-jenkins").projectDir = file(".test-infra/jenkins")
 include("beam-validate-runner")
 project(":beam-validate-runner").projectDir = file(".test-infra/validate-runner")
-
-