You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by je...@apache.org on 2023/03/27 17:59:15 UTC

[camel] branch main updated: CAMEL-18551: camel-salesforce: Support for Pub/Sub API.

This is an automated email from the ASF dual-hosted git repository.

jeremyross pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 1c2dd459ac1 CAMEL-18551: camel-salesforce: Support for Pub/Sub API.
1c2dd459ac1 is described below

commit 1c2dd459ac11f728f3236b88ceab18a963c81ea9
Author: Jeremy Ross <je...@gmail.com>
AuthorDate: Mon Mar 27 12:36:49 2023 -0500

    CAMEL-18551: camel-salesforce: Support for Pub/Sub API.
---
 .../camel-salesforce-codegen/pom.xml               |   5 +
 .../codegen/AbstractSalesforceExecution.java       |  54 +-
 .../salesforce/codegen/GenerateExecution.java      |   5 +-
 .../codegen/GeneratePubSubExecution.java           |  55 +++
 .../salesforce/codegen/SchemaExecution.java        |   5 +-
 .../camel-salesforce-component/pom.xml             |  65 +++
 .../salesforce/SalesforceComponentConfigurer.java  |  36 ++
 .../salesforce/SalesforceEndpointConfigurer.java   |  30 ++
 .../salesforce/SalesforceEndpointUriFactory.java   |   7 +-
 .../camel/component/salesforce/salesforce.json     |  20 +-
 .../src/main/docs/salesforce-component.adoc        |  87 ++++
 .../component/salesforce/PubSubApiConsumer.java    | 113 +++++
 .../salesforce/PubSubDeserializeType.java          |  25 +
 .../component/salesforce/SalesforceComponent.java  |  51 +-
 .../component/salesforce/SalesforceConstants.java  |   4 +-
 .../component/salesforce/SalesforceEndpoint.java   |  31 +-
 .../salesforce/SalesforceEndpointConfig.java       |  75 +++
 .../component/salesforce/SalesforceProducer.java   |   7 +
 ...orceConsumer.java => StreamingApiConsumer.java} |   8 +-
 .../salesforce/api/dto/pubsub/PublishResult.java   |  59 +++
 .../salesforce/internal/OperationName.java         |   7 +-
 .../salesforce/internal/SalesforceSession.java     |  67 ++-
 .../internal/client/PubSubApiClient.java           | 413 ++++++++++++++++
 .../internal/client/TokenCredentials.java          |  53 ++
 .../processor/AbstractSalesforceProcessor.java     |   4 +
 .../internal/processor/PubSubApiProcessor.java     |  97 ++++
 .../internal/streaming/SubscriptionHelper.java     |  60 +--
 .../src/main/proto/pubsub_api.proto                | 330 +++++++++++++
 .../com/sforce/eventbus/CamelEventMessage__e.java  | 548 +++++++++++++++++++++
 .../salesforce/PubSubApiIntegrationTest.java       | 431 ++++++++++++++++
 .../camel/component/salesforce/PubSubApiTest.java  |  75 +++
 .../component/salesforce/PubSubPojoEvent.java      |  74 +++
 .../camel/component/salesforce/RawPayloadTest.java |   4 +-
 ...umerTest.java => StreamingApiConsumerTest.java} |  18 +-
 .../salesforce/internal/SalesforceSessionTest.java |  10 +-
 .../internal/pubsub/AuthErrorPubSubServer.java     |  71 +++
 .../SubscriptionHelperIntegrationTest.java         |  59 ++-
 .../camel-salesforce-maven-plugin/README.md        |   9 +-
 .../apache/camel/maven/AbstractSalesforceMojo.java |   2 +-
 .../org/apache/camel/maven/GeneratePubSubMojo.java |  59 +++
 .../AbstractSalesforceMojoIntegrationTest.java     |   7 +-
 .../maven/GeneratePubSubMojoIntegrationTest.java   |  90 ++++
 .../salesforce/objects/CamelEventMessage__e.object |  20 +
 .../salesforce/objects/CamelEventNote__e.object    |  20 +
 .../it/resources/salesforce/package.xml            |   6 +-
 ...s_AccountChangeEvent.platformEventChannelMember |   5 +
 .../ChangeEvents.platformEventChannel              |  24 -
 .../it/resources/salesforce/tabs/Invoice__c.tab    |   1 -
 .../resources/salesforce/tabs/Merchandise__c.tab   |   1 -
 components/camel-salesforce/pom.xml                |   8 +
 50 files changed, 3161 insertions(+), 154 deletions(-)

diff --git a/components/camel-salesforce/camel-salesforce-codegen/pom.xml b/components/camel-salesforce/camel-salesforce-codegen/pom.xml
index ed1c0b91fe7..3e43ee7a78b 100644
--- a/components/camel-salesforce/camel-salesforce-codegen/pom.xml
+++ b/components/camel-salesforce/camel-salesforce-codegen/pom.xml
@@ -75,6 +75,11 @@
             <artifactId>commons-text</artifactId>
             <version>${commons-text-version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro-compiler</artifactId>
+            <version>${avro-version}</version>
+        </dependency>
 
         <!-- testing -->
     </dependencies>
diff --git a/components/camel-salesforce/camel-salesforce-codegen/src/main/java/org/apache/camel/component/salesforce/codegen/AbstractSalesforceExecution.java b/components/camel-salesforce/camel-salesforce-codegen/src/main/java/org/apache/camel/component/salesforce/codegen/AbstractSalesforceExecution.java
index e713daca582..298b11e8ce2 100644
--- a/components/camel-salesforce/camel-salesforce-codegen/src/main/java/org/apache/camel/component/salesforce/codegen/AbstractSalesforceExecution.java
+++ b/components/camel-salesforce/camel-salesforce-codegen/src/main/java/org/apache/camel/component/salesforce/codegen/AbstractSalesforceExecution.java
@@ -30,6 +30,7 @@ import org.apache.camel.component.salesforce.api.SalesforceException;
 import org.apache.camel.component.salesforce.api.utils.SecurityUtils;
 import org.apache.camel.component.salesforce.internal.SalesforceSession;
 import org.apache.camel.component.salesforce.internal.client.DefaultRestClient;
+import org.apache.camel.component.salesforce.internal.client.PubSubApiClient;
 import org.apache.camel.component.salesforce.internal.client.RestClient;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.support.PropertyBindingSupport;
@@ -151,12 +152,18 @@ public abstract class AbstractSalesforceExecution {
 
     private long responseTimeout;
 
+    private SalesforceHttpClient httpClient;
+    private SalesforceSession session;
+    private RestClient restClient;
+    private PubSubApiClient pubSubApiClient;
+    private String pubSubHost;
+    private int pubSubPort;
+
     public final void execute() throws Exception {
         setup();
-
-        final RestClient restClient = connectToSalesforce();
+        login();
         try {
-            executeWithClient(restClient);
+            executeWithClient();
         } finally {
             disconnectFromSalesforce(restClient);
         }
@@ -166,24 +173,34 @@ public abstract class AbstractSalesforceExecution {
         return responseTimeout;
     }
 
-    private RestClient connectToSalesforce() throws Exception {
-        RestClient restClient = null;
+    private void login() {
         try {
-            final SalesforceHttpClient httpClient = createHttpClient();
+            httpClient = createHttpClient();
 
             // connect to Salesforce
             getLog().info("Logging in to Salesforce");
-            final SalesforceSession session = httpClient.getSession();
+            session = httpClient.getSession();
             try {
                 session.login(null);
+
             } catch (final SalesforceException e) {
                 final String msg = "Salesforce login error " + e.getMessage();
                 throw new RuntimeException(msg, e);
             }
             getLog().info("Salesforce login successful");
+        } catch (final Exception e) {
+            final String msg = "Error connecting to Salesforce: " + e.getMessage();
+            ServiceHelper.stopAndShutdownServices(session, httpClient);
+            throw new RuntimeException(msg, e);
+        }
+    }
 
-            // create rest client
-
+    protected RestClient getRestClient() {
+        if (restClient != null) {
+            return restClient;
+        }
+        try {
+            login();
             restClient = new DefaultRestClient(httpClient, version, session, new SalesforceLoginConfig());
             // remember to start the active client object
             ((DefaultRestClient) restClient).start();
@@ -196,6 +213,15 @@ public abstract class AbstractSalesforceExecution {
         }
     }
 
+    protected PubSubApiClient getPubSubApiClient() {
+        if (pubSubApiClient != null) {
+            return pubSubApiClient;
+        }
+        pubSubApiClient = new PubSubApiClient(session, new SalesforceLoginConfig(), pubSubHost, pubSubPort, 0, 0);
+        pubSubApiClient.start();
+        return pubSubApiClient;
+    }
+
     private SalesforceHttpClient createHttpClient() throws Exception {
         final SalesforceHttpClient httpClient;
 
@@ -371,7 +397,15 @@ public abstract class AbstractSalesforceExecution {
         this.version = version;
     }
 
-    protected abstract void executeWithClient(RestClient client) throws Exception;
+    public void setPubSubHost(String pubSubHost) {
+        this.pubSubHost = pubSubHost;
+    }
+
+    public void setPubSubPort(int pubSubPort) {
+        this.pubSubPort = pubSubPort;
+    }
+
+    protected abstract void executeWithClient() throws Exception;
 
     protected abstract Logger getLog();
 
diff --git a/components/camel-salesforce/camel-salesforce-codegen/src/main/java/org/apache/camel/component/salesforce/codegen/GenerateExecution.java b/components/camel-salesforce/camel-salesforce-codegen/src/main/java/org/apache/camel/component/salesforce/codegen/GenerateExecution.java
index 67dcc6091a9..811c1589933 100644
--- a/components/camel-salesforce/camel-salesforce-codegen/src/main/java/org/apache/camel/component/salesforce/codegen/GenerateExecution.java
+++ b/components/camel-salesforce/camel-salesforce-codegen/src/main/java/org/apache/camel/component/salesforce/codegen/GenerateExecution.java
@@ -47,7 +47,6 @@ import org.apache.camel.component.salesforce.api.dto.AbstractSObjectBase;
 import org.apache.camel.component.salesforce.api.dto.PickListValue;
 import org.apache.camel.component.salesforce.api.dto.SObjectDescription;
 import org.apache.camel.component.salesforce.api.dto.SObjectField;
-import org.apache.camel.component.salesforce.internal.client.RestClient;
 import org.apache.camel.impl.engine.DefaultBeanIntrospection;
 import org.apache.camel.spi.BeanIntrospection;
 import org.apache.camel.util.StringHelper;
@@ -482,9 +481,9 @@ public class GenerateExecution extends AbstractSalesforceExecution {
     }
 
     @Override
-    protected void executeWithClient(final RestClient client) throws Exception {
+    protected void executeWithClient() throws Exception {
         descriptions = new ObjectDescriptions(
-                client, getResponseTimeout(), includes, includePattern, excludes, excludePattern, getLog());
+                getRestClient(), getResponseTimeout(), includes, includePattern, excludes, excludePattern, getLog());
 
         // make sure we can load both templates
         if (!engine.resourceExists(SOBJECT_POJO_VM) || !engine.resourceExists(SOBJECT_QUERY_RECORDS_VM)
diff --git a/components/camel-salesforce/camel-salesforce-codegen/src/main/java/org/apache/camel/component/salesforce/codegen/GeneratePubSubExecution.java b/components/camel-salesforce/camel-salesforce-codegen/src/main/java/org/apache/camel/component/salesforce/codegen/GeneratePubSubExecution.java
new file mode 100644
index 00000000000..a9a5ac07b84
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-codegen/src/main/java/org/apache/camel/component/salesforce/codegen/GeneratePubSubExecution.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.codegen;
+
+import java.io.File;
+import java.nio.file.Files;
+
+import com.salesforce.eventbus.protobuf.TopicInfo;
+import org.apache.avro.compiler.specific.SpecificCompiler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GeneratePubSubExecution extends AbstractSalesforceExecution {
+
+    private String[] topics;
+    private File outputDirectory;
+
+    @Override
+    protected void executeWithClient() throws Exception {
+        for (String topicName : topics) {
+            final TopicInfo topicInfo = getPubSubApiClient().getTopicInfo(topicName);
+            final String schemaJson = getPubSubApiClient().getSchemaJson(topicInfo.getSchemaId());
+            final File schemaFile = File.createTempFile("schema", ".json", outputDirectory);
+            Files.writeString(schemaFile.toPath(), schemaJson);
+            SpecificCompiler.compileSchema(schemaFile, outputDirectory);
+        }
+    }
+
+    public void setTopics(String[] topics) {
+        this.topics = topics;
+    }
+
+    public void setOutputDirectory(File outputDirectory) {
+        this.outputDirectory = outputDirectory;
+    }
+
+    @Override
+    protected Logger getLog() {
+        return LoggerFactory.getLogger(GeneratePubSubExecution.class);
+    }
+}
diff --git a/components/camel-salesforce/camel-salesforce-codegen/src/main/java/org/apache/camel/component/salesforce/codegen/SchemaExecution.java b/components/camel-salesforce/camel-salesforce-codegen/src/main/java/org/apache/camel/component/salesforce/codegen/SchemaExecution.java
index 18e681e5e65..7235bdd1599 100644
--- a/components/camel-salesforce/camel-salesforce-codegen/src/main/java/org/apache/camel/component/salesforce/codegen/SchemaExecution.java
+++ b/components/camel-salesforce/camel-salesforce-codegen/src/main/java/org/apache/camel/component/salesforce/codegen/SchemaExecution.java
@@ -29,7 +29,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
 import org.apache.camel.component.salesforce.api.dto.SObjectDescription;
 import org.apache.camel.component.salesforce.api.utils.JsonUtils;
-import org.apache.camel.component.salesforce.internal.client.RestClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,11 +50,11 @@ public class SchemaExecution extends AbstractSalesforceExecution {
     File outputDirectory;
 
     @Override
-    protected void executeWithClient(RestClient client) throws Exception {
+    protected void executeWithClient() throws Exception {
         getLog().info("Generating JSON Schema...");
 
         final ObjectDescriptions descriptions = new ObjectDescriptions(
-                client, getResponseTimeout(), includes, includePattern, excludes, excludePattern, getLog());
+                getRestClient(), getResponseTimeout(), includes, includePattern, excludes, excludePattern, getLog());
 
         // generate JSON schema for every object description
         final ObjectMapper schemaObjectMapper = JsonUtils.createSchemaObjectMapper();
diff --git a/components/camel-salesforce/camel-salesforce-component/pom.xml b/components/camel-salesforce/camel-salesforce-component/pom.xml
index 44fdec774fd..a7430d79d49 100644
--- a/components/camel-salesforce/camel-salesforce-component/pom.xml
+++ b/components/camel-salesforce/camel-salesforce-component/pom.xml
@@ -134,6 +134,45 @@
             <version>${jaxb-impl-version}</version>
         </dependency>
         
+        <dependency>
+            <groupId>javax.annotation</groupId>
+            <artifactId>javax.annotation-api</artifactId>
+            <version>1.3.2</version>
+        </dependency>
+
+        <!-- Pub/Sub API dependencies -->
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-netty</artifactId>
+            <version>${grpc-version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-protobuf</artifactId>
+            <version>${grpc-version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-stub</artifactId>
+            <version>${grpc-version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+            <version>${avro-version}</version>
+        </dependency>
+        <dependency>
+            <groupId>tech.allegro.schema.json2avro</groupId>
+            <artifactId>converter</artifactId>
+            <version>0.2.15</version>
+        </dependency>
+
+        <!-- logging -->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+
         <!-- testing -->
         <dependency>
             <groupId>org.apache.commons</groupId>
@@ -225,6 +264,13 @@
     </dependencies>
 
     <build>
+        <extensions>
+            <extension>
+                <groupId>kr.motd.maven</groupId>
+                <artifactId>os-maven-plugin</artifactId>
+                <version>1.7.1</version>
+            </extension>
+        </extensions>
         <plugins>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
@@ -248,6 +294,24 @@
                     </execution>
                 </executions>
             </plugin>
+            <plugin>
+                <groupId>org.xolstice.maven.plugins</groupId>
+                <artifactId>protobuf-maven-plugin</artifactId>
+                <version>0.6.1</version>
+                <configuration>
+                    <protocArtifact>com.google.protobuf:protoc:3.21.12:exe:${os.detected.classifier}</protocArtifact>
+                    <pluginId>grpc-java</pluginId>
+                    <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.51.1:exe:${os.detected.classifier}</pluginArtifact>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>compile</goal>
+                            <goal>compile-custom</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
         </plugins>
     </build>
 
@@ -298,6 +362,7 @@ https://developer.salesforce.com/page/Force.com_Migration_Tool]]></message>
                             </execution>
                         </executions>
                     </plugin>
+
                     <plugin>
                         <artifactId>maven-resources-plugin</artifactId>
                         <version>${maven-resources-plugin-version}</version>
diff --git a/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceComponentConfigurer.java b/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceComponentConfigurer.java
index 3db266e3b68..41fe36cac2a 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceComponentConfigurer.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceComponentConfigurer.java
@@ -153,6 +153,16 @@ public class SalesforceComponentConfigurer extends PropertyConfigurerSupport imp
         case "pkChunkingParent": getOrCreateConfig(target).setPkChunkingParent(property(camelContext, java.lang.String.class, value)); return true;
         case "pkchunkingstartrow":
         case "pkChunkingStartRow": getOrCreateConfig(target).setPkChunkingStartRow(property(camelContext, java.lang.String.class, value)); return true;
+        case "pubsubbatchsize":
+        case "pubSubBatchSize": getOrCreateConfig(target).setPubSubBatchSize(property(camelContext, int.class, value)); return true;
+        case "pubsubdeserializetype":
+        case "pubSubDeserializeType": getOrCreateConfig(target).setPubSubDeserializeType(property(camelContext, org.apache.camel.component.salesforce.PubSubDeserializeType.class, value)); return true;
+        case "pubsubhost":
+        case "pubSubHost": target.setPubSubHost(property(camelContext, java.lang.String.class, value)); return true;
+        case "pubsubpojoclass":
+        case "pubSubPojoClass": getOrCreateConfig(target).setPubSubPojoClass(property(camelContext, java.lang.String.class, value)); return true;
+        case "pubsubport":
+        case "pubSubPort": target.setPubSubPort(property(camelContext, int.class, value)); return true;
         case "querylocator":
         case "queryLocator": getOrCreateConfig(target).setQueryLocator(property(camelContext, java.lang.String.class, value)); return true;
         case "rawhttpheaders":
@@ -167,6 +177,8 @@ public class SalesforceComponentConfigurer extends PropertyConfigurerSupport imp
         case "rawQueryParameters": getOrCreateConfig(target).setRawQueryParameters(property(camelContext, java.lang.String.class, value)); return true;
         case "refreshtoken":
         case "refreshToken": target.setRefreshToken(property(camelContext, java.lang.String.class, value)); return true;
+        case "replaypreset":
+        case "replayPreset": getOrCreateConfig(target).setReplayPreset(property(camelContext, com.salesforce.eventbus.protobuf.ReplayPreset.class, value)); return true;
         case "reportid":
         case "reportId": getOrCreateConfig(target).setReportId(property(camelContext, java.lang.String.class, value)); return true;
         case "reportmetadata":
@@ -337,6 +349,16 @@ public class SalesforceComponentConfigurer extends PropertyConfigurerSupport imp
         case "pkChunkingParent": return java.lang.String.class;
         case "pkchunkingstartrow":
         case "pkChunkingStartRow": return java.lang.String.class;
+        case "pubsubbatchsize":
+        case "pubSubBatchSize": return int.class;
+        case "pubsubdeserializetype":
+        case "pubSubDeserializeType": return org.apache.camel.component.salesforce.PubSubDeserializeType.class;
+        case "pubsubhost":
+        case "pubSubHost": return java.lang.String.class;
+        case "pubsubpojoclass":
+        case "pubSubPojoClass": return java.lang.String.class;
+        case "pubsubport":
+        case "pubSubPort": return int.class;
         case "querylocator":
         case "queryLocator": return java.lang.String.class;
         case "rawhttpheaders":
@@ -351,6 +373,8 @@ public class SalesforceComponentConfigurer extends PropertyConfigurerSupport imp
         case "rawQueryParameters": return java.lang.String.class;
         case "refreshtoken":
         case "refreshToken": return java.lang.String.class;
+        case "replaypreset":
+        case "replayPreset": return com.salesforce.eventbus.protobuf.ReplayPreset.class;
         case "reportid":
         case "reportId": return java.lang.String.class;
         case "reportmetadata":
@@ -522,6 +546,16 @@ public class SalesforceComponentConfigurer extends PropertyConfigurerSupport imp
         case "pkChunkingParent": return getOrCreateConfig(target).getPkChunkingParent();
         case "pkchunkingstartrow":
         case "pkChunkingStartRow": return getOrCreateConfig(target).getPkChunkingStartRow();
+        case "pubsubbatchsize":
+        case "pubSubBatchSize": return getOrCreateConfig(target).getPubSubBatchSize();
+        case "pubsubdeserializetype":
+        case "pubSubDeserializeType": return getOrCreateConfig(target).getPubSubDeserializeType();
+        case "pubsubhost":
+        case "pubSubHost": return target.getPubSubHost();
+        case "pubsubpojoclass":
+        case "pubSubPojoClass": return getOrCreateConfig(target).getPubSubPojoClass();
+        case "pubsubport":
+        case "pubSubPort": return target.getPubSubPort();
         case "querylocator":
         case "queryLocator": return getOrCreateConfig(target).getQueryLocator();
         case "rawhttpheaders":
@@ -536,6 +570,8 @@ public class SalesforceComponentConfigurer extends PropertyConfigurerSupport imp
         case "rawQueryParameters": return getOrCreateConfig(target).getRawQueryParameters();
         case "refreshtoken":
         case "refreshToken": return target.getRefreshToken();
+        case "replaypreset":
+        case "replayPreset": return getOrCreateConfig(target).getReplayPreset();
         case "reportid":
         case "reportId": return getOrCreateConfig(target).getReportId();
         case "reportmetadata":
diff --git a/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointConfigurer.java b/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointConfigurer.java
index 15675a520d6..e58c53ad0d8 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointConfigurer.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointConfigurer.java
@@ -92,6 +92,14 @@ public class SalesforceEndpointConfigurer extends PropertyConfigurerSupport impl
         case "pkChunkingParent": target.getConfiguration().setPkChunkingParent(property(camelContext, java.lang.String.class, value)); return true;
         case "pkchunkingstartrow":
         case "pkChunkingStartRow": target.getConfiguration().setPkChunkingStartRow(property(camelContext, java.lang.String.class, value)); return true;
+        case "pubsubbatchsize":
+        case "pubSubBatchSize": target.getConfiguration().setPubSubBatchSize(property(camelContext, int.class, value)); return true;
+        case "pubsubdeserializetype":
+        case "pubSubDeserializeType": target.getConfiguration().setPubSubDeserializeType(property(camelContext, org.apache.camel.component.salesforce.PubSubDeserializeType.class, value)); return true;
+        case "pubsubpojoclass":
+        case "pubSubPojoClass": target.getConfiguration().setPubSubPojoClass(property(camelContext, java.lang.String.class, value)); return true;
+        case "pubsubreplayid":
+        case "pubSubReplayId": target.setPubSubReplayId(property(camelContext, java.lang.String.class, value)); return true;
         case "querylocator":
         case "queryLocator": target.getConfiguration().setQueryLocator(property(camelContext, java.lang.String.class, value)); return true;
         case "rawhttpheaders":
@@ -106,6 +114,8 @@ public class SalesforceEndpointConfigurer extends PropertyConfigurerSupport impl
         case "rawQueryParameters": target.getConfiguration().setRawQueryParameters(property(camelContext, java.lang.String.class, value)); return true;
         case "replayid":
         case "replayId": target.setReplayId(property(camelContext, java.lang.Long.class, value)); return true;
+        case "replaypreset":
+        case "replayPreset": target.getConfiguration().setReplayPreset(property(camelContext, com.salesforce.eventbus.protobuf.ReplayPreset.class, value)); return true;
         case "reportid":
         case "reportId": target.getConfiguration().setReportId(property(camelContext, java.lang.String.class, value)); return true;
         case "reportmetadata":
@@ -212,6 +222,14 @@ public class SalesforceEndpointConfigurer extends PropertyConfigurerSupport impl
         case "pkChunkingParent": return java.lang.String.class;
         case "pkchunkingstartrow":
         case "pkChunkingStartRow": return java.lang.String.class;
+        case "pubsubbatchsize":
+        case "pubSubBatchSize": return int.class;
+        case "pubsubdeserializetype":
+        case "pubSubDeserializeType": return org.apache.camel.component.salesforce.PubSubDeserializeType.class;
+        case "pubsubpojoclass":
+        case "pubSubPojoClass": return java.lang.String.class;
+        case "pubsubreplayid":
+        case "pubSubReplayId": return java.lang.String.class;
         case "querylocator":
         case "queryLocator": return java.lang.String.class;
         case "rawhttpheaders":
@@ -226,6 +244,8 @@ public class SalesforceEndpointConfigurer extends PropertyConfigurerSupport impl
         case "rawQueryParameters": return java.lang.String.class;
         case "replayid":
         case "replayId": return java.lang.Long.class;
+        case "replaypreset":
+        case "replayPreset": return com.salesforce.eventbus.protobuf.ReplayPreset.class;
         case "reportid":
         case "reportId": return java.lang.String.class;
         case "reportmetadata":
@@ -333,6 +353,14 @@ public class SalesforceEndpointConfigurer extends PropertyConfigurerSupport impl
         case "pkChunkingParent": return target.getConfiguration().getPkChunkingParent();
         case "pkchunkingstartrow":
         case "pkChunkingStartRow": return target.getConfiguration().getPkChunkingStartRow();
+        case "pubsubbatchsize":
+        case "pubSubBatchSize": return target.getConfiguration().getPubSubBatchSize();
+        case "pubsubdeserializetype":
+        case "pubSubDeserializeType": return target.getConfiguration().getPubSubDeserializeType();
+        case "pubsubpojoclass":
+        case "pubSubPojoClass": return target.getConfiguration().getPubSubPojoClass();
+        case "pubsubreplayid":
+        case "pubSubReplayId": return target.getPubSubReplayId();
         case "querylocator":
         case "queryLocator": return target.getConfiguration().getQueryLocator();
         case "rawhttpheaders":
@@ -347,6 +375,8 @@ public class SalesforceEndpointConfigurer extends PropertyConfigurerSupport impl
         case "rawQueryParameters": return target.getConfiguration().getRawQueryParameters();
         case "replayid":
         case "replayId": return target.getReplayId();
+        case "replaypreset":
+        case "replayPreset": return target.getConfiguration().getReplayPreset();
         case "reportid":
         case "reportId": return target.getConfiguration().getReportId();
         case "reportmetadata":
diff --git a/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointUriFactory.java b/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointUriFactory.java
index 4d10a0fe5d2..c9c22e45cd6 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointUriFactory.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointUriFactory.java
@@ -21,7 +21,7 @@ public class SalesforceEndpointUriFactory extends org.apache.camel.support.compo
     private static final Set<String> SECRET_PROPERTY_NAMES;
     private static final Set<String> MULTI_VALUE_PREFIXES;
     static {
-        Set<String> props = new HashSet<>(60);
+        Set<String> props = new HashSet<>(65);
         props.add("allOrNone");
         props.add("apexMethod");
         props.add("apexQueryParams");
@@ -60,6 +60,10 @@ public class SalesforceEndpointUriFactory extends org.apache.camel.support.compo
         props.add("pkChunkingChunkSize");
         props.add("pkChunkingParent");
         props.add("pkChunkingStartRow");
+        props.add("pubSubBatchSize");
+        props.add("pubSubDeserializeType");
+        props.add("pubSubPojoClass");
+        props.add("pubSubReplayId");
         props.add("queryLocator");
         props.add("rawHttpHeaders");
         props.add("rawMethod");
@@ -67,6 +71,7 @@ public class SalesforceEndpointUriFactory extends org.apache.camel.support.compo
         props.add("rawPayload");
         props.add("rawQueryParameters");
         props.add("replayId");
+        props.add("replayPreset");
         props.add("reportId");
         props.add("reportMetadata");
         props.add("resultId");
diff --git a/components/camel-salesforce/camel-salesforce-component/src/generated/resources/org/apache/camel/component/salesforce/salesforce.json b/components/camel-salesforce/camel-salesforce-component/src/generated/resources/org/apache/camel/component/salesforce/salesforce.json
index 2a9bb649827..3f77024c43b 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/generated/resources/org/apache/camel/component/salesforce/salesforce.json
+++ b/components/camel-salesforce/camel-salesforce-component/src/generated/resources/org/apache/camel/component/salesforce/salesforce.json
@@ -81,6 +81,10 @@
     "workerPoolMaxSize": { "kind": "property", "displayName": "Worker Pool Max Size", "group": "common (advanced)", "label": "common,advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 20, "description": "Maximum size of the thread pool used to handle HTTP responses." },
     "workerPoolSize": { "kind": "property", "displayName": "Worker Pool Size", "group": "common (advanced)", "label": "common,advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 10, "description": "Size of the thread pool used to handle HTTP responses." },
     "bridgeErrorHandler": { "kind": "property", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a me [...]
+    "pubSubBatchSize": { "kind": "property", "displayName": "Pub Sub Batch Size", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 100, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "config", "description": "Max number of events to receive in a batch from the Pub\/Sub API." },
+    "pubSubDeserializeType": { "kind": "property", "displayName": "Pub Sub Deserialize Type", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.salesforce.PubSubDeserializeType", "enum": [ "AVRO", "SPECIFIC_RECORD", "GENERIC_RECORD", "POJO", "JSON" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "AVRO", "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", " [...]
+    "pubSubPojoClass": { "kind": "property", "displayName": "Pub Sub Pojo Class", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "config", "description": "Fully qualified class name to deserialize Pub\/Sub API event to." },
+    "replayPreset": { "kind": "property", "displayName": "Replay Preset", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "com.salesforce.eventbus.protobuf.ReplayPreset", "enum": [ "LATEST", "EARLIEST", "CUSTOM" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "LATEST", "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "config", "description": "Replay preset  [...]
     "allOrNone": { "kind": "property", "displayName": "All Or None", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "config", "description": "Composite API option to indicate to rollback all records if any are not successful." },
     "apexUrl": { "kind": "property", "displayName": "Apex Url", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "config", "description": "APEX method URL" },
     "compositeMethod": { "kind": "property", "displayName": "Composite Method", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "config", "description": "Composite (raw) method." },
@@ -111,13 +115,16 @@
     "loginConfig": { "kind": "property", "displayName": "Login Config", "group": "security", "label": "common,security", "required": false, "type": "object", "javaType": "org.apache.camel.component.salesforce.SalesforceLoginConfig", "deprecated": false, "autowired": false, "secret": false, "description": "All authentication configuration in one nested bean, all properties set there can be set directly on the component as well" },
     "loginUrl": { "kind": "property", "displayName": "Login Url", "group": "security", "label": "common,security", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "https:\/\/login.salesforce.com", "description": "URL of the Salesforce instance used for authentication, by default set to https:\/\/login.salesforce.com" },
     "password": { "kind": "property", "displayName": "Password", "group": "security", "label": "common,security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "description": "Password used in OAuth flow to gain access to access token. It's easy to get started with password OAuth flow, but in general one should avoid it as it is deemed less secure than other flows. Make sure that you append security token to  [...]
+    "pubSubHost": { "kind": "property", "displayName": "Pub Sub Host", "group": "security", "label": "common,security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "api.pubsub.salesforce.com", "description": "Pub\/Sub host" },
+    "pubSubPort": { "kind": "property", "displayName": "Pub Sub Port", "group": "security", "label": "common,security", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 7443, "description": "Pub\/Sub port" },
     "refreshToken": { "kind": "property", "displayName": "Refresh Token", "group": "security", "label": "common,security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "description": "Refresh token already obtained in the refresh token OAuth flow. One needs to setup a web application and configure a callback URL to receive the refresh token, or configure using the builtin callback at https:\/\/login.salesfor [...]
     "sslContextParameters": { "kind": "property", "displayName": "Ssl Context Parameters", "group": "security", "label": "common,security", "required": false, "type": "object", "javaType": "org.apache.camel.support.jsse.SSLContextParameters", "deprecated": false, "autowired": false, "secret": false, "description": "SSL parameters to use, see SSLContextParameters class for all available options." },
     "useGlobalSslContextParameters": { "kind": "property", "displayName": "Use Global Ssl Context Parameters", "group": "security", "label": "security", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Enable usage of global SSL context parameters" },
     "userName": { "kind": "property", "displayName": "User Name", "group": "security", "label": "common,security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Username used in OAuth flow to gain access to access token. It's easy to get started with password OAuth flow, but in general one should avoid it as it is deemed less secure than other flows." }
   },
   "headers": {
-    "CamelSalesforceReplayId": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The replay id.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_REPLAY_ID" },
+    "CamelSalesforceReplayId": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The Streaming API replayId.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_REPLAY_ID" },
+    "CamelSalesforcePubSubReplayId": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The Pub\/Sub API replayId.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_PUBSUB_REPLAY_ID" },
     "CamelSalesforceChangeEventSchema": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The change event schema.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_CHANGE_EVENT_SCHEMA" },
     "CamelSalesforceEventType": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The event type.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_EVENT_TYPE" },
     "CamelSalesforceCommitTimestamp": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The commit timestamp.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_COMMIT_TIMESTAMP" },
@@ -138,8 +145,8 @@
     "CamelSalesforceQueryResultTotalSize": { "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Total number of records matching a query.", "constantName": "org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_QUERY_RESULT_TOTAL_SIZE" }
   },
   "properties": {
-    "operationName": { "kind": "path", "displayName": "Operation Name", "group": "common", "label": "common", "required": true, "type": "object", "javaType": "org.apache.camel.component.salesforce.internal.OperationName", "enum": [ "getVersions", "getResources", "getGlobalObjects", "getBasicInfo", "getDescription", "getSObject", "createSObject", "updateSObject", "deleteSObject", "getSObjectWithId", "upsertSObject", "deleteSObjectWithId", "getBlobField", "query", "queryMore", "queryAll",  [...]
-    "topicName": { "kind": "path", "displayName": "Topic Name", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "The name of the topic\/channel to use" },
+    "operationName": { "kind": "path", "displayName": "Operation Name", "group": "common", "label": "common", "required": true, "type": "object", "javaType": "org.apache.camel.component.salesforce.internal.OperationName", "enum": [ "getVersions", "getResources", "getGlobalObjects", "getBasicInfo", "getDescription", "getSObject", "createSObject", "updateSObject", "deleteSObject", "getSObjectWithId", "upsertSObject", "deleteSObjectWithId", "getBlobField", "query", "queryMore", "queryAll",  [...]
+    "topicName": { "kind": "path", "displayName": "Topic Name", "group": "producer", "label": "consumer,producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "The name of the topic\/channel to use" },
     "apexMethod": { "kind": "parameter", "displayName": "Apex Method", "group": "common", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "APEX method name" },
     "apexQueryParams": { "kind": "parameter", "displayName": "Apex Query Params", "group": "common", "label": "", "required": false, "type": "object", "javaType": "java.util.Map<java.lang.String, java.lang.Object>", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "Query params for APEX method" },
     "apiVersion": { "kind": "parameter", "displayName": "Api Version", "group": "common", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "56.0", "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "Salesforce API version." },
@@ -186,7 +193,12 @@
     "sObjectSearch": { "kind": "parameter", "displayName": "SObject Search", "group": "common", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "Salesforce SOSL search string" },
     "streamQueryResult": { "kind": "parameter", "displayName": "Stream query result", "group": "common", "label": "", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "false", "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "If true, streams SOQL query result and transparently handles subsequent reque [...]
     "updateTopic": { "kind": "parameter", "displayName": "Update Topic", "group": "common", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "Whether to update an existing Push Topic when using the Streaming API, defaults to false" },
-    "replayId": { "kind": "parameter", "displayName": "Replay Id", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "java.lang.Long", "deprecated": false, "autowired": false, "secret": false, "description": "The replayId value to use when subscribing" },
+    "pubSubBatchSize": { "kind": "parameter", "displayName": "Pub Sub Batch Size", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 100, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "Max number of events to receive in a batch from the Pub\/Sub API." },
+    "pubSubDeserializeType": { "kind": "parameter", "displayName": "Pub Sub Deserialize Type", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.salesforce.PubSubDeserializeType", "enum": [ "AVRO", "SPECIFIC_RECORD", "GENERIC_RECORD", "POJO", "JSON" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "AVRO", "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig",  [...]
+    "pubSubPojoClass": { "kind": "parameter", "displayName": "Pub Sub Pojo Class", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "Fully qualified class name to deserialize Pub\/Sub API event to." },
+    "pubSubReplayId": { "kind": "parameter", "displayName": "Pub Sub Replay Id", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "The replayId value to use when subscribing to the Pub\/Sub API." },
+    "replayId": { "kind": "parameter", "displayName": "Replay Id", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "java.lang.Long", "deprecated": false, "autowired": false, "secret": false, "description": "The replayId value to use when subscribing to the Streaming API." },
+    "replayPreset": { "kind": "parameter", "displayName": "Replay Preset", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "com.salesforce.eventbus.protobuf.ReplayPreset", "enum": [ "LATEST", "EARLIEST", "CUSTOM" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "LATEST", "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "Replay [...]
     "bridgeErrorHandler": { "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now [...]
     "exceptionHandler": { "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the con [...]
     "exchangePattern": { "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." },
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce-component.adoc b/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce-component.adoc
index b6576d87d3a..c694beabd69 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce-component.adoc
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce-component.adoc
@@ -183,6 +183,7 @@ Camel supports the following Salesforce APIs:
 * <<ApexRESTAPI,Apex REST API>>
 * <<Bulk2API,Bulk 2 API>>
 * <<BulkAPI,Bulk API>>
+* <<PubSubAPI,Pub/Sub API>>
 * <<StreamingAPI,Streaming API>>
 * <<ReportsAPI,Reports API>>
 
@@ -1615,6 +1616,92 @@ error.
 ...to("salesforce:createBatch")..
 ----
 
+[[PubSubAPI]]
+=== Pub/Sub API
+
+The Pub/Sub API allows you to publish and subscribe to platform events, including real-time event
+monitoring events, and change data capture events. This API is based on gRPC and HTTP/2, and event
+payloads are delivered in Apache Avro format.
+
+==== Publishing Events
+
+The URI format for publishing events is:
+----
+salesforce:pubSubPublish:<topic_name>
+----
+
+For example:
+----
+.to("salesforce:pubsubPublish:/event/MyCustomPlatformEvent__e")
+----
+
+[[pubSubPublish]]
+==== Publish an Event
+
+`pubSubPublish`
+
+|===
+| Parameter | Type | Description| Default| Required
+
+| Body | `List`. List can contained mixed types (see description below). | Event payloads to be published | |
+
+|===
+
+Because The Pub/Sub API requires that event payloads be serialized in Apache Avro format, Camel will
+attempt to serialize event payloads from the following input types:
+
+* Avro `GenericRecord`. Camel fetches the Avro schema in order to serialize `GenericRecord`
+instances. This option doesn't require ahead-of-time generation of Event classes.
+* Avro `SpecificRecord`. Subclasses of `SpecificRecord` contain properties that are specific to an
+event type. The <<MavenPlugin,maven plugin>> can generate the subclasses automatically.
+* POJO. Camel fetches the Avro schema in order to serialize POJO instances. The POJO's
+field names must match event field names exactly, including case.
+* `String`. Camel will treat the `String` value as JSON and serialize to Avro. Note that the JSON
+value does not have to be Avro-encoded JSON. It can be arbitrary JSON, but it must be serializable
+to Avro based on the Schema associated with the topic you're publishing to. The JSON object's
+field names must match event field names exactly, including case.
+* `byte[]`. Camel will not perform any serialization. Value must be the Avro-encoded event payload.
+* `com.salesforce.eventbus.protobuf.ProducerEvent`. Providing a `ProducerEvent` allows full control,
+e.g. setting the `id` property, which can be tied back to the `PublishResult.CorrelationKey`.
+
+*Output*
+
+Type: `List<org.apache.camel.component.salesforce.api.dto.pubsub.PublishResult>`
+
+The order of the items in the returned `List` correlates to the order of the items in the input `List`.
+
+==== Subscribing
+The URI format for subscribing to a Pub/Sub topic is:
+----
+salesforce:pubSubSubscribe:<topic_name>
+----
+
+For example:
+
+----
+from("salesforce:pubSubSubscribe:/event/BatchApexErrorEvent")
+----
+
+|===
+| Parameter | Type | Description| Default| Required
+
+| `replayPreset` | `ReplayPreset` | Values: `LATEST`, `EARLIEST`, `CUSTOM`.  | `LATEST` |
+| `pubSubReplayId` | `String` | When `replayPreset` is set to `CUSTOM`, the replayId to use when
+subscribing to a topic.  | |
+| `pubSubBatchSize` | int | Max number of events to receive at a time. Values >100 will be normalized to 100 by salesforce.  | 100 | X
+| `pubSubDeserializeType` | `PubSubDeserializeType` | Values: `AVRO`, `SPECIFIC_RECORD`, `GENERIC_RECORD`,
+`POJO`, `JSON`. `AVRO` will try a `SpecificRecord` subclass if found, otherwise `GenericRecord` | `AVRO` | X
+| `pubSubPojoClass` | Fully qualified class name to deserialize Pub/Sub API event to. | | | If `pubSubDeserializeType` is `POJO`
+
+|===
+
+*Output*
+
+Type: Determined by the `pubSubDeserializeType` option.
+
+Headers: `CamelSalesforcePubSubReplayId`
+
+
 [[StreamingAPI]]
 === Streaming API
 
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/PubSubApiConsumer.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/PubSubApiConsumer.java
new file mode 100644
index 00000000000..2b7589bbf7d
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/PubSubApiConsumer.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce;
+
+import java.io.IOException;
+import java.util.Map;
+
+import com.salesforce.eventbus.protobuf.ReplayPreset;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.component.salesforce.internal.client.PubSubApiClient;
+import org.apache.camel.support.DefaultConsumer;
+import org.apache.camel.support.service.ServiceHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.camel.component.salesforce.SalesforceConstants.HEADER_SALESFORCE_PUBSUB_REPLAY_ID;
+
+public class PubSubApiConsumer extends DefaultConsumer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PubSubApiConsumer.class);
+    private final String topic;
+    private final ReplayPreset initialReplayPreset;
+    private final String initialReplayId;
+    private final SalesforceEndpoint endpoint;
+
+    private final int batchSize;
+    private final PubSubDeserializeType deserializeType;
+    private Class<?> pojoClass;
+    private PubSubApiClient pubSubClient;
+    private Map<String, Class<?>> eventClassMap;
+
+    public PubSubApiConsumer(SalesforceEndpoint endpoint, Processor processor) throws ClassNotFoundException {
+        super(endpoint, processor);
+        this.endpoint = endpoint;
+        this.topic = endpoint.getTopicName();
+        this.initialReplayPreset = endpoint.getConfiguration().getReplayPreset();
+        this.initialReplayId = endpoint.getPubSubReplayId();
+        if (initialReplayPreset == ReplayPreset.CUSTOM && initialReplayId == null) {
+            throw new IllegalArgumentException("pubSubReplayId option is required if ReplayPreset is CUSTOM.");
+        }
+        this.batchSize = endpoint.getConfiguration().getPubSubBatchSize();
+        this.deserializeType = endpoint.getConfiguration().getPubSubDeserializeType();
+        String pojoClassName = endpoint.getConfiguration().getPubSubPojoClass();
+        if (pojoClassName != null) {
+            this.pojoClass = this.getClass().getClassLoader().loadClass(pojoClassName);
+        }
+    }
+
+    public void processEvent(Object record, String replayId) throws IOException {
+        final Exchange exchange = createExchange(true);
+        final Message in = exchange.getIn();
+        in.setBody(record);
+        in.setHeader(HEADER_SALESFORCE_PUBSUB_REPLAY_ID, replayId);
+        AsyncCallback cb = defaultConsumerCallback(exchange, true);
+        getAsyncProcessor().process(exchange, cb);
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        this.eventClassMap = endpoint.getComponent().getEventClassMap();
+        this.pubSubClient = new PubSubApiClient(
+                endpoint.getComponent().getSession(), endpoint.getComponent().getLoginConfig(),
+                endpoint.getComponent().getPubSubHost(), endpoint.getComponent().getPubSubPort(),
+                endpoint.getConfiguration().getBackoffIncrement(), endpoint.getConfiguration().getMaxBackoff());
+
+        ServiceHelper.startService(pubSubClient);
+        pubSubClient.subscribe(this, initialReplayPreset, initialReplayId);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        ServiceHelper.stopService(pubSubClient);
+        super.doStop();
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public int getBatchSize() {
+        return batchSize;
+    }
+
+    public PubSubDeserializeType getDeserializeType() {
+        return deserializeType;
+    }
+
+    public Map<String, Class<?>> getEventClassMap() {
+        return eventClassMap;
+    }
+
+    public Class<?> getPojoClass() {
+        return pojoClass;
+    }
+}
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/PubSubDeserializeType.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/PubSubDeserializeType.java
new file mode 100644
index 00000000000..1e3cff5cd33
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/PubSubDeserializeType.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce;
+
+public enum PubSubDeserializeType {
+    AVRO,
+    SPECIFIC_RECORD,
+    GENERIC_RECORD,
+    POJO,
+    JSON
+}
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
index 4b68621eecd..4fe31920237 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
@@ -24,6 +24,7 @@ import java.util.Set;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
 
+import org.apache.avro.specific.SpecificRecord;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.SSLContextParametersAware;
@@ -93,6 +94,7 @@ public class SalesforceComponent extends DefaultComponent implements SSLContextP
     static final String APEX_CALL_PREFIX = OperationName.APEX_CALL.value() + "/";
 
     private static final Logger LOG = LoggerFactory.getLogger(SalesforceComponent.class);
+    private static final String SALESFORCE_EVENTBUS_PACKAGE = "com.sforce.eventbus";
 
     @Metadata(description = "All authentication configuration in one nested bean, all properties set there can be set"
                             + " directly on the component as well",
@@ -162,6 +164,14 @@ public class SalesforceComponent extends DefaultComponent implements SSLContextP
               defaultValue = "false", label = "common,security")
     private boolean lazyLogin;
 
+    @Metadata(description = "Pub/Sub host",
+              defaultValue = "api.pubsub.salesforce.com", label = "common,security")
+    private String pubSubHost = "api.pubsub.salesforce.com";
+
+    @Metadata(description = "Pub/Sub port",
+              defaultValue = "7443", label = "common,security")
+    private int pubSubPort = 7443;
+
     @Metadata(description = "Global endpoint configuration - use to set values that are common to all endpoints",
               label = "common,advanced")
     private SalesforceEndpointConfig config;
@@ -266,6 +276,7 @@ public class SalesforceComponent extends DefaultComponent implements SSLContextP
     private SalesforceSession session;
 
     private Map<String, Class<?>> classMap;
+    private Map<String, Class<?>> eventClassMap;
 
     // Lazily created helper for consumer endpoints
     private SubscriptionHelper subscriptionHelper;
@@ -291,10 +302,10 @@ public class SalesforceComponent extends DefaultComponent implements SSLContextP
             // extract APEX URL
             apexUrl = remaining.substring(APEX_CALL_PREFIX.length());
             remaining = OperationName.APEX_CALL.value();
-        } else if (remaining.startsWith(OperationName.SUBSCRIBE.value())) {
+        } else if (remaining.startsWith(OperationName.SUBSCRIBE.value()) || remaining.startsWith("pubSub")) {
             final String[] parts = remaining.split(":");
             if (parts.length != 2) {
-                throw new IllegalArgumentException("topicName must be supplied for subscribe operation.");
+                throw new IllegalArgumentException("topicName must be supplied for subscribe/pubsub operations.");
             }
             remaining = parts[0];
             topicName = parts[1];
@@ -348,6 +359,17 @@ public class SalesforceComponent extends DefaultComponent implements SSLContextP
         return result;
     }
 
+    private Map<String, Class<?>> scanEventClasses() {
+        Map<String, Class<?>> result = new HashMap<>();
+
+        Set<Class<?>> classes = getCamelContext().getCamelContextExtension().getPackageScanClassResolver()
+                .findImplementations(SpecificRecord.class, SALESFORCE_EVENTBUS_PACKAGE);
+        for (Class<?> aClass : classes) {
+            result.put(aClass.getName(), aClass);
+        }
+        return result;
+    }
+
     public SalesforceHttpClient getHttpClient() {
         return httpClient;
     }
@@ -432,6 +454,8 @@ public class SalesforceComponent extends DefaultComponent implements SSLContextP
             classMap = new HashMap<>(0);
         }
 
+        this.eventClassMap = scanEventClasses();
+
         if (subscriptionHelper != null) {
             ServiceHelper.startService(subscriptionHelper);
         }
@@ -442,6 +466,9 @@ public class SalesforceComponent extends DefaultComponent implements SSLContextP
         if (classMap != null) {
             classMap.clear();
         }
+        if (eventClassMap != null) {
+            eventClassMap.clear();
+        }
 
         try {
             if (subscriptionHelper != null) {
@@ -572,6 +599,22 @@ public class SalesforceComponent extends DefaultComponent implements SSLContextP
         this.lazyLogin = lazyLogin;
     }
 
+    public String getPubSubHost() {
+        return pubSubHost;
+    }
+
+    public void setPubSubHost(String pubSubHost) {
+        this.pubSubHost = pubSubHost;
+    }
+
+    public int getPubSubPort() {
+        return pubSubPort;
+    }
+
+    public void setPubSubPort(int pubSubPort) {
+        this.pubSubPort = pubSubPort;
+    }
+
     public SalesforceEndpointConfig getConfig() {
         return config;
     }
@@ -782,6 +825,10 @@ public class SalesforceComponent extends DefaultComponent implements SSLContextP
         return classMap;
     }
 
+    public Map<String, Class<?>> getEventClassMap() {
+        return eventClassMap;
+    }
+
     public RestClient createRestClientFor(final SalesforceEndpoint endpoint) throws SalesforceException {
         final SalesforceEndpointConfig endpointConfig = endpoint.getConfiguration();
 
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConstants.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConstants.java
index 0d8872361c2..61b6d78316a 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConstants.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConstants.java
@@ -20,8 +20,10 @@ import org.apache.camel.spi.Metadata;
 
 public final class SalesforceConstants {
 
-    @Metadata(label = "consumer", description = "The replay id.", javaType = "Object")
+    @Metadata(label = "consumer", description = "The Streaming API replayId.", javaType = "Object")
     public static final String HEADER_SALESFORCE_REPLAY_ID = "CamelSalesforceReplayId";
+    @Metadata(label = "consumer", description = "The Pub/Sub API replayId.", javaType = "Object")
+    public static final String HEADER_SALESFORCE_PUBSUB_REPLAY_ID = "CamelSalesforcePubSubReplayId";
     @Metadata(label = "consumer", description = "The change event schema.", javaType = "Object")
     public static final String HEADER_SALESFORCE_CHANGE_EVENT_SCHEMA = "CamelSalesforceChangeEventSchema";
     @Metadata(label = "consumer", description = "The event type.", javaType = "String")
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java
index da9702ed0dc..866f9af4dec 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java
@@ -57,18 +57,23 @@ public class SalesforceEndpoint extends DefaultEndpoint {
             + "bulk2AbortJob,bulk2DeleteJob,bulk2GetSuccessfulResults,bulk2GetFailedResults,"
             + "bulk2GetUnprocessedRecords,bulk2CreateQueryJob,bulk2GetQueryJob,"
             + "bulk2GetAllQueryJobs,bulk2GetQueryJobResults,bulk2AbortQueryJob,bulk2DeleteQueryJob,"
-            + "raw,subscribe")
+            + "raw,subscribe,pubSubSubscribe,pubSubPublish")
     @Metadata(required = true)
     private final OperationName operationName;
     //CHECKSTYLE:ON
-    @UriPath(label = "consumer", description = "The name of the topic/channel to use")
+
+    @UriPath(label = "consumer,producer", description = "The name of the topic/channel to use")
     private final String topicName;
+
     @UriParam
     private final SalesforceEndpointConfig configuration;
 
-    @UriParam(label = "consumer", description = "The replayId value to use when subscribing")
+    @UriParam(label = "consumer", description = "The replayId value to use when subscribing to the Streaming API.")
     private Long replayId;
 
+    @UriParam(label = "consumer", description = "The replayId value to use when subscribing to the Pub/Sub API.")
+    private String pubSubReplayId;
+
     public SalesforceEndpoint(String uri, SalesforceComponent salesforceComponent, SalesforceEndpointConfig configuration,
                               OperationName operationName, String topicName) {
         super(uri, salesforceComponent);
@@ -91,8 +96,16 @@ public class SalesforceEndpoint extends DefaultEndpoint {
 
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
-        final SubscriptionHelper subscriptionHelper = getComponent().getSubscriptionHelper();
-        final SalesforceConsumer consumer = new SalesforceConsumer(this, processor, subscriptionHelper);
+        Consumer consumer = null;
+        switch (operationName) {
+            case SUBSCRIBE -> {
+                final SubscriptionHelper subscriptionHelper = getComponent().getSubscriptionHelper();
+                consumer = new StreamingApiConsumer(this, processor, subscriptionHelper);
+            }
+            case PUBSUB_SUBSCRIBE -> {
+                consumer = new PubSubApiConsumer(this, processor);
+            }
+        }
         configureConsumer(consumer);
         return consumer;
     }
@@ -122,6 +135,14 @@ public class SalesforceEndpoint extends DefaultEndpoint {
         return replayId;
     }
 
+    public String getPubSubReplayId() {
+        return pubSubReplayId;
+    }
+
+    public void setPubSubReplayId(String pubSubReplayId) {
+        this.pubSubReplayId = pubSubReplayId;
+    }
+
     @Override
     protected void doStart() throws Exception {
         try {
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java
index 54008624b76..d2262f44544 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.Optional;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.salesforce.eventbus.protobuf.ReplayPreset;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.component.salesforce.api.dto.analytics.reports.ReportMetadata;
 import org.apache.camel.component.salesforce.api.dto.approval.ApprovalRequest;
@@ -95,6 +96,11 @@ public class SalesforceEndpointConfig implements Cloneable {
     public static final String INITIAL_REPLAY_ID_MAP = "initialReplayIdMap";
     public static final long REPLAY_FROM_TIP = -1L;
 
+    // parameters for Pub/Sub API
+    public static final String REPLAY_PRESET = "replayPreset";
+    public static final String PUB_SUB_DESERIALIZE_TYPE = "pubSubDeserializeType";
+    public static final String PUB_SUB_POJO_CLASS = "pubSubPojoClass";
+
     // parameters for Approval API
     public static final String APPROVAL = "approval";
 
@@ -195,6 +201,24 @@ public class SalesforceEndpointConfig implements Cloneable {
     @UriParam
     private Boolean notifyForOperationUndelete;
 
+    // Pub/Sub API properties
+    @UriParam(label = "consumer", defaultValue = "100",
+              description = "Max number of events to receive in a batch from the Pub/Sub API.")
+    private int pubSubBatchSize = 100;
+
+    @UriParam(label = "consumer", defaultValue = "AVRO",
+              description = "How to deserialize events consume from the Pub/Sub API. AVRO will try a " +
+                            "SpecificRecord subclass if found, otherwise GenericRecord.",
+              enums = "AVRO,SPECIFIC_RECORD,GENERIC_RECORD,POJO,JSON")
+    private PubSubDeserializeType pubSubDeserializeType = PubSubDeserializeType.AVRO;
+
+    @UriParam(label = "consumer", description = "Replay preset for Pub/Sub API.", defaultValue = "LATEST",
+              enums = "LATEST,EARLIEST,CUSTOM")
+    private ReplayPreset replayPreset = ReplayPreset.LATEST;
+
+    @UriParam(label = "consumer", description = "Fully qualified class name to deserialize Pub/Sub API event to.")
+    private String pubSubPojoClass;
+
     // Analytics API properties
     @UriParam
     private String reportId;
@@ -814,6 +838,11 @@ public class SalesforceEndpointConfig implements Cloneable {
         valueMap.put(FALL_BACK_REPLAY_ID, fallBackReplayId);
         valueMap.put(INITIAL_REPLAY_ID_MAP, initialReplayIdMap);
 
+        // add Pub/Sub API properties
+        valueMap.put(REPLAY_PRESET, initialReplayIdMap);
+        valueMap.put(PUB_SUB_DESERIALIZE_TYPE, pubSubDeserializeType);
+        valueMap.put(PUB_SUB_POJO_CLASS, pubSubPojoClass);
+
         valueMap.put(NOT_FOUND_BEHAVIOUR, notFoundBehaviour);
 
         valueMap.put(RAW_PATH, rawPath);
@@ -859,6 +888,43 @@ public class SalesforceEndpointConfig implements Cloneable {
         this.fallBackReplayId = fallBackReplayId;
     }
 
+    /**
+     * ReplayPreset for Pub/Sub API
+     */
+    public ReplayPreset getReplayPreset() {
+        return replayPreset;
+    }
+
+    public void setReplayPreset(ReplayPreset replayPreset) {
+        this.replayPreset = replayPreset;
+    }
+
+    /**
+     * Type of deserialization for Pub/Sub API events
+     *
+     * @return
+     */
+    public PubSubDeserializeType getPubSubDeserializeType() {
+        return pubSubDeserializeType;
+    }
+
+    public void setPubSubDeserializeType(PubSubDeserializeType pubSubDeserializeType) {
+        this.pubSubDeserializeType = pubSubDeserializeType;
+    }
+
+    /**
+     * Class to deserialize Pub/Sub API events to
+     *
+     * @return
+     */
+    public String getPubSubPojoClass() {
+        return pubSubPojoClass;
+    }
+
+    public void setPubSubPojoClass(String pubSubPojoClass) {
+        this.pubSubPojoClass = pubSubPojoClass;
+    }
+
     public Integer getLimit() {
         return limit;
     }
@@ -1100,4 +1166,13 @@ public class SalesforceEndpointConfig implements Cloneable {
     public void setRawHttpHeaders(String rawHttpHeaders) {
         this.rawHttpHeaders = rawHttpHeaders;
     }
+
+    public int getPubSubBatchSize() {
+        return pubSubBatchSize;
+    }
+
+    public void setPubSubBatchSize(int pubSubBatchSize) {
+        this.pubSubBatchSize = pubSubBatchSize;
+    }
+
 }
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceProducer.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceProducer.java
index 2e9ffece02c..c29040d8a94 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceProducer.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceProducer.java
@@ -28,6 +28,7 @@ import org.apache.camel.component.salesforce.internal.processor.BulkApiV2Process
 import org.apache.camel.component.salesforce.internal.processor.CompositeApiProcessor;
 import org.apache.camel.component.salesforce.internal.processor.CompositeSObjectCollectionsProcessor;
 import org.apache.camel.component.salesforce.internal.processor.JsonRestProcessor;
+import org.apache.camel.component.salesforce.internal.processor.PubSubApiProcessor;
 import org.apache.camel.component.salesforce.internal.processor.RawProcessor;
 import org.apache.camel.component.salesforce.internal.processor.SalesforceProcessor;
 import org.apache.camel.support.DefaultAsyncProducer;
@@ -63,6 +64,8 @@ public class SalesforceProducer extends DefaultAsyncProducer {
             processor = new CompositeSObjectCollectionsProcessor(endpoint);
         } else if (isRawOperation(operationName)) {
             processor = new RawProcessor(endpoint);
+        } else if (isPubSubOperation(operationName)) {
+            processor = new PubSubApiProcessor(endpoint);
         } else {
             processor = new JsonRestProcessor(endpoint);
         }
@@ -154,6 +157,10 @@ public class SalesforceProducer extends DefaultAsyncProducer {
         return operationName == OperationName.RAW;
     }
 
+    private static boolean isPubSubOperation(OperationName operationName) {
+        return operationName == OperationName.PUBSUB_PUBLISH;
+    }
+
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
         SalesforceEndpoint endpoint = (SalesforceEndpoint) getEndpoint();
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/StreamingApiConsumer.java
similarity index 97%
rename from components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java
rename to components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/StreamingApiConsumer.java
index 8f361a66a8c..04ba2979848 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConsumer.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/StreamingApiConsumer.java
@@ -39,9 +39,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * The Salesforce consumer.
+ * The Salesforce Streaming API consumer.
  */
-public class SalesforceConsumer extends DefaultConsumer {
+public class StreamingApiConsumer extends DefaultConsumer {
 
     private enum MessageKind {
         CHANGE_EVENT,
@@ -59,7 +59,7 @@ public class SalesforceConsumer extends DefaultConsumer {
         }
     }
 
-    private static final Logger LOG = LoggerFactory.getLogger(SalesforceConsumer.class);
+    private static final Logger LOG = LoggerFactory.getLogger(StreamingApiConsumer.class);
 
     private static final String CREATED_DATE_PROPERTY = "createdDate";
     private static final String EVENT_PROPERTY = "event";
@@ -81,7 +81,7 @@ public class SalesforceConsumer extends DefaultConsumer {
     private final SubscriptionHelper subscriptionHelper;
     private final String topicName;
 
-    public SalesforceConsumer(final SalesforceEndpoint endpoint, final Processor processor, final SubscriptionHelper helper) {
+    public StreamingApiConsumer(final SalesforceEndpoint endpoint, final Processor processor, final SubscriptionHelper helper) {
         super(endpoint, processor);
         this.endpoint = endpoint;
         final ObjectMapper configuredObjectMapper = endpoint.getConfiguration().getObjectMapper();
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/pubsub/PublishResult.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/pubsub/PublishResult.java
new file mode 100644
index 00000000000..38c58cf4042
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/api/dto/pubsub/PublishResult.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.api.dto.pubsub;
+
+import com.salesforce.eventbus.protobuf.Error;
+import org.apache.camel.component.salesforce.internal.client.PubSubApiClient;
+
+public class PublishResult {
+
+    private final com.salesforce.eventbus.protobuf.PublishResult source;
+
+    public PublishResult(com.salesforce.eventbus.protobuf.PublishResult source) {
+        this.source = source;
+    }
+
+    // Replay ID is opaque.
+    public String getReplayId() {
+        return PubSubApiClient.base64EncodeByteString(source.getReplayId());
+    }
+
+    public boolean hasError() {
+        return source.hasError();
+    }
+
+    public Error getError() {
+        return source.getError();
+    }
+
+    public String getCorrelationKey() {
+        return source.getCorrelationKey();
+    }
+
+    public com.salesforce.eventbus.protobuf.PublishResult getSource() {
+        return source;
+    }
+
+    @Override
+    public String toString() {
+        return "PublishResult{" +
+               "hasError=" + hasError() +
+               ",error=" + getError() +
+               ",correlationKey=" + getCorrelationKey() +
+               "}";
+    }
+}
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/OperationName.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/OperationName.java
index c05403850b9..ea87d77b600 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/OperationName.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/OperationName.java
@@ -107,7 +107,12 @@ public enum OperationName {
     // Raw operation
     RAW("raw"),
 
-    SUBSCRIBE("subscribe");
+    // Streaming API
+    SUBSCRIBE("subscribe"),
+
+    // Pub/Sub API
+    PUBSUB_PUBLISH("pubSubPublish"),
+    PUBSUB_SUBSCRIBE("pubSubSubscribe");
 
     private final String value;
 
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/SalesforceSession.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/SalesforceSession.java
index 03e2b6e1dff..0cf0a6b814a 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/SalesforceSession.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/SalesforceSession.java
@@ -31,9 +31,11 @@ import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.camel.CamelContext;
@@ -82,8 +84,12 @@ public class SalesforceSession extends ServiceSupport {
 
     private volatile String accessToken;
     private volatile String instanceUrl;
+    private volatile String id;
+    private volatile String orgId;
 
-    private CamelContext camelContext;
+    private final CamelContext camelContext;
+    private final AtomicBoolean loggingIn = new AtomicBoolean();
+    private CountDownLatch latch = new CountDownLatch(1);
 
     public SalesforceSession(CamelContext camelContext, SalesforceHttpClient httpClient, long timeout,
                              SalesforceLoginConfig config) {
@@ -101,6 +107,54 @@ public class SalesforceSession extends ServiceSupport {
         this.listeners = new CopyOnWriteArraySet<>();
     }
 
+    public void attemptLoginUntilSuccessful(long backoffIncrement, long maxBackoff) {
+        // if another thread is logging in, we will just wait until it's successful
+        if (!loggingIn.compareAndSet(false, true)) {
+            LOG.debug("waiting on login from another thread");
+            // TODO: This is janky
+            try {
+                while (latch == null) {
+                    Thread.sleep(100);
+                }
+                latch.await();
+            } catch (InterruptedException ex) {
+                throw new RuntimeException("Failed to login.", ex);
+            }
+            LOG.debug("done waiting");
+            return;
+        }
+        LOG.debug("Attempting to login, no other threads logging in");
+        latch = new CountDownLatch(1);
+
+        long backoff = 0;
+
+        try {
+            for (;;) {
+                try {
+                    if (isStoppingOrStopped()) {
+                        return;
+                    }
+                    login(getAccessToken());
+                    break;
+                } catch (SalesforceException e) {
+                    backoff = backoff + backoffIncrement;
+                    if (backoff > maxBackoff) {
+                        backoff = maxBackoff;
+                    }
+                    LOG.warn(String.format("Salesforce login failed. Pausing for %d milliseconds", backoff), e);
+                    try {
+                        Thread.sleep(backoff);
+                    } catch (InterruptedException ex) {
+                        throw new RuntimeException("Failed to login.", ex);
+                    }
+                }
+            }
+        } finally {
+            loggingIn.set(false);
+            latch.countDown();
+        }
+    }
+
     public synchronized String login(String oldToken) throws SalesforceException {
 
         // check if we need a new session
@@ -133,7 +187,6 @@ public class SalesforceSession extends ServiceSupport {
                 throw new SalesforceException("Unexpected login error: " + e.getCause().getMessage(), e.getCause());
             }
         }
-
         return accessToken;
     }
 
@@ -259,6 +312,8 @@ public class SalesforceSession extends ServiceSupport {
                     LOG.info("Login successful");
                     accessToken = token.getAccessToken();
                     instanceUrl = Optional.ofNullable(config.getInstanceUrl()).orElse(token.getInstanceUrl());
+                    id = token.getId();
+                    orgId = id.substring(id.indexOf("id/") + 3, id.indexOf("id/") + 21);
                     // strip trailing '/'
                     int lastChar = instanceUrl.length() - 1;
                     if (instanceUrl.charAt(lastChar) == '/') {
@@ -347,6 +402,14 @@ public class SalesforceSession extends ServiceSupport {
         return instanceUrl;
     }
 
+    public String getId() {
+        return id;
+    }
+
+    public String getOrgId() {
+        return orgId;
+    }
+
     public boolean addListener(SalesforceSessionListener listener) {
         return listeners.add(listener);
     }
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java
new file mode 100644
index 00000000000..85e4f300b13
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.client;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import com.google.protobuf.ByteString;
+import com.salesforce.eventbus.protobuf.ConsumerEvent;
+import com.salesforce.eventbus.protobuf.FetchRequest;
+import com.salesforce.eventbus.protobuf.FetchResponse;
+import com.salesforce.eventbus.protobuf.ProducerEvent;
+import com.salesforce.eventbus.protobuf.PubSubGrpc;
+import com.salesforce.eventbus.protobuf.PublishRequest;
+import com.salesforce.eventbus.protobuf.PublishResponse;
+import com.salesforce.eventbus.protobuf.PublishResult;
+import com.salesforce.eventbus.protobuf.ReplayPreset;
+import com.salesforce.eventbus.protobuf.SchemaRequest;
+import com.salesforce.eventbus.protobuf.TopicInfo;
+import com.salesforce.eventbus.protobuf.TopicRequest;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.Metadata;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.camel.component.salesforce.PubSubApiConsumer;
+import org.apache.camel.component.salesforce.SalesforceLoginConfig;
+import org.apache.camel.component.salesforce.api.SalesforceException;
+import org.apache.camel.component.salesforce.internal.SalesforceSession;
+import org.apache.camel.support.service.ServiceSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import tech.allegro.schema.json2avro.converter.JsonAvroConverter;
+
+public class PubSubApiClient extends ServiceSupport {
+
+    public static final String PUBSUB_ERROR_AUTH_ERROR = "sfdc.platform.eventbus.grpc.service.auth.error";
+    private static final String PUBSUB_ERROR_AUTH_REFRESH_INVALID = "sfdc.platform.eventbus.grpc.service.auth.refresh.invalid";
+
+    protected PubSubGrpc.PubSubStub asyncStub;
+    protected PubSubGrpc.PubSubBlockingStub blockingStub;
+    protected String accessToken;
+
+    private final long backoffIncrement;
+    private final long maxBackoff;
+    private final String pubSubHost;
+    private final int pubSubPort;
+
+    private final Logger LOG = LoggerFactory.getLogger(getClass());
+    private final SalesforceLoginConfig loginConfig;
+    private final SalesforceSession session;
+
+    private final Map<String, Schema> schemaCache = new ConcurrentHashMap<>();
+    private final Map<String, String> schemaJsonCache = new ConcurrentHashMap<>();
+    private final Map<String, TopicInfo> topicInfoCache = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<PubSubApiConsumer, StreamObserver<FetchRequest>> observerMap = new ConcurrentHashMap<>();
+
+    private ManagedChannel channel;
+    private boolean usePlainTextConnection = false;
+
+    public PubSubApiClient(SalesforceSession session, SalesforceLoginConfig loginConfig, String pubSubHost,
+                           int pubSubPort, long backoffIncrement, long maxBackoff) {
+        this.session = session;
+        this.loginConfig = loginConfig;
+        this.pubSubHost = pubSubHost;
+        this.pubSubPort = pubSubPort;
+        this.maxBackoff = maxBackoff;
+        this.backoffIncrement = backoffIncrement;
+    }
+
+    public List<org.apache.camel.component.salesforce.api.dto.pubsub.PublishResult> publishMessage(
+            String topic, List<?> bodies)
+            throws IOException {
+        LOG.debug("Preparing to publish on topic {}", topic);
+        TopicInfo topicInfo = getTopicInfo(topic);
+        String busTopicName = topicInfo.getTopicName();
+        Schema schema = getSchema(topicInfo.getSchemaId());
+        List<ProducerEvent> events = new ArrayList<>(bodies.size());
+        for (Object body : bodies) {
+            final ProducerEvent event = createProducerEvent(topicInfo.getSchemaId(), schema, body);
+            events.add(event);
+        }
+        PublishRequest publishRequest = PublishRequest.newBuilder()
+                .setTopicName(busTopicName)
+                .addAllEvents(events)
+                .build();
+        PublishResponse response = blockingStub.publish(publishRequest);
+        LOG.debug("Published on topic {}", topic);
+        final List<PublishResult> results = response.getResultsList();
+        List<org.apache.camel.component.salesforce.api.dto.pubsub.PublishResult> publishResults
+                = new ArrayList<>(results.size());
+        for (PublishResult rawResult : results) {
+            if (rawResult.hasError()) {
+                LOG.error("{} {} ", rawResult.getError().getCode(), rawResult.getError().getMsg());
+            }
+            publishResults.add(
+                    new org.apache.camel.component.salesforce.api.dto.pubsub.PublishResult(rawResult));
+        }
+        return publishResults;
+    }
+
+    public void subscribe(PubSubApiConsumer consumer, ReplayPreset replayPreset, String initialReplayId) {
+        LOG.error("Starting subscribe {}", consumer.getTopic());
+        if (replayPreset == ReplayPreset.CUSTOM && initialReplayId == null) {
+            throw new RuntimeException("initialReplayId is required for ReplayPreset.CUSTOM");
+        }
+
+        ByteString replayId = null;
+        if (initialReplayId != null) {
+            replayId = base64DecodeToByteString(initialReplayId);
+        }
+        String topic = consumer.getTopic();
+        LOG.info("Subscribing to topic: {}.", topic);
+        final FetchResponseObserver responseObserver = new FetchResponseObserver(consumer);
+        StreamObserver<FetchRequest> serverStream = asyncStub.subscribe(responseObserver);
+        LOG.info("Subscribe successful.");
+        responseObserver.setServerStream(serverStream);
+        observerMap.put(consumer, serverStream);
+        FetchRequest.Builder fetchRequestBuilder = FetchRequest.newBuilder()
+                .setReplayPreset(replayPreset)
+                .setTopicName(topic)
+                .setNumRequested(consumer.getBatchSize());
+        if (replayPreset == ReplayPreset.CUSTOM) {
+            fetchRequestBuilder.setReplayId(replayId);
+        }
+        serverStream.onNext(fetchRequestBuilder.build());
+    }
+
+    public TopicInfo getTopicInfo(String name) {
+        return topicInfoCache.computeIfAbsent(name,
+                topic -> blockingStub.getTopic(TopicRequest.newBuilder().setTopicName(topic).build()));
+    }
+
+    public String getSchemaJson(String schemaId) {
+        return schemaJsonCache.computeIfAbsent(schemaId,
+                s -> blockingStub.getSchema(SchemaRequest.newBuilder().setSchemaId(s).build()).getSchemaJson());
+    }
+
+    public Schema getSchema(String schemaId) {
+        return schemaCache.computeIfAbsent(schemaId, id -> (new Schema.Parser()).parse(getSchemaJson(id)));
+    }
+
+    public static String base64EncodeByteString(ByteString bs) {
+        var bb = bs.asReadOnlyByteBuffer();
+        bb.position(0);
+        byte[] bytes = new byte[bb.limit()];
+        bb.get(bytes, 0, bytes.length);
+        return Base64.getEncoder().encodeToString(bytes);
+    }
+
+    public static ByteString base64DecodeToByteString(String b64) {
+        final byte[] decode = Base64.getDecoder().decode(b64);
+        return ByteString.copyFrom(decode);
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        final ManagedChannelBuilder<?> channelBuilder = ManagedChannelBuilder
+                .forAddress(pubSubHost, pubSubPort);
+        if (usePlainTextConnection) {
+            channelBuilder.usePlaintext();
+        }
+        channel = channelBuilder.build();
+        TokenCredentials callCredentials = new TokenCredentials(session);
+        this.asyncStub = PubSubGrpc.newStub(channel).withCallCredentials(callCredentials);
+        this.blockingStub = PubSubGrpc.newBlockingStub(channel).withCallCredentials(callCredentials);
+
+        // accessToken could be null
+        accessToken = session.getAccessToken();
+        if (accessToken == null && !loginConfig.isLazyLogin()) {
+            try {
+                accessToken = session.login(null);
+            } catch (SalesforceException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        LOG.warn("Stopping PubSubApiClient");
+        // stop each open stream
+        observerMap.values().forEach(observer -> {
+            LOG.debug("Stopping subscription");
+            observer.onCompleted();
+        });
+
+        channel.shutdown();
+        channel.awaitTermination(10, TimeUnit.SECONDS);
+        super.doStop();
+    }
+
+    private ProducerEvent createProducerEvent(String schemaId, Schema schema, Object body) throws IOException {
+        if (body instanceof ProducerEvent e) {
+            return e;
+        }
+        byte[] bytes;
+        if (body instanceof IndexedRecord indexedRecord) {
+            if (body instanceof GenericRecord record) {
+                bytes = getBytes(body, new GenericDatumWriter<>(record.getSchema()));
+            } else if (body instanceof SpecificRecord) {
+                bytes = getBytes(body, new SpecificDatumWriter<>());
+            } else {
+                throw new IllegalArgumentException(
+                        "Body is of unexpected type: " + indexedRecord.getClass().getName());
+            }
+        } else if (body instanceof byte[] bodyBytes) {
+            bytes = bodyBytes;
+        } else if (body instanceof String json) {
+            JsonAvroConverter converter = new JsonAvroConverter();
+            bytes = converter.convertToAvro(json.getBytes(), schema);
+        } else {
+            // try serializing as POJO
+            bytes = getBytes(body, new ReflectDatumWriter<>(schema));
+        }
+        return ProducerEvent.newBuilder()
+                .setSchemaId(schemaId)
+                .setPayload(ByteString.copyFrom(bytes))
+                .build();
+    }
+
+    private byte[] getBytes(Object body, DatumWriter<Object> writer) throws IOException {
+        byte[] bytes;
+        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(buffer, null);
+        writer.write(body, encoder);
+        bytes = buffer.toByteArray();
+        return bytes;
+    }
+
+    private class FetchResponseObserver implements StreamObserver<FetchResponse> {
+
+        private final Logger LOG = LoggerFactory.getLogger(getClass());
+        private final PubSubApiConsumer consumer;
+        private final Map<String, Class<?>> eventClassMap;
+        private final Class<?> pojoClass;
+        private String replayId;
+        private StreamObserver<FetchRequest> serverStream;
+
+        public FetchResponseObserver(PubSubApiConsumer consumer) {
+            this.consumer = consumer;
+            this.eventClassMap = consumer.getEventClassMap();
+            this.pojoClass = consumer.getPojoClass();
+        }
+
+        @Override
+        public void onNext(FetchResponse fetchResponse) {
+            String topic = consumer.getTopic();
+
+            LOG.debug("Received {} events on topic: {}", fetchResponse.getEventsList().size(), topic);
+            LOG.debug("rpcId: " + fetchResponse.getRpcId());
+            LOG.debug("pending_num_requested: " + fetchResponse.getPendingNumRequested());
+            for (ConsumerEvent ce : fetchResponse.getEventsList()) {
+                try {
+                    processEvent(ce);
+                } catch (Exception e) {
+                    LOG.error(e.toString(), e);
+                }
+            }
+            replayId = base64EncodeByteString(fetchResponse.getLatestReplayId());
+            int nextRequestSize = consumer.getBatchSize() - fetchResponse.getPendingNumRequested();
+            // batchSize could be zero if this FetchResponse contained an empty batch, which is to be expected
+            // for keep-alive reasons. In this case there is no need to send a FetchRequest
+            if (nextRequestSize > 0) {
+                FetchRequest fetchRequest = FetchRequest.newBuilder().setTopicName(topic)
+                        .setNumRequested(nextRequestSize).build();
+                LOG.debug("Sending FetchRequest, num_requested: {}", nextRequestSize);
+                serverStream.onNext(fetchRequest);
+            }
+        }
+
+        @Override
+        public void onError(Throwable throwable) {
+            observerMap.remove(consumer);
+            if (throwable instanceof StatusRuntimeException e) {
+                LOG.error("GRPC Exception", e);
+                Metadata trailers = e.getTrailers();
+                String errorCode = "";
+                LOG.error("Trailers:");
+                if (trailers != null) {
+                    trailers.keys().forEach(trailer -> LOG.error("Trailer: {}, Value: {}", trailer,
+                            trailers.get(Metadata.Key.of(trailer, Metadata.ASCII_STRING_MARSHALLER))));
+                    errorCode = trailers.get(Metadata.Key.of("error-code", Metadata.ASCII_STRING_MARSHALLER));
+                }
+                if (errorCode != null) {
+                    switch (errorCode) {
+                        case PUBSUB_ERROR_AUTH_ERROR, PUBSUB_ERROR_AUTH_REFRESH_INVALID -> {
+                            LOG.error("attempting login");
+                            session.attemptLoginUntilSuccessful(backoffIncrement, maxBackoff);
+                            LOG.debug("logged in {}", consumer.getTopic());
+                        }
+                        default -> LOG.error("unexpected errorCode: {}", errorCode);
+                    }
+                }
+            } else {
+                LOG.error("An unexpected error occurred.", throwable);
+            }
+            LOG.debug("Attempting subscribe after error");
+            if (replayId == null) {
+                LOG.warn("Not re-subscribing after error because replayId is null. Topic: {}",
+                        consumer.getTopic());
+                return;
+            }
+            subscribe(consumer, ReplayPreset.CUSTOM, replayId);
+        }
+
+        @Override
+        public void onCompleted() {
+            LOG.debug("onCompleted() called by server");
+            observerMap.remove(consumer);
+        }
+
+        public void setServerStream(StreamObserver<FetchRequest> serverStream) {
+            this.serverStream = serverStream;
+        }
+
+        private void processEvent(ConsumerEvent ce) throws IOException {
+            final Schema schema = getSchema(ce.getEvent().getSchemaId());
+            Object record = switch (consumer.getDeserializeType()) {
+                case AVRO -> deserializeAvro(ce, schema);
+                case GENERIC_RECORD -> deserializeGenericRecord(ce, schema);
+                case SPECIFIC_RECORD -> deserializeSpecificRecord(ce, schema);
+                case POJO -> deserializePojo(ce, schema);
+                case JSON -> deserializeJson(ce, schema);
+            };
+            String replayId = PubSubApiClient.base64EncodeByteString(ce.getReplayId());
+            consumer.processEvent(record, replayId);
+        }
+
+        private Object deserializeAvro(ConsumerEvent ce, Schema schema) throws IOException {
+            if (eventClassMap.containsKey(schema.getFullName())) {
+                return deserializeSpecificRecord(ce, schema);
+            } else {
+                LOG.debug("No DTO found for schema: {}. Using GenericRecord.", schema.getFullName());
+                return deserializeGenericRecord(ce, schema);
+            }
+        }
+
+        private Object deserializeJson(ConsumerEvent ce, Schema schema) throws IOException {
+            final GenericRecord record = deserializeGenericRecord(ce, schema);
+            JsonAvroConverter converter = new JsonAvroConverter();
+            final byte[] bytes = converter.convertToJson(record);
+            return new String(bytes);
+        }
+
+        private Object deserializePojo(ConsumerEvent ce, Schema schema) throws IOException {
+            ReflectDatumReader<?> reader = new ReflectDatumReader(pojoClass);
+            reader.setSchema(schema);
+            ByteArrayInputStream in = new ByteArrayInputStream(ce.getEvent().getPayload().toByteArray());
+            BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null);
+            return reader.read(null, decoder);
+        }
+
+        private GenericRecord deserializeGenericRecord(ConsumerEvent ce, Schema schema) throws IOException {
+            DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
+            ByteArrayInputStream in = new ByteArrayInputStream(ce.getEvent().getPayload().toByteArray());
+            BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null);
+            return reader.read(null, decoder);
+        }
+
+        private Object deserializeSpecificRecord(ConsumerEvent ce, Schema schema) throws IOException {
+            final Class<?> clas = eventClassMap.get(schema.getFullName());
+            DatumReader<?> reader = new SpecificDatumReader<>(clas);
+            ByteArrayInputStream in = new ByteArrayInputStream(ce.getEvent().getPayload().toByteArray());
+            BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null);
+            return reader.read(null, decoder);
+        }
+    }
+
+    // ability to use Plain Text (http) for test contexts
+    public void setUsePlainTextConnection(boolean usePlainTextConnection) {
+        this.usePlainTextConnection = usePlainTextConnection;
+    }
+}
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/TokenCredentials.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/TokenCredentials.java
new file mode 100644
index 00000000000..5636a23129e
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/TokenCredentials.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.client;
+
+import java.util.concurrent.Executor;
+
+import io.grpc.CallCredentials;
+import io.grpc.Metadata;
+import org.apache.camel.component.salesforce.internal.SalesforceSession;
+
+public class TokenCredentials extends CallCredentials {
+
+    private final SalesforceSession session;
+    public static final Metadata.Key<String> INSTANCE_URL_KEY = keyOf("instanceUrl");
+    public static final Metadata.Key<String> SESSION_TOKEN_KEY = keyOf("accessToken");
+    public static final Metadata.Key<String> TENANT_ID_KEY = keyOf("tenantId");
+
+    public TokenCredentials(SalesforceSession session) {
+        this.session = session;
+    }
+
+    @Override
+    public void applyRequestMetadata(RequestInfo requestInfo, Executor executor, MetadataApplier metadataApplier) {
+        Metadata headers = new Metadata();
+        headers.put(INSTANCE_URL_KEY, session.getInstanceUrl());
+        headers.put(TENANT_ID_KEY, session.getOrgId());
+        headers.put(SESSION_TOKEN_KEY, session.getAccessToken());
+        metadataApplier.apply(headers);
+    }
+
+    @Override
+    public void thisUsesUnstableApi() {
+
+    }
+
+    private static Metadata.Key<String> keyOf(String name) {
+        return Metadata.Key.of(name, Metadata.ASCII_STRING_MARSHALLER);
+    }
+}
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/AbstractSalesforceProcessor.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/AbstractSalesforceProcessor.java
index 773733222ec..238eaaa0684 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/AbstractSalesforceProcessor.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/AbstractSalesforceProcessor.java
@@ -53,6 +53,7 @@ public abstract class AbstractSalesforceProcessor extends ServiceSupport impleme
     protected SalesforceHttpClient httpClient;
     protected SalesforceLoginConfig loginConfig;
     protected Map<String, Class<?>> classMap;
+    protected Map<String, Class<?>> eventClassMap;
 
     protected boolean rawPayload;
 
@@ -78,6 +79,9 @@ public abstract class AbstractSalesforceProcessor extends ServiceSupport impleme
         if (classMap == null) {
             this.classMap = endpoint.getComponent().getClassMap();
         }
+        if (eventClassMap == null) {
+            this.eventClassMap = endpoint.getComponent().getEventClassMap();
+        }
     }
 
     @Override
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/PubSubApiProcessor.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/PubSubApiProcessor.java
new file mode 100644
index 00000000000..c648309c4c9
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/processor/PubSubApiProcessor.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.processor;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.component.salesforce.SalesforceEndpoint;
+import org.apache.camel.component.salesforce.api.SalesforceException;
+import org.apache.camel.component.salesforce.api.dto.pubsub.PublishResult;
+import org.apache.camel.component.salesforce.internal.OperationName;
+import org.apache.camel.component.salesforce.internal.client.PubSubApiClient;
+import org.apache.camel.support.service.ServiceHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PubSubApiProcessor extends AbstractSalesforceProcessor {
+
+    private final Logger LOG = LoggerFactory.getLogger(getClass());
+    private final String topic;
+    private PubSubApiClient pubSubClient;
+
+    public PubSubApiProcessor(final SalesforceEndpoint endpoint) {
+        super(endpoint);
+        this.topic = endpoint.getTopicName();
+    }
+
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        try {
+            if (operationName == OperationName.PUBSUB_PUBLISH) {
+                processPublish(exchange, callback);
+                return false;
+            } else {
+                throw new SalesforceException("Unknown operation: " + operationName.value(), null);
+            }
+        } catch (SalesforceException e) {
+            exchange.setException(new SalesforceException(
+                    String.format("Error processing %s: [%s] \"%s\"", operationName.value(), e.getStatusCode(), e.getMessage()),
+                    e));
+            callback.done(true);
+            return true;
+        } catch (Exception e) {
+            exchange.setException(new SalesforceException(
+                    String.format("Unexpected Error processing %s: \"%s\"", operationName.value(), e.getMessage()), e));
+            callback.done(true);
+            return true;
+        }
+    }
+
+    private void processPublish(Exchange exchange, AsyncCallback callback) throws SalesforceException {
+        try {
+            LOG.debug("Publishing on topic: {}", topic);
+            final List<?> body = exchange.getIn().getMandatoryBody(List.class);
+            final List<PublishResult> results = pubSubClient.publishMessage(topic, body);
+            exchange.getIn().setBody(results);
+            callback.done(false);
+        } catch (InvalidPayloadException | IOException e) {
+            exchange.setException(new SalesforceException(
+                    String.format("Unexpected Error processing %s: \"%s\"", operationName.value(), e.getMessage()), e));
+            callback.done(true);
+        }
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        this.pubSubClient = new PubSubApiClient(
+                endpoint.getComponent().getSession(),
+                endpoint.getComponent().getLoginConfig(), endpoint.getComponent().getPubSubHost(),
+                endpoint.getComponent().getPubSubPort(), 0, 0);
+        ServiceHelper.startService(pubSubClient);
+    }
+
+    @Override
+    public void doStop() throws Exception {
+        ServiceHelper.stopService(pubSubClient);
+        super.doStop();
+    }
+}
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
index e0cb14a385f..0addba56da8 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
@@ -24,10 +24,10 @@ import java.util.stream.Stream;
 
 import org.apache.camel.CamelException;
 import org.apache.camel.component.salesforce.SalesforceComponent;
-import org.apache.camel.component.salesforce.SalesforceConsumer;
 import org.apache.camel.component.salesforce.SalesforceEndpoint;
 import org.apache.camel.component.salesforce.SalesforceEndpointConfig;
 import org.apache.camel.component.salesforce.SalesforceHttpClient;
+import org.apache.camel.component.salesforce.StreamingApiConsumer;
 import org.apache.camel.component.salesforce.api.SalesforceException;
 import org.apache.camel.component.salesforce.internal.SalesforceSession;
 import org.apache.camel.support.service.ServiceSupport;
@@ -74,7 +74,7 @@ public class SubscriptionHelper extends ServiceSupport {
     private SalesforceSession session;
     private final long timeout = 60 * 1000L;
 
-    private final Map<SalesforceConsumer, ClientSessionChannel.MessageListener> listenerMap;
+    private final Map<StreamingApiConsumer, ClientSessionChannel.MessageListener> listenerMap;
     private final long maxBackoff;
     private final long backoffIncrement;
 
@@ -129,7 +129,7 @@ public class SubscriptionHelper extends ServiceSupport {
                                     if (failureReason.equals(AUTHENTICATION_INVALID)) {
                                         LOG.debug(
                                                 "attempting login due to handshake error: 403 -> 401::Authentication invalid");
-                                        attemptLoginUntilSuccessful();
+                                        session.attemptLoginUntilSuccessful(backoffIncrement, maxBackoff);
                                     }
                                 }
                             }
@@ -163,7 +163,7 @@ public class SubscriptionHelper extends ServiceSupport {
                             if (connectError != null && connectError.equals(AUTHENTICATION_INVALID)) {
                                 LOG.debug("connectError: {}", connectError);
                                 LOG.debug("Attempting login...");
-                                attemptLoginUntilSuccessful();
+                                session.attemptLoginUntilSuccessful(backoffIncrement, maxBackoff);
                             }
                             // Server says don't retry to connect, so we'll handshake instead
                             // Otherwise, Bayeux client automatically re-attempts connection
@@ -176,10 +176,10 @@ public class SubscriptionHelper extends ServiceSupport {
                             LOG.debug("Refreshing subscriptions to {} channels on reconnect", listenerMap.size());
                             // reconnected to Salesforce, subscribe to existing
                             // channels
-                            final Map<SalesforceConsumer, MessageListener> map = new HashMap<>(listenerMap);
+                            final Map<StreamingApiConsumer, MessageListener> map = new HashMap<>(listenerMap);
                             listenerMap.clear();
-                            for (Map.Entry<SalesforceConsumer, ClientSessionChannel.MessageListener> entry : map.entrySet()) {
-                                final SalesforceConsumer consumer = entry.getKey();
+                            for (Map.Entry<StreamingApiConsumer, ClientSessionChannel.MessageListener> entry : map.entrySet()) {
+                                final StreamingApiConsumer consumer = entry.getKey();
                                 final String topicName = consumer.getTopicName();
                                 subscribe(topicName, consumer);
                             }
@@ -301,7 +301,7 @@ public class SubscriptionHelper extends ServiceSupport {
                         // notify all consumers
                         String abortMsg = "Aborting handshake attempt due to: " + lastError.getMessage();
                         SalesforceException ex = new SalesforceException(abortMsg, lastError);
-                        for (SalesforceConsumer consumer : listenerMap.keySet()) {
+                        for (StreamingApiConsumer consumer : listenerMap.keySet()) {
                             consumer.handleException(abortMsg, ex);
                         }
                     }
@@ -341,8 +341,8 @@ public class SubscriptionHelper extends ServiceSupport {
         closeChannel(META_CONNECT, connectListener);
         closeChannel(META_HANDSHAKE, handshakeListener);
 
-        for (Map.Entry<SalesforceConsumer, MessageListener> entry : listenerMap.entrySet()) {
-            final SalesforceConsumer consumer = entry.getKey();
+        for (Map.Entry<StreamingApiConsumer, MessageListener> entry : listenerMap.entrySet()) {
+            final StreamingApiConsumer consumer = entry.getKey();
             final String topic = consumer.getTopicName();
 
             final MessageListener listener = entry.getValue();
@@ -410,12 +410,12 @@ public class SubscriptionHelper extends ServiceSupport {
         return client;
     }
 
-    public void subscribe(final String topicName, final SalesforceConsumer consumer) {
+    public void subscribe(final String topicName, final StreamingApiConsumer consumer) {
         subscribe(topicName, consumer, false);
     }
 
     public void subscribe(
-            final String topicName, final SalesforceConsumer consumer,
+            final String topicName, final StreamingApiConsumer consumer,
             final boolean skipReplayId) {
         // create subscription for consumer
         final String channelName = getChannelName(topicName);
@@ -543,40 +543,6 @@ public class SubscriptionHelper extends ServiceSupport {
         }
     }
 
-    private void attemptLoginUntilSuccessful() {
-        if (!loggingIn.compareAndSet(false, true)) {
-            LOG.debug("already logging in");
-            return;
-        }
-
-        long backoff = 0;
-
-        try {
-            for (;;) {
-                try {
-                    if (isStoppingOrStopped()) {
-                        return;
-                    }
-                    session.login(session.getAccessToken());
-                    break;
-                } catch (SalesforceException e) {
-                    backoff = backoff + backoffIncrement;
-                    if (backoff > maxBackoff) {
-                        backoff = maxBackoff;
-                    }
-                    LOG.warn(String.format("Salesforce login failed. Pausing for %d seconds", backoff), e);
-                    try {
-                        Thread.sleep(backoff);
-                    } catch (InterruptedException ex) {
-                        throw new RuntimeException("Failed to login.", ex);
-                    }
-                }
-            }
-        } finally {
-            loggingIn.set(false);
-        }
-    }
-
     static Optional<Long> determineReplayIdFor(final SalesforceEndpoint endpoint, final String topicName) {
         final String channelName = getChannelName(topicName);
 
@@ -619,7 +585,7 @@ public class SubscriptionHelper extends ServiceSupport {
         return channelName.toString();
     }
 
-    public void unsubscribe(String topicName, SalesforceConsumer consumer) {
+    public void unsubscribe(String topicName, StreamingApiConsumer consumer) {
 
         // channel name
         final String channelName = getChannelName(topicName);
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/proto/pubsub_api.proto b/components/camel-salesforce/camel-salesforce-component/src/main/proto/pubsub_api.proto
new file mode 100644
index 00000000000..eac651dd474
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/proto/pubsub_api.proto
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Provided by salesforce under Creative Commons CC0 1.0 Universal
+ */
+
+/*
+ * Salesforce Pub/Sub API Version 1.
+ */
+syntax = "proto3";
+package eventbus.v1;
+
+option java_multiple_files = true;
+option java_package = "com.salesforce.eventbus.protobuf";
+option java_outer_classname = "PubSubProto";
+
+option go_package = "github.com/developerforce/pub-sub-api/go/proto";
+
+/*
+ * Contains information about a topic and uniquely identifies it. TopicInfo is returned by the GetTopic RPC method.
+ */
+message TopicInfo {
+  // Topic name
+  string topic_name = 1;
+  // Tenant/org GUID
+  string tenant_guid = 2;
+  // Is publishing allowed?
+  bool can_publish = 3;
+  // Is subscription allowed?
+  bool can_subscribe = 4;
+  /* ID of the current topic schema, which can be used for
+   * publishing of generically serialized events.
+   */
+  string schema_id = 5;
+  // RPC ID used to trace errors.
+  string rpc_id = 6;
+}
+
+/*
+ * A request message for GetTopic. Note that the tenant/org is not directly referenced
+ * in the request, but is implicitly identified by the authentication headers.
+ */
+message TopicRequest {
+  // The name of the topic to retrieve.
+  string topic_name = 1;
+}
+
+/*
+ * Reserved for future use.
+ * Header that contains information for distributed tracing, filtering, routing, etc.
+ * For example, X-B3-* headers assigned by a publisher are stored with the event and
+ * can provide a full distributed trace of the event across its entire lifecycle.
+ */
+message EventHeader {
+  string key = 1;
+  bytes value = 2;
+}
+
+/*
+ * Represents an event that an event publishing app creates.
+ */
+message ProducerEvent {
+  // Either a user-provided ID or a system generated guid
+  string id = 1;
+  // Schema fingerprint for this event which is hash of the schema
+  string schema_id = 2;
+  // The message data field
+  bytes payload = 3;
+  // Reserved for future use. Key-value pairs of headers.
+  repeated EventHeader headers = 4;
+}
+
+/*
+ * Represents an event that is consumed in a subscriber client.
+ * In addition to the fields in ProducerEvent, ConsumerEvent has the replay_id field.
+ */
+message ConsumerEvent {
+  // The event with fields identical to ProducerEvent
+  ProducerEvent event = 1;
+  /* The replay ID of the event.
+   * A subscriber app can store the replay ID. When the app restarts, it can resume subscription
+   * starting from events in the event bus after the event with that replay ID.
+   */
+  bytes replay_id = 2;
+}
+
+/*
+ * Event publish result that the Publish RPC method returns. The result contains replay_id or a publish error.
+ */
+message PublishResult {
+  // Replay ID of the event
+  bytes replay_id = 1;
+  // Publish error if any
+  Error error = 2;
+  // Correlation key of the ProducerEvent
+  string correlationKey = 3;
+}
+
+// Contains error information for an error that an RPC method returns.
+message Error {
+  // Error code
+  ErrorCode code = 1;
+  // Error message
+  string msg = 2;
+}
+
+// Supported error codes
+enum ErrorCode {
+  UNKNOWN = 0;
+  PUBLISH = 1;
+}
+
+/*
+ * Supported subscription replay start values.
+ * By default, the subscription will start at the tip of the stream if ReplayPreset is not specified.
+ */
+enum ReplayPreset {
+  // Start the subscription at the tip of the stream.
+  LATEST = 0;
+  // Start the subscription at the earliest point in the stream.
+  EARLIEST = 1;
+  // Start the subscription after a custom point in the stream. This must be set with a valid replay_id in the FetchRequest.
+  CUSTOM = 2;
+}
+
+/*
+ * Request for the Subscribe streaming RPC method. This request is used to:
+ * 1. Establish the initial subscribe stream.
+ * 2. Request more events from the subscription stream.
+ * Flow Control is handled by the subscriber via num_requested.
+ * A client can specify a starting point for the subscription with replay_preset and replay_id combinations.
+ * If no replay_preset is specified, the subscription starts at LATEST (tip of the stream).
+ * replay_preset and replay_id values are only consumed as part of the first FetchRequest. If
+ * a client needs to start at another point in the stream, it must start a new subscription.
+ */
+message FetchRequest {
+  /*
+   * Identifies a topic for subscription in the very first FetchRequest of the stream. The topic cannot change
+   * in subsequent FetchRequests within the same subscribe stream, but can be omitted for efficiency.
+   */
+  string topic_name = 1;
+
+  /*
+   * Subscription starting point. This is consumed only as part of the first FetchRequest
+   * when the subscription is set up.
+   */
+  ReplayPreset replay_preset = 2;
+  /*
+   * If replay_preset of CUSTOM is selected, specify the subscription point to start after.
+   * This is consumed only as part of the first FetchRequest when the subscription is set up.
+   */
+  bytes replay_id = 3;
+  /*
+   * Number of events a client is ready to accept. Each subsequent FetchRequest informs the server
+   * of additional processing capacity available on the client side. There is no guarantee of equal number of
+   * FetchResponse messages to be sent back. There is not necessarily a correspondence between
+   * number of requested events in FetchRequest and the number of events returned in subsequent
+   * FetchResponses.
+   */
+  int32 num_requested = 4;
+  // For internal Salesforce use only.
+  string auth_refresh = 5;
+}
+
+/*
+ * Response for the Subscribe streaming RPC method. This returns ConsumerEvent(s).
+ * If there are no events to deliver, the server sends an empty batch fetch response with the latest replay ID. The
+ * empty fetch response is sent within 270 seconds. An empty fetch response provides a periodic keepalive from the
+ * server and the latest replay ID.
+ */
+message FetchResponse {
+  // Received events for subscription for client consumption
+  repeated ConsumerEvent events = 1;
+  // Latest replay ID of a subscription. Enables clients with an updated replay value so that they can keep track
+  // of their last consumed replay. Clients will not have to start a subscription at a very old replay in the case where a resubscribe is necessary.
+  bytes latest_replay_id = 2;
+  // RPC ID used to trace errors.
+  string rpc_id = 3;
+  // Number of remaining events to be delivered to the client for a Subscribe RPC call.
+  int32 pending_num_requested = 4;
+}
+
+/*
+ * Request for the GetSchema RPC method. The schema request is based on the event schema ID.
+ */
+message SchemaRequest {
+  // Schema fingerprint for this event, which is a hash of the schema.
+  string schema_id = 1;
+}
+
+/*
+ * Response for the GetSchema RPC method. This returns the schema ID and schema of an event.
+ */
+message SchemaInfo {
+  // Avro schema in JSON format
+  string schema_json = 1;
+  // Schema fingerprint
+  string schema_id = 2;
+  // RPC ID used to trace errors.
+  string rpc_id = 3;
+}
+
+// Request for the Publish and PublishStream RPC method.
+message PublishRequest {
+  // Topic to publish on
+  string topic_name = 1;
+  // Batch of ProducerEvent(s) to send
+  repeated ProducerEvent events = 2;
+  // For internal Salesforce use only.
+  string auth_refresh = 3;
+}
+
+/*
+ * Response for the Publish and PublishStream RPC methods. This returns
+ * a list of PublishResults for each event that the client attempted to
+ * publish. PublishResult indicates if publish succeeded or not
+ * for each event. It also returns the schema ID that was used to create
+ * the ProducerEvents in the PublishRequest.
+ */
+message PublishResponse {
+  // Publish results
+  repeated PublishResult results = 1;
+  // Schema fingerprint for this event, which is a hash of the schema
+  string schema_id = 2;
+  // RPC ID used to trace errors.
+  string rpc_id = 3;
+}
+
+/*
+ * The Pub/Sub API provides a single interface for publishing and subscribing to platform events, including real-time
+ * event monitoring events, and change data capture events. The Pub/Sub API is a gRPC API that is based on HTTP/2.
+ *
+ * A session token is needed to authenticate. Any of the Salesforce supported
+ * OAuth flows can be used to obtain a session token:
+ * https://help.salesforce.com/articleView?id=sf.remoteaccess_oauth_flows.htm&type=5
+ *
+ * For each RPC, a client needs to pass authentication information
+ * as metadata headers (https://www.grpc.io/docs/guides/concepts/#metadata) with their method call.
+ *
+ * For Salesforce session token authentication, use:
+ *   accesstoken : access token
+ *   instanceurl : Salesforce instance URL
+ *   tenantid : tenant/org id of the client
+ *
+ * StatusException is thrown in case of response failure for any request.
+ */
+service PubSub {
+  /*
+   * Bidirectional streaming RPC to subscribe to a Topic. The subscription is pull-based. A client can request
+   * for more events as it consumes events. This enables a client to handle flow control based on the client's processing speed.
+   *
+   * Typical flow:
+   * 1. Client requests for X number of events via FetchRequest.
+   * 2. Server receives request and delivers events until X events are delivered to the client via one or more FetchResponse messages.
+   * 3. Client consumes the FetchResponse messages as they come.
+   * 4. Client issues new FetchRequest for Y more number of events. This request can
+   *    come before the server has delivered the earlier requested X number of events
+   *    so the client gets a continuous stream of events if any.
+   *
+   * If a client requests more events before the server finishes the last
+   * requested amount, the server appends the new amount to the current amount of
+   * events it still needs to fetch and deliver.
+   *
+   * A client can subscribe at any point in the stream by providing a replay option in the first FetchRequest.
+   * The replay option is honored for the first FetchRequest received from a client. Any subsequent FetchRequests with a
+   * new replay option are ignored. A client needs to call the Subscribe RPC again to restart the subscription
+   * at a new point in the stream.
+   *
+   * The first FetchRequest of the stream identifies the topic to subscribe to.
+   * If any subsequent FetchRequest provides topic_name, it must match what
+   * was provided in the first FetchRequest; otherwise, the RPC returns an error
+   * with INVALID_ARGUMENT status.
+   */
+  rpc Subscribe (stream FetchRequest) returns (stream FetchResponse);
+
+  // Get the event schema for a topic based on a schema ID.
+  rpc GetSchema (SchemaRequest) returns (SchemaInfo);
+
+  /*
+   * Get the topic Information related to the specified topic.
+   */
+  rpc GetTopic (TopicRequest) returns (TopicInfo);
+
+  /*
+   * Send a publish request to synchronously publish events to a topic.
+   */
+  rpc Publish (PublishRequest) returns (PublishResponse);
+
+  /*
+   * Bidirectional Streaming RPC to publish events to the event bus.
+   * PublishRequest contains the batch of events to publish.
+   *
+   * The first PublishRequest of the stream identifies the topic to publish on.
+   * If any subsequent PublishRequest provides topic_name, it must match what
+   * was provided in the first PublishRequest; otherwise, the RPC returns an error
+   * with INVALID_ARGUMENT status.
+   *
+   * The server returns a PublishResponse for each PublishRequest when publish is
+   * complete for the batch. A client does not have to wait for a PublishResponse
+   * before sending a new PublishRequest, i.e. multiple publish batches can be queued
+   * up, which allows for higher publish rate as a client can asynchronously
+   * publish more events while publishes are still in flight on the server side.
+   *
+   * PublishResponse holds a PublishResult for each event published that indicates success
+   * or failure of the publish. A client can then retry the publish as needed before sending
+   * more PublishRequests for new events to publish.
+   *
+   * A client must send a valid publish request with one or more events every 70 seconds to hold on to the stream.
+   * Otherwise, the server closes the stream and notifies the client. Once the client is notified of the stream closure,
+   * it must make a new PublishStream call to resume publishing.
+   */
+  rpc PublishStream (stream PublishRequest) returns (stream PublishResponse);
+}
+
+// Style guide: https://developers.google.com/protocol-buffers/docs/style
\ No newline at end of file
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/com/sforce/eventbus/CamelEventMessage__e.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/com/sforce/eventbus/CamelEventMessage__e.java
new file mode 100644
index 00000000000..befda49c95a
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/com/sforce/eventbus/CamelEventMessage__e.java
@@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * Autogenerated by Avro
+ *
+ * DO NOT EDIT DIRECTLY
+ */
+package com.sforce.eventbus;
+
+import org.apache.avro.message.BinaryMessageDecoder;
+import org.apache.avro.message.BinaryMessageEncoder;
+import org.apache.avro.message.SchemaStore;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.util.Utf8;
+
+@org.apache.avro.specific.AvroGenerated
+public class CamelEventMessage__e extends org.apache.avro.specific.SpecificRecordBase
+        implements org.apache.avro.specific.SpecificRecord {
+    private static final long serialVersionUID = 4603183847267960866L;
+
+    public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse(
+            "{\"type\":\"record\",\"name\":\"CamelEventMessage__e\",\"namespace\":\"com.sforce.eventbus\",\"fields\":[{\"name\":\"CreatedDate\",\"type\":\"long\",\"doc\":\"CreatedDate:DateTime\"},{\"name\":\"CreatedById\",\"type\":\"string\",\"doc\":\"CreatedBy:EntityId\"},{\"name\":\"Message__c\",\"type\":[\"null\",\"string\"],\"doc\":\"Data:Text:00NDS00000mES97\",\"default\":null}]}");
+
+    public static org.apache.avro.Schema getClassSchema() {
+        return SCHEMA$;
+    }
+
+    private static final SpecificData MODEL$ = new SpecificData();
+
+    private static final BinaryMessageEncoder<CamelEventMessage__e> ENCODER = new BinaryMessageEncoder<>(MODEL$, SCHEMA$);
+
+    private static final BinaryMessageDecoder<CamelEventMessage__e> DECODER = new BinaryMessageDecoder<>(MODEL$, SCHEMA$);
+
+    /**
+     * Return the BinaryMessageEncoder instance used by this class.
+     *
+     * @return the message encoder used by this class
+     */
+    public static BinaryMessageEncoder<CamelEventMessage__e> getEncoder() {
+        return ENCODER;
+    }
+
+    /**
+     * Return the BinaryMessageDecoder instance used by this class.
+     *
+     * @return the message decoder used by this class
+     */
+    public static BinaryMessageDecoder<CamelEventMessage__e> getDecoder() {
+        return DECODER;
+    }
+
+    /**
+     * Create a new BinaryMessageDecoder instance for this class that uses the specified {@link SchemaStore}.
+     *
+     * @param  resolver a {@link SchemaStore} used to find schemas by fingerprint
+     * @return          a BinaryMessageDecoder instance for this class backed by the given SchemaStore
+     */
+    public static BinaryMessageDecoder<CamelEventMessage__e> createDecoder(SchemaStore resolver) {
+        return new BinaryMessageDecoder<>(MODEL$, SCHEMA$, resolver);
+    }
+
+    /**
+     * Serializes this CamelEventMessage__e to a ByteBuffer.
+     *
+     * @return                     a buffer holding the serialized data for this instance
+     * @throws java.io.IOException if this instance could not be serialized
+     */
+    public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException {
+        return ENCODER.encode(this);
+    }
+
+    /**
+     * Deserializes a CamelEventMessage__e from a ByteBuffer.
+     *
+     * @param  b                   a byte buffer holding serialized data for an instance of this class
+     * @return                     a CamelEventMessage__e instance decoded from the given buffer
+     * @throws java.io.IOException if the given bytes could not be deserialized into an instance of this class
+     */
+    public static CamelEventMessage__e fromByteBuffer(
+            java.nio.ByteBuffer b)
+            throws java.io.IOException {
+        return DECODER.decode(b);
+    }
+
+    /** CreatedDate:DateTime */
+    private long CreatedDate;
+    /** CreatedBy:EntityId */
+    private CharSequence CreatedById;
+    /** Data:Text:00NDS00000mES97 */
+    private CharSequence Message__c;
+
+    /**
+     * Default constructor. Note that this does not initialize fields to their default values from the schema. If that
+     * is desired then one should use <code>newBuilder()</code>.
+     */
+    public CamelEventMessage__e() {
+    }
+
+    /**
+     * All-args constructor.
+     *
+     * @param CreatedDate CreatedDate:DateTime
+     * @param CreatedById CreatedBy:EntityId
+     * @param Message__c  Data:Text:00NDS00000mES97
+     */
+    public CamelEventMessage__e(Long CreatedDate, CharSequence CreatedById, CharSequence Message__c) {
+        this.CreatedDate = CreatedDate;
+        this.CreatedById = CreatedById;
+        this.Message__c = Message__c;
+    }
+
+    @Override
+    public SpecificData getSpecificData() {
+        return MODEL$;
+    }
+
+    @Override
+    public org.apache.avro.Schema getSchema() {
+        return SCHEMA$;
+    }
+
+    // Used by DatumWriter.  Applications should not call.
+    @Override
+    public Object get(int field$) {
+        switch (field$) {
+            case 0:
+                return CreatedDate;
+            case 1:
+                return CreatedById;
+            case 2:
+                return Message__c;
+            default:
+                throw new IndexOutOfBoundsException("Invalid index: " + field$);
+        }
+    }
+
+    // Used by DatumReader.  Applications should not call.
+    @Override
+    @SuppressWarnings(value = "unchecked")
+    public void put(int field$, Object value$) {
+        switch (field$) {
+            case 0:
+                CreatedDate = (Long) value$;
+                break;
+            case 1:
+                CreatedById = (CharSequence) value$;
+                break;
+            case 2:
+                Message__c = (CharSequence) value$;
+                break;
+            default:
+                throw new IndexOutOfBoundsException("Invalid index: " + field$);
+        }
+    }
+
+    /**
+     * Gets the value of the 'CreatedDate' field.
+     *
+     * @return CreatedDate:DateTime
+     */
+    public long getCreatedDate() {
+        return CreatedDate;
+    }
+
+    /**
+     * Sets the value of the 'CreatedDate' field. CreatedDate:DateTime
+     *
+     * @param value the value to set.
+     */
+    public void setCreatedDate(long value) {
+        this.CreatedDate = value;
+    }
+
+    /**
+     * Gets the value of the 'CreatedById' field.
+     *
+     * @return CreatedBy:EntityId
+     */
+    public CharSequence getCreatedById() {
+        return CreatedById;
+    }
+
+    /**
+     * Sets the value of the 'CreatedById' field. CreatedBy:EntityId
+     *
+     * @param value the value to set.
+     */
+    public void setCreatedById(CharSequence value) {
+        this.CreatedById = value;
+    }
+
+    /**
+     * Gets the value of the 'Message__c' field.
+     *
+     * @return Data:Text:00NDS00000mES97
+     */
+    public CharSequence getMessageC() {
+        return Message__c;
+    }
+
+    /**
+     * Sets the value of the 'Message__c' field. Data:Text:00NDS00000mES97
+     *
+     * @param value the value to set.
+     */
+    public void setMessageC(CharSequence value) {
+        this.Message__c = value;
+    }
+
+    /**
+     * Creates a new CamelEventMessage__e RecordBuilder.
+     *
+     * @return A new CamelEventMessage__e RecordBuilder
+     */
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /**
+     * Creates a new CamelEventMessage__e RecordBuilder by copying an existing Builder.
+     *
+     * @param  other The existing builder to copy.
+     * @return       A new CamelEventMessage__e RecordBuilder
+     */
+    public static Builder newBuilder(Builder other) {
+        if (other == null) {
+            return new Builder();
+        } else {
+            return new Builder(other);
+        }
+    }
+
+    /**
+     * Creates a new CamelEventMessage__e RecordBuilder by copying an existing CamelEventMessage__e instance.
+     *
+     * @param  other The existing instance to copy.
+     * @return       A new CamelEventMessage__e RecordBuilder
+     */
+    public static Builder newBuilder(CamelEventMessage__e other) {
+        if (other == null) {
+            return new Builder();
+        } else {
+            return new Builder(other);
+        }
+    }
+
+    /**
+     * RecordBuilder for CamelEventMessage__e instances.
+     */
+    @org.apache.avro.specific.AvroGenerated
+    public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<CamelEventMessage__e>
+            implements org.apache.avro.data.RecordBuilder<CamelEventMessage__e> {
+
+        /** CreatedDate:DateTime */
+        private long CreatedDate;
+        /** CreatedBy:EntityId */
+        private CharSequence CreatedById;
+        /** Data:Text:00NDS00000mES97 */
+        private CharSequence Message__c;
+
+        /** Creates a new Builder */
+        private Builder() {
+            super(SCHEMA$, MODEL$);
+        }
+
+        /**
+         * Creates a Builder by copying an existing Builder.
+         *
+         * @param other The existing Builder to copy.
+         */
+        private Builder(Builder other) {
+            super(other);
+            if (isValidValue(fields()[0], other.CreatedDate)) {
+                this.CreatedDate = data().deepCopy(fields()[0].schema(), other.CreatedDate);
+                fieldSetFlags()[0] = other.fieldSetFlags()[0];
+            }
+            if (isValidValue(fields()[1], other.CreatedById)) {
+                this.CreatedById = data().deepCopy(fields()[1].schema(), other.CreatedById);
+                fieldSetFlags()[1] = other.fieldSetFlags()[1];
+            }
+            if (isValidValue(fields()[2], other.Message__c)) {
+                this.Message__c = data().deepCopy(fields()[2].schema(), other.Message__c);
+                fieldSetFlags()[2] = other.fieldSetFlags()[2];
+            }
+        }
+
+        /**
+         * Creates a Builder by copying an existing CamelEventMessage__e instance
+         *
+         * @param other The existing instance to copy.
+         */
+        private Builder(CamelEventMessage__e other) {
+            super(SCHEMA$, MODEL$);
+            if (isValidValue(fields()[0], other.CreatedDate)) {
+                this.CreatedDate = data().deepCopy(fields()[0].schema(), other.CreatedDate);
+                fieldSetFlags()[0] = true;
+            }
+            if (isValidValue(fields()[1], other.CreatedById)) {
+                this.CreatedById = data().deepCopy(fields()[1].schema(), other.CreatedById);
+                fieldSetFlags()[1] = true;
+            }
+            if (isValidValue(fields()[2], other.Message__c)) {
+                this.Message__c = data().deepCopy(fields()[2].schema(), other.Message__c);
+                fieldSetFlags()[2] = true;
+            }
+        }
+
+        /**
+         * Gets the value of the 'CreatedDate' field. CreatedDate:DateTime
+         *
+         * @return The value.
+         */
+        public long getCreatedDate() {
+            return CreatedDate;
+        }
+
+        /**
+         * Sets the value of the 'CreatedDate' field. CreatedDate:DateTime
+         *
+         * @param  value The value of 'CreatedDate'.
+         * @return       This builder.
+         */
+        public Builder setCreatedDate(long value) {
+            validate(fields()[0], value);
+            this.CreatedDate = value;
+            fieldSetFlags()[0] = true;
+            return this;
+        }
+
+        /**
+         * Checks whether the 'CreatedDate' field has been set. CreatedDate:DateTime
+         *
+         * @return True if the 'CreatedDate' field has been set, false otherwise.
+         */
+        public boolean hasCreatedDate() {
+            return fieldSetFlags()[0];
+        }
+
+        /**
+         * Clears the value of the 'CreatedDate' field. CreatedDate:DateTime
+         *
+         * @return This builder.
+         */
+        public Builder clearCreatedDate() {
+            fieldSetFlags()[0] = false;
+            return this;
+        }
+
+        /**
+         * Gets the value of the 'CreatedById' field. CreatedBy:EntityId
+         *
+         * @return The value.
+         */
+        public CharSequence getCreatedById() {
+            return CreatedById;
+        }
+
+        /**
+         * Sets the value of the 'CreatedById' field. CreatedBy:EntityId
+         *
+         * @param  value The value of 'CreatedById'.
+         * @return       This builder.
+         */
+        public Builder setCreatedById(CharSequence value) {
+            validate(fields()[1], value);
+            this.CreatedById = value;
+            fieldSetFlags()[1] = true;
+            return this;
+        }
+
+        /**
+         * Checks whether the 'CreatedById' field has been set. CreatedBy:EntityId
+         *
+         * @return True if the 'CreatedById' field has been set, false otherwise.
+         */
+        public boolean hasCreatedById() {
+            return fieldSetFlags()[1];
+        }
+
+        /**
+         * Clears the value of the 'CreatedById' field. CreatedBy:EntityId
+         *
+         * @return This builder.
+         */
+        public Builder clearCreatedById() {
+            CreatedById = null;
+            fieldSetFlags()[1] = false;
+            return this;
+        }
+
+        /**
+         * Gets the value of the 'Message__c' field. Data:Text:00NDS00000mES97
+         *
+         * @return The value.
+         */
+        public CharSequence getMessageC() {
+            return Message__c;
+        }
+
+        /**
+         * Sets the value of the 'Message__c' field. Data:Text:00NDS00000mES97
+         *
+         * @param  value The value of 'Message__c'.
+         * @return       This builder.
+         */
+        public Builder setMessageC(CharSequence value) {
+            validate(fields()[2], value);
+            this.Message__c = value;
+            fieldSetFlags()[2] = true;
+            return this;
+        }
+
+        /**
+         * Checks whether the 'Message__c' field has been set. Data:Text:00NDS00000mES97
+         *
+         * @return True if the 'Message__c' field has been set, false otherwise.
+         */
+        public boolean hasMessageC() {
+            return fieldSetFlags()[2];
+        }
+
+        /**
+         * Clears the value of the 'Message__c' field. Data:Text:00NDS00000mES97
+         *
+         * @return This builder.
+         */
+        public Builder clearMessageC() {
+            Message__c = null;
+            fieldSetFlags()[2] = false;
+            return this;
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public CamelEventMessage__e build() {
+            try {
+                CamelEventMessage__e record = new CamelEventMessage__e();
+                record.CreatedDate = fieldSetFlags()[0] ? this.CreatedDate : (Long) defaultValue(fields()[0]);
+                record.CreatedById = fieldSetFlags()[1] ? this.CreatedById : (CharSequence) defaultValue(fields()[1]);
+                record.Message__c = fieldSetFlags()[2] ? this.Message__c : (CharSequence) defaultValue(fields()[2]);
+                return record;
+            } catch (org.apache.avro.AvroMissingFieldException e) {
+                throw e;
+            } catch (Exception e) {
+                throw new org.apache.avro.AvroRuntimeException(e);
+            }
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private static final org.apache.avro.io.DatumWriter<CamelEventMessage__e> WRITER$
+            = (org.apache.avro.io.DatumWriter<CamelEventMessage__e>) MODEL$.createDatumWriter(SCHEMA$);
+
+    @Override
+    public void writeExternal(java.io.ObjectOutput out)
+            throws java.io.IOException {
+        WRITER$.write(this, SpecificData.getEncoder(out));
+    }
+
+    @SuppressWarnings("unchecked")
+    private static final org.apache.avro.io.DatumReader<CamelEventMessage__e> READER$
+            = (org.apache.avro.io.DatumReader<CamelEventMessage__e>) MODEL$.createDatumReader(SCHEMA$);
+
+    @Override
+    public void readExternal(java.io.ObjectInput in)
+            throws java.io.IOException {
+        READER$.read(this, SpecificData.getDecoder(in));
+    }
+
+    @Override
+    protected boolean hasCustomCoders() {
+        return true;
+    }
+
+    @Override
+    public void customEncode(org.apache.avro.io.Encoder out)
+            throws java.io.IOException {
+        out.writeLong(this.CreatedDate);
+
+        out.writeString(this.CreatedById);
+
+        if (this.Message__c == null) {
+            out.writeIndex(0);
+            out.writeNull();
+        } else {
+            out.writeIndex(1);
+            out.writeString(this.Message__c);
+        }
+
+    }
+
+    @Override
+    public void customDecode(org.apache.avro.io.ResolvingDecoder in)
+            throws java.io.IOException {
+        org.apache.avro.Schema.Field[] fieldOrder = in.readFieldOrderIfDiff();
+        if (fieldOrder == null) {
+            this.CreatedDate = in.readLong();
+
+            this.CreatedById = in.readString(this.CreatedById instanceof Utf8 ? (Utf8) this.CreatedById : null);
+
+            if (in.readIndex() != 1) {
+                in.readNull();
+                this.Message__c = null;
+            } else {
+                this.Message__c = in.readString(this.Message__c instanceof Utf8 ? (Utf8) this.Message__c : null);
+            }
+
+        } else {
+            for (int i = 0; i < 3; i++) {
+                switch (fieldOrder[i].pos()) {
+                    case 0:
+                        this.CreatedDate = in.readLong();
+                        break;
+
+                    case 1:
+                        this.CreatedById = in.readString(this.CreatedById instanceof Utf8 ? (Utf8) this.CreatedById : null);
+                        break;
+
+                    case 2:
+                        if (in.readIndex() != 1) {
+                            in.readNull();
+                            this.Message__c = null;
+                        } else {
+                            this.Message__c = in.readString(this.Message__c instanceof Utf8 ? (Utf8) this.Message__c : null);
+                        }
+                        break;
+
+                    default:
+                        throw new java.io.IOException("Corrupt ResolvingDecoder.");
+                }
+            }
+        }
+    }
+}
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiIntegrationTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiIntegrationTest.java
new file mode 100644
index 00000000000..efb72e945b8
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiIntegrationTest.java
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.protobuf.ByteString;
+import com.salesforce.eventbus.protobuf.ProducerEvent;
+import com.sforce.eventbus.CamelEventMessage__e;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.salesforce.internal.client.PubSubApiClient;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Tag("standalone")
+public class PubSubApiIntegrationTest extends AbstractSalesforceTestBase {
+
+    private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+    final Schema camelEventSchema = new Schema.Parser().parse(
+            """
+                    {
+                        "type": "record",
+                        "name": "CamelEventMessage__e",
+                        "namespace": "com.sforce.eventbus",
+                        "fields": [
+                            {
+                                "name": "CreatedDate",
+                                "type": "long",
+                                "doc": "CreatedDate:DateTime"
+                            },
+                            {
+                                "name": "CreatedById",
+                                "type": "string",
+                                "doc": "CreatedBy:EntityId"
+                            },
+                            {
+                                "name": "Message__c",
+                                "type": [
+                                    "null",
+                                    "string"
+                                ],
+                                "doc": "Data:Text:00NDS00000mES97",
+                                "default": null
+                            }
+                        ]
+                    }
+                    """);
+
+    final Schema camelEvent2Schema = new Schema.Parser().parse(
+            """
+                    {
+                        "type": "record",
+                        "name": "CamelEventNote__e",
+                        "namespace": "com.sforce.eventbus",
+                        "fields": [
+                            {
+                                "name": "CreatedDate",
+                                "type": "long",
+                                "doc": "CreatedDate:DateTime"
+                            },
+                            {
+                                "name": "CreatedById",
+                                "type": "string",
+                                "doc": "CreatedBy:EntityId"
+                            },
+                            {
+                                "name": "Note__c",
+                                "type": [
+                                    "null",
+                                    "string"
+                                ],
+                                "doc": "Data:Text:00NDS00000mZSIr",
+                                "default": null
+                            }
+                        ]
+                    }
+                    """);
+
+    private GenericRecord record() {
+        return new GenericRecordBuilder(camelEventSchema)
+                .set("Message__c", "hello world")
+                .set("CreatedDate", System.currentTimeMillis() / 1000)
+                .set("CreatedById", "123")
+                .build();
+    }
+
+    @Test
+    public void receiveEventsOverTime() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("timer:timer1?period=1000&repeatCount=3")
+                        .setBody(exchange -> List.of(record()))
+                        .to("salesforce:pubSubPublish:/event/CamelEventMessage__e");
+            }
+        });
+
+        MockEndpoint mock = getMockEndpoint("mock:CamelTestTopic");
+        mock.expectedMessageCount(3);
+        mock.assertIsSatisfied(10000);
+    }
+
+    @Test
+    public void canPublishAndReceivePojoEvents() throws InterruptedException {
+        PojoEvent record = new PojoEvent();
+        record.setMessage__c("hello world");
+        record.setCreatedDate(System.currentTimeMillis() / 1000);
+        record.setCreatedById("123");
+
+        CamelEventMessage__e expectedRecord = new CamelEventMessage__e();
+        expectedRecord.setMessageC(record.getMessage__c());
+        expectedRecord.setCreatedDate(record.getCreatedDate());
+        expectedRecord.setCreatedById(record.getCreatedById());
+
+        MockEndpoint mock = getMockEndpoint("mock:CamelTestTopic");
+        mock.expectedBodiesReceived(expectedRecord);
+
+        template.requestBody("direct:publishCamelEventMessage", List.of(record));
+
+        mock.assertIsSatisfied();
+    }
+
+    @Test
+    public void canReceiveJsonEvent() throws InterruptedException {
+        PojoEvent record = new PojoEvent();
+        record.setMessage__c("hello world");
+        record.setCreatedDate(System.currentTimeMillis() / 1000);
+        record.setCreatedById("123");
+
+        MockEndpoint mock = getMockEndpoint("mock:CamelTestTopicJson");
+        mock.expectedMessageCount(1);
+
+        template.requestBody("direct:publishCamelEventMessage", List.of(record));
+
+        mock.assertIsSatisfied();
+    }
+
+    @Test
+    public void canReceivePojoEvent() throws InterruptedException {
+        PubSubPojoEvent record = new PubSubPojoEvent();
+        record.setMessage__c("hello world");
+        record.setCreatedDate(System.currentTimeMillis() / 1000);
+        record.setCreatedById("123");
+
+        MockEndpoint mock = getMockEndpoint("mock:CamelTestTopicPojo");
+        mock.expectedBodiesReceived(record);
+
+        template.requestBody("direct:publishCamelEventMessage", List.of(record));
+
+        mock.assertIsSatisfied();
+    }
+
+    @Test
+    public void canPublishJsonEvents() throws InterruptedException {
+        String record = """
+                {
+                    "Message__c": "hello world",
+                    "CreatedDate": 123,
+                    "CreatedById": "123"
+                }
+                """;
+
+        CamelEventMessage__e expectedRecord = new CamelEventMessage__e();
+        expectedRecord.setMessageC("hello world");
+        expectedRecord.setCreatedDate(123);
+        expectedRecord.setCreatedById("123");
+
+        MockEndpoint mock = getMockEndpoint("mock:CamelTestTopic");
+        mock.expectedBodiesReceived(expectedRecord);
+
+        template.requestBody("direct:publishCamelEventMessage", List.of(record));
+
+        mock.assertIsSatisfied();
+    }
+
+    @Test
+    public void canPublishProducerEvent() throws InterruptedException, IOException {
+        CamelEventMessage__e record = new CamelEventMessage__e();
+        record.setMessageC("hello world");
+        record.setCreatedDate(123);
+        record.setCreatedById("123");
+
+        PubSubApiClient client = null;
+        try {
+            client = new PubSubApiClient(
+                    component.getSession(), new SalesforceLoginConfig(),
+                    "api.pubsub.salesforce.com", 7443, 0, 0);
+            client.start();
+
+            byte[] bytes;
+            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+            BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(buffer, null);
+            final SpecificDatumWriter<CamelEventMessage__e> writer = new SpecificDatumWriter<>(CamelEventMessage__e.class);
+            writer.write(record, encoder);
+            bytes = buffer.toByteArray();
+
+            ProducerEvent.newBuilder()
+                    .setSchemaId(client.getTopicInfo("/event/CamelEventMessage__e").getSchemaId())
+                    .setPayload(ByteString.copyFrom(bytes))
+                    .build();
+
+            MockEndpoint mock = getMockEndpoint("mock:CamelTestTopic");
+            mock.expectedBodiesReceived(record);
+
+            template.requestBody("direct:publishCamelEventMessage", List.of(record));
+
+            mock.assertIsSatisfied();
+        } finally {
+            if (client != null) {
+                client.stop();
+            }
+        }
+    }
+
+    @Test
+    public void canPublishAndReceiveGenericEvents() throws InterruptedException {
+        CamelEventMessage__e expectedRecord = new CamelEventMessage__e();
+        expectedRecord.setMessageC("hello world");
+        expectedRecord.setCreatedDate(System.currentTimeMillis() / 1000);
+        expectedRecord.setCreatedById("123");
+
+        final GenericRecord record = new GenericRecordBuilder(camelEventSchema)
+                .set("Message__c", "hello world")
+                .set("CreatedDate", System.currentTimeMillis() / 1000)
+                .set("CreatedById", "123")
+                .build();
+
+        final GenericRecord record2 = new GenericRecordBuilder(camelEvent2Schema)
+                .set("Note__c", "hello world2")
+                .set("CreatedDate", record.get("CreatedDate"))
+                .set("CreatedById", "123")
+                .build();
+
+        MockEndpoint mock = getMockEndpoint("mock:CamelTestTopic");
+        mock.expectedBodiesReceived(expectedRecord);
+
+        MockEndpoint mock2 = getMockEndpoint("mock:CamelTestTopicEventNote");
+        mock2.expectedBodiesReceived(record2);
+
+        template.requestBody("direct:publishCamelEventMessage", List.of(record));
+        template.requestBody("direct:publishCamelEventNote", List.of(record2));
+
+        mock.assertIsSatisfied();
+        mock2.assertIsSatisfied();
+    }
+
+    @Test
+    public void canPublishAndReceiveSpecificEvents() throws InterruptedException {
+        CamelEventMessage__e record = new CamelEventMessage__e();
+        record.setMessageC("hello world");
+        record.setCreatedDate(System.currentTimeMillis() / 1000);
+        record.setCreatedById("123");
+
+        MockEndpoint mock = getMockEndpoint("mock:CamelTestTopic");
+        mock.expectedMessageCount(1);
+        mock.whenAnyExchangeReceived(exchange -> {
+            LOG.debug(exchange.getIn().getBody().toString());
+            LOG.debug(exchange.getIn().getBody().getClass().getName());
+        });
+
+        template.requestBody("direct:publishCamelEventMessage", List.of(record));
+
+        mock.assertIsSatisfied();
+    }
+
+    @Test
+    public void canPublishAndReceiveABatchOfEvents() throws InterruptedException {
+        final GenericRecord record = new GenericRecordBuilder(camelEventSchema)
+                .set("Message__c", "hello world")
+                .set("CreatedDate", System.currentTimeMillis() / 1000)
+                .set("CreatedById", "123")
+                .build();
+
+        MockEndpoint mock = getMockEndpoint("mock:CamelTestTopic");
+        mock.expectedMessageCount(3);
+
+        template.requestBody("direct:publishCamelEventMessage", List.of(record, record, record));
+        mock.assertIsSatisfied();
+    }
+
+    @Test
+    public void canSubscribeWithReplayId() throws Exception {
+        final GenericRecord record = new GenericRecordBuilder(camelEventSchema)
+                .set("Message__c", "hello world")
+                .set("CreatedDate", System.currentTimeMillis() / 1000)
+                .set("CreatedById", "123")
+                .build();
+
+        MockEndpoint mock = getMockEndpoint("mock:CamelTestTopic");
+        mock.expectedMessageCount(1);
+        AtomicReference<String> replayId = new AtomicReference<>("");
+        mock.whenAnyExchangeReceived(exchange -> {
+            final String rId = exchange.getIn().getHeader("CamelSalesforcePubSubReplayId", String.class);
+            replayId.set(rId);
+        });
+
+        // publish an initial event
+        template.requestBody("direct:publishCamelEventMessage", List.of(record));
+        mock.assertIsSatisfied();
+
+        // store the replayId in properties
+        LOG.debug("replayId: {}", replayId);
+        context.getPropertiesComponent().addOverrideProperty("pubSubReplayId", replayId.get());
+
+        // start a new route with the replayId
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("salesforce:pubSubSubscribe:/event/CamelEventNote__e" +
+                     "?replayPreset=CUSTOM" +
+                     "&pubSubReplayId={{pubSubReplayId}}")
+                        .routeId("r.subscriberWithReplayId")
+                        .autoStartup(false)
+                        .to("mock:SubscriberWithReplayId");
+            }
+        });
+
+        mock.reset();
+        mock.expectedMessageCount(1);
+
+        template.requestBody("direct:publishCamelEventMessage", List.of(record));
+        mock.assertIsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder doCreateRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+
+                from("direct:publishCamelEventMessage")
+                        .to("salesforce:pubSubPublish:/event/CamelEventMessage__e");
+
+                from("direct:publishCamelEventNote")
+                        .to("salesforce:pubSubPublish:/event/CamelEventNote__e");
+
+                from("salesforce:pubSubSubscribe:/event/CamelEventMessage__e?pubSubBatchSize=10")
+                        .routeId("org.apache.camel.component.salesforce.sub1")
+                        .log(LoggingLevel.DEBUG, "${body}")
+                        .to("mock:CamelTestTopic");
+
+                from("salesforce:pubSubSubscribe:/event/CamelEventMessage__e" +
+                     "?pubSubBatchSize=10" +
+                     "&pubSubDeserializeType=JSON")
+                        .routeId("org.apache.camel.component.salesforce.sub3")
+                        .log(LoggingLevel.DEBUG, "${body}")
+                        .to("mock:CamelTestTopicJson");
+
+                from("salesforce:pubSubSubscribe:/event/CamelEventMessage__e" +
+                     "?pubSubBatchSize=10" +
+                     "&pubSubDeserializeType=POJO" +
+                     "&pubSubPojoClass=org.apache.camel.component.salesforce.PubSubPojoEvent")
+                        .routeId("org.apache.camel.component.salesforce.sub4")
+                        .log(LoggingLevel.DEBUG, "${body}")
+                        .to("mock:CamelTestTopicPojo");
+
+                from("salesforce:pubSubSubscribe:/event/CamelEventNote__e?pubSubBatchSize=10")
+                        .routeId("org.apache.camel.component.salesforce.sub2")
+                        .log(LoggingLevel.DEBUG, "${body}")
+                        .to("mock:CamelTestTopicEventNote");
+            }
+        };
+    }
+
+    public static class PojoEvent {
+        private String Message__c;
+        private long CreatedDate;
+        private String CreatedById;
+
+        @Override
+        public String toString() {
+            return "PojoEvent{" +
+                   "Message__c='" + Message__c + '\'' +
+                   ", CreatedDate=" + CreatedDate +
+                   ", CreatedById='" + CreatedById + '\'' +
+                   '}';
+        }
+
+        public String getMessage__c() {
+            return Message__c;
+        }
+
+        public void setMessage__c(String message__c) {
+            this.Message__c = message__c;
+        }
+
+        public long getCreatedDate() {
+            return CreatedDate;
+        }
+
+        public void setCreatedDate(long createdDate) {
+            this.CreatedDate = createdDate;
+        }
+
+        public String getCreatedById() {
+            return CreatedById;
+        }
+
+        public void setCreatedById(String createdById) {
+            this.CreatedById = createdById;
+        }
+    }
+}
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiTest.java
new file mode 100644
index 00000000000..2ab1ba5d145
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubApiTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+
+import com.salesforce.eventbus.protobuf.ReplayPreset;
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+import org.apache.camel.component.salesforce.internal.SalesforceSession;
+import org.apache.camel.component.salesforce.internal.client.PubSubApiClient;
+import org.apache.camel.component.salesforce.internal.pubsub.AuthErrorPubSubServer;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class PubSubApiTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PubSubApiTest.class);
+
+    @Test
+    public void shouldAuthenticateAndSubscribeAfterAuthError() throws IOException {
+        final SalesforceSession session = mock(SalesforceSession.class);
+        when(session.getAccessToken()).thenReturn("faketoken");
+        when(session.getInstanceUrl()).thenReturn("https://myinstance");
+        when(session.getOrgId()).thenReturn("00D123123123");
+
+        final PubSubApiConsumer consumer = mock(PubSubApiConsumer.class);
+        when(consumer.getTopic()).thenReturn("/event/FakeTopic");
+        when(consumer.getBatchSize()).thenReturn(100);
+
+        int port = getPort();
+        LOG.debug("Starting server on port {}", port);
+        final Server grpcServer = ServerBuilder.forPort(port)
+                .addService(new AuthErrorPubSubServer())
+                .build();
+        grpcServer.start();
+
+        PubSubApiClient client = new PubSubApiClient(
+                session, new SalesforceLoginConfig(), "localhost",
+                port, 1000, 10000);
+        client.setUsePlainTextConnection(true);
+        client.start();
+        client.subscribe(consumer, ReplayPreset.LATEST, null);
+
+        verify(session, timeout(5000)).attemptLoginUntilSuccessful(anyLong(), anyLong());
+    }
+
+    private int getPort() throws IOException {
+        try (ServerSocket serverSocket = new ServerSocket(0)) {
+            return serverSocket.getLocalPort();
+        }
+    }
+}
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubPojoEvent.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubPojoEvent.java
new file mode 100644
index 00000000000..a9ee7efb381
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/PubSubPojoEvent.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce;
+
+import java.util.Objects;
+
+public class PubSubPojoEvent {
+    private String Message__c;
+    private long CreatedDate;
+    private String CreatedById;
+
+    @Override
+    public String toString() {
+        return "PubSubPojoEvent{" +
+               "Message__c='" + Message__c + '\'' +
+               ", CreatedDate=" + CreatedDate +
+               ", CreatedById='" + CreatedById + '\'' +
+               '}';
+    }
+
+    public String getMessage__c() {
+        return Message__c;
+    }
+
+    public void setMessage__c(String message__c) {
+        this.Message__c = message__c;
+    }
+
+    public long getCreatedDate() {
+        return CreatedDate;
+    }
+
+    public void setCreatedDate(long createdDate) {
+        this.CreatedDate = createdDate;
+    }
+
+    public String getCreatedById() {
+        return CreatedById;
+    }
+
+    public void setCreatedById(String createdById) {
+        this.CreatedById = createdById;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (!(o instanceof PubSubPojoEvent))
+            return false;
+        PubSubPojoEvent that = (PubSubPojoEvent) o;
+        return CreatedDate == that.CreatedDate && Objects.equals(Message__c, that.Message__c)
+                && CreatedById.equals(that.CreatedById);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(Message__c, CreatedDate, CreatedById);
+    }
+}
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/RawPayloadTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/RawPayloadTest.java
index e85fe0a8603..84b724d9a97 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/RawPayloadTest.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/RawPayloadTest.java
@@ -103,7 +103,9 @@ public class RawPayloadTest extends AbstractSalesforceTestBase {
             public MockResponse dispatch(RecordedRequest recordedRequest) throws InterruptedException {
                 if (recordedRequest.getPath().equals(OAUTH2_TOKEN_PATH)) {
                     return new MockResponse().setResponseCode(200)
-                            .setBody("{ \"access_token\": \"mock_token\", \"instance_url\": \"" + loginUrl + "\"}");
+                            .setBody(
+                                    "{ \"access_token\": \"mock_token\", \"id\": \"https://login.salesforce.com/id/00D4100000xxxxxxxx/0054100000xxxxxxxx\", \"instance_url\": \""
+                                     + loginUrl + "\"}");
                 } else {
                     return new MockResponse().setResponseCode(200)
                             .setHeader(HttpHeader.CONTENT_TYPE.toString(),
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/SalesforceConsumerTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/StreamingApiConsumerTest.java
similarity index 94%
rename from components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/SalesforceConsumerTest.java
rename to components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/StreamingApiConsumerTest.java
index caa456d0eef..a240acb431a 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/SalesforceConsumerTest.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/StreamingApiConsumerTest.java
@@ -51,7 +51,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
 @MockitoSettings(strictness = Strictness.LENIENT)
-public class SalesforceConsumerTest {
+public class StreamingApiConsumerTest {
 
     public static class AccountUpdates {
         @JsonProperty("Id")
@@ -156,7 +156,7 @@ public class SalesforceConsumerTest {
         when(endpoint.getTopicName()).thenReturn("AccountUpdates");
         configuration.setSObjectClass(AccountUpdates.class.getName());
 
-        final SalesforceConsumer consumer = new SalesforceConsumer(endpoint, processor, NOT_USED);
+        final StreamingApiConsumer consumer = new StreamingApiConsumer(endpoint, processor, NOT_USED);
         consumer.determineSObjectClass();
 
         consumer.processMessage(mock(ClientSessionChannel.class), pushTopicMessage);
@@ -194,7 +194,7 @@ public class SalesforceConsumerTest {
         message.put("data", data);
         message.put("channel", "/event/TestEvent__e");
 
-        final SalesforceConsumer consumer = new SalesforceConsumer(endpoint, processor, NOT_USED);
+        final StreamingApiConsumer consumer = new StreamingApiConsumer(endpoint, processor, NOT_USED);
 
         consumer.processMessage(mock(ClientSessionChannel.class), message);
 
@@ -217,7 +217,7 @@ public class SalesforceConsumerTest {
     public void shouldProcessPushTopicMessages() throws Exception {
         when(endpoint.getTopicName()).thenReturn("AccountUpdates");
 
-        final SalesforceConsumer consumer = new SalesforceConsumer(endpoint, processor, NOT_USED);
+        final StreamingApiConsumer consumer = new StreamingApiConsumer(endpoint, processor, NOT_USED);
 
         consumer.processMessage(mock(ClientSessionChannel.class), pushTopicMessage);
 
@@ -239,7 +239,7 @@ public class SalesforceConsumerTest {
         when(endpoint.getTopicName()).thenReturn("AccountUpdates");
         configuration.setRawPayload(true);
 
-        final SalesforceConsumer consumer = new SalesforceConsumer(endpoint, processor, NOT_USED);
+        final StreamingApiConsumer consumer = new StreamingApiConsumer(endpoint, processor, NOT_USED);
 
         consumer.processMessage(mock(ClientSessionChannel.class), pushTopicMessage);
 
@@ -272,7 +272,7 @@ public class SalesforceConsumerTest {
         message.put("data", data);
         message.put("channel", "/event/TestEvent__e");
 
-        final SalesforceConsumer consumer = new SalesforceConsumer(endpoint, processor, NOT_USED);
+        final StreamingApiConsumer consumer = new StreamingApiConsumer(endpoint, processor, NOT_USED);
 
         consumer.processMessage(mock(ClientSessionChannel.class), message);
 
@@ -293,7 +293,7 @@ public class SalesforceConsumerTest {
     public void shouldProcessChangeEvents() throws Exception {
         when(endpoint.getTopicName()).thenReturn("/data/AccountChangeEvent");
 
-        final SalesforceConsumer consumer = new SalesforceConsumer(endpoint, processor, NOT_USED);
+        final StreamingApiConsumer consumer = new StreamingApiConsumer(endpoint, processor, NOT_USED);
 
         consumer.processMessage(mock(ClientSessionChannel.class), mockChangeEvent);
 
@@ -325,7 +325,7 @@ public class SalesforceConsumerTest {
         when(endpoint.getTopicName()).thenReturn("/data/AccountChangeEvent");
         when(mockChangeEventMap.get("replayId")).thenReturn(null);
 
-        final SalesforceConsumer consumer = new SalesforceConsumer(endpoint, processor, NOT_USED);
+        final StreamingApiConsumer consumer = new StreamingApiConsumer(endpoint, processor, NOT_USED);
 
         consumer.processMessage(mock(ClientSessionChannel.class), mockChangeEvent);
 
@@ -337,7 +337,7 @@ public class SalesforceConsumerTest {
         when(endpoint.getTopicName()).thenReturn("/data/AccountChangeEvent");
         configuration.setRawPayload(true);
 
-        final SalesforceConsumer consumer = new SalesforceConsumer(endpoint, processor, NOT_USED);
+        final StreamingApiConsumer consumer = new StreamingApiConsumer(endpoint, processor, NOT_USED);
 
         consumer.processMessage(mock(ClientSessionChannel.class), mockChangeEvent);
 
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/SalesforceSessionTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/SalesforceSessionTest.java
index 3dd364a9414..1f4bdba0156 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/SalesforceSessionTest.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/SalesforceSessionTest.java
@@ -100,7 +100,15 @@ public class SalesforceSessionTest {
         when(request.send()).thenReturn(response);
 
         when(response.getStatus()).thenReturn(HttpStatus.OK_200);
-        when(response.getContentAsString()).thenReturn("{\"instance_url\": \"https://eu11.salesforce.com\"}");
+        when(response.getContentAsString()).thenReturn("{\n" +
+                                                       "  \"access_token\": \"00D4100000xxxxx!faketoken\",\n" +
+                                                       "  \"instance_url\": \"https://eu11.salesforce.com\",\n" +
+                                                       "  \"id\": \"https://login.salesforce.com/id/00D4100000xxxxxxxx/0054100000xxxxxxxx\",\n"
+                                                       +
+                                                       "  \"token_type\": \"Bearer\",\n" +
+                                                       "  \"issued_at\": \"1674496911543\",\n" +
+                                                       "  \"signature\": \"/ai5/F+LXEocLQZKdO4uwLblDszPUibL/Dfcn82R9VI=\"\n" +
+                                                       "}");
 
         session.login(null);
         return session;
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/pubsub/AuthErrorPubSubServer.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/pubsub/AuthErrorPubSubServer.java
new file mode 100644
index 00000000000..fedab67e7ed
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/pubsub/AuthErrorPubSubServer.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.pubsub;
+
+import java.util.Timer;
+import java.util.TimerTask;
+
+import com.salesforce.eventbus.protobuf.FetchRequest;
+import com.salesforce.eventbus.protobuf.FetchResponse;
+import com.salesforce.eventbus.protobuf.PubSubGrpc;
+import io.grpc.Metadata;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+
+import static org.apache.camel.component.salesforce.internal.client.PubSubApiClient.PUBSUB_ERROR_AUTH_ERROR;
+
+// Stub implementation that throws an auth error
+public class AuthErrorPubSubServer extends PubSubGrpc.PubSubImplBase {
+
+    public int subscribeCalls = 0;
+
+    @Override
+    public StreamObserver<FetchRequest> subscribe(StreamObserver<FetchResponse> client) {
+
+        subscribeCalls = subscribeCalls + 1;
+
+        return new StreamObserver<>() {
+            @Override
+            public void onNext(FetchRequest request) {
+                if (subscribeCalls == 1) {
+                    TimerTask task = new TimerTask() {
+                        public void run() {
+                            StatusRuntimeException e = new StatusRuntimeException(Status.UNAUTHENTICATED, new Metadata());
+                            e.getTrailers().put(Metadata.Key.of("error-code", Metadata.ASCII_STRING_MARSHALLER),
+                                    PUBSUB_ERROR_AUTH_ERROR);
+                            client.onError(e);
+                        }
+                    };
+                    Timer timer = new Timer("Timer");
+                    long delay = 1000L;
+                    timer.schedule(task, delay);
+                }
+            }
+
+            @Override
+            public void onError(Throwable t) {
+
+            }
+
+            @Override
+            public void onCompleted() {
+
+            }
+        };
+    }
+}
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java
index 8df70f171f1..2e8e90f8b0c 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperIntegrationTest.java
@@ -24,9 +24,9 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.CamelException;
 import org.apache.camel.component.salesforce.AuthenticationType;
 import org.apache.camel.component.salesforce.SalesforceComponent;
-import org.apache.camel.component.salesforce.SalesforceConsumer;
 import org.apache.camel.component.salesforce.SalesforceEndpoint;
 import org.apache.camel.component.salesforce.SalesforceEndpointConfig;
+import org.apache.camel.component.salesforce.StreamingApiConsumer;
 import org.apache.camel.component.salesforce.api.SalesforceException;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.cometd.bayeux.Message;
@@ -60,7 +60,7 @@ public class SubscriptionHelperIntegrationTest {
     final StubServer server;
     final SubscriptionHelper subscription;
 
-    SalesforceConsumer toUnsubscribe;
+    StreamingApiConsumer toUnsubscribe;
 
     static class MessageArgumentMatcher implements ArgumentMatcher<Message> {
 
@@ -90,27 +90,36 @@ public class SubscriptionHelperIntegrationTest {
         LoggerFactory.getLogger(SubscriptionHelperIntegrationTest.class).info("Port for wireshark to filter: {}",
                 server.port());
         final String instanceUrl = "http://localhost:" + server.port();
-        server.replyTo("POST", "/services/oauth2/token",
-                "{\"instance_url\":\"" + instanceUrl + "\",\"access_token\":\"token\"}");
+        server.replyTo(
+                "POST", "/services/oauth2/token",
+                "{\n" +
+                                                  "    \"instance_url\": \"" + instanceUrl + "\",\n" +
+                                                  "    \"access_token\": \"00D4100000xxxxx!faketoken\",\n" +
+                                                  "    \"id\": \"https://login.salesforce.com/id/00D4100000xxxxxxxx/0054100000xxxxxxxx\"\n"
+                                                  +
+                                                  "}");
+
         server.replyTo("GET", "/services/oauth2/revoke?token=token", 200);
 
-        server.replyTo("POST", "/cometd/" + SalesforceEndpointConfig.DEFAULT_VERSION + "/handshake", "[\n"
-                                                                                                     + "  {\n"
-                                                                                                     + "    \"ext\": {\n"
-                                                                                                     + "      \"replay\": true,\n"
-                                                                                                     + "      \"payload.format\": true\n"
-                                                                                                     + "    },\n"
-                                                                                                     + "    \"minimumVersion\": \"1.0\",\n"
-                                                                                                     + "    \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n"
-                                                                                                     + "    \"supportedConnectionTypes\": [\n"
-                                                                                                     + "      \"long-polling\"\n"
-                                                                                                     + "    ],\n"
-                                                                                                     + "    \"channel\": \"/meta/handshake\",\n"
-                                                                                                     + "    \"id\": \"$id\",\n"
-                                                                                                     + "    \"version\": \"1.0\",\n"
-                                                                                                     + "    \"successful\": true\n"
-                                                                                                     + "  }\n"
-                                                                                                     + "]");
+        server.replyTo(
+                "POST", "/cometd/" + SalesforceEndpointConfig.DEFAULT_VERSION + "/handshake",
+                "[\n"
+                                                                                              + "  {\n"
+                                                                                              + "    \"ext\": {\n"
+                                                                                              + "      \"replay\": true,\n"
+                                                                                              + "      \"payload.format\": true\n"
+                                                                                              + "    },\n"
+                                                                                              + "    \"minimumVersion\": \"1.0\",\n"
+                                                                                              + "    \"clientId\": \"5ra4927ikfky6cb12juthkpofeu8\",\n"
+                                                                                              + "    \"supportedConnectionTypes\": [\n"
+                                                                                              + "      \"long-polling\"\n"
+                                                                                              + "    ],\n"
+                                                                                              + "    \"channel\": \"/meta/handshake\",\n"
+                                                                                              + "    \"id\": \"$id\",\n"
+                                                                                              + "    \"version\": \"1.0\",\n"
+                                                                                              + "    \"successful\": true\n"
+                                                                                              + "  }\n"
+                                                                                              + "]");
 
         server.replyTo("POST", "/cometd/" + SalesforceEndpointConfig.DEFAULT_VERSION + "/connect",
                 req -> req.contains("\"timeout\":0"), "[\n"
@@ -194,8 +203,8 @@ public class SubscriptionHelperIntegrationTest {
         // handshake and connect
         subscription.start();
 
-        final SalesforceConsumer consumer
-                = toUnsubscribe = mock(SalesforceConsumer.class, "shouldResubscribeOnConnectionFailures:consumer");
+        final StreamingApiConsumer consumer
+                = toUnsubscribe = mock(StreamingApiConsumer.class, "shouldResubscribeOnConnectionFailures:consumer");
 
         final SalesforceEndpoint endpoint = mock(SalesforceEndpoint.class, "shouldResubscribeOnConnectionFailures:endpoint");
 
@@ -289,8 +298,8 @@ public class SubscriptionHelperIntegrationTest {
         // handshake and connect
         subscription.start();
 
-        final SalesforceConsumer consumer
-                = toUnsubscribe = mock(SalesforceConsumer.class, "shouldResubscribeOnHelperRestart:consumer");
+        final StreamingApiConsumer consumer
+                = toUnsubscribe = mock(StreamingApiConsumer.class, "shouldResubscribeOnHelperRestart:consumer");
 
         final SalesforceEndpoint endpoint = mock(SalesforceEndpoint.class, "shouldResubscribeOnHelperRestart:endpoint");
 
diff --git a/components/camel-salesforce/camel-salesforce-maven-plugin/README.md b/components/camel-salesforce/camel-salesforce-maven-plugin/README.md
index b7a69bde13a..30de0b5c3e4 100644
--- a/components/camel-salesforce/camel-salesforce-maven-plugin/README.md
+++ b/components/camel-salesforce/camel-salesforce-maven-plugin/README.md
@@ -1,8 +1,14 @@
 # Maven plugin for camel-salesforce component #
 
-This plugin generates DTOs for the [Camel Salesforce Component](https://github.com/apache/camel/tree/main/components/camel-salesforce/camel-salesforce-component). 
+This plugin generates DTOs for use with the [Camel Salesforce Component](https://github.com/apache/camel/tree/main/components/camel-salesforce/camel-salesforce-component).  
 
 ## Usage ##
+              
+This plugin provides three maven goals:
+                         
+* The `generate` goal generates DTOs for use with the REST API.
+* The `generatePubSub` goal generates Apache Avro `SpecificRecord` subclasses for use with the PubSub API.
+* The `schema` goal generates JSON Schemas that correspond to objects used with the REST API.
 
 The plugin configuration has the following properties.
 
@@ -14,6 +20,7 @@ The plugin configuration has the following properties.
 * version - Salesforce Rest API version, defaults to 25.0
 * outputDirectory - Directory where to place generated DTOs, defaults to ${project.build.directory}/generated-sources/camel-salesforce
 * includes - List of SObject types to include
+* topics - List of topics to include, .e.g., `/event/BatchApexErrorEvent`. This property only applies to the `generatePubSub` goal.
 * excludes - List of SObject types to exclude
 * includePattern - Java RegEx for SObject types to include
 * excludePattern - Java RegEx for SObject types to exclude
diff --git a/components/camel-salesforce/camel-salesforce-maven-plugin/src/main/java/org/apache/camel/maven/AbstractSalesforceMojo.java b/components/camel-salesforce/camel-salesforce-maven-plugin/src/main/java/org/apache/camel/maven/AbstractSalesforceMojo.java
index 35144c33821..7c9b8d9fabd 100644
--- a/components/camel-salesforce/camel-salesforce-maven-plugin/src/main/java/org/apache/camel/maven/AbstractSalesforceMojo.java
+++ b/components/camel-salesforce/camel-salesforce-maven-plugin/src/main/java/org/apache/camel/maven/AbstractSalesforceMojo.java
@@ -155,7 +155,7 @@ public abstract class AbstractSalesforceMojo extends AbstractMojo {
             setup();
             execution.execute();
         } catch (Exception e) {
-            throw new MojoExecutionException(e.getMessage());
+            throw new MojoExecutionException(e.getMessage(), e);
         }
 
     }
diff --git a/components/camel-salesforce/camel-salesforce-maven-plugin/src/main/java/org/apache/camel/maven/GeneratePubSubMojo.java b/components/camel-salesforce/camel-salesforce-maven-plugin/src/main/java/org/apache/camel/maven/GeneratePubSubMojo.java
new file mode 100644
index 00000000000..330f14d983a
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-maven-plugin/src/main/java/org/apache/camel/maven/GeneratePubSubMojo.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.maven;
+
+import java.io.File;
+
+import org.apache.camel.component.salesforce.codegen.AbstractSalesforceExecution;
+import org.apache.camel.component.salesforce.codegen.GeneratePubSubExecution;
+import org.apache.maven.plugins.annotations.LifecyclePhase;
+import org.apache.maven.plugins.annotations.Mojo;
+import org.apache.maven.plugins.annotations.Parameter;
+
+@Mojo(name = "generatePubSub", requiresProject = false, defaultPhase = LifecyclePhase.GENERATE_SOURCES)
+public class GeneratePubSubMojo extends AbstractSalesforceMojo {
+
+    @Parameter(property = "camelSalesforce.pubSubHost", required = true, defaultValue = "api.pubsub.salesforce.com")
+    String pubSubHost;
+
+    @Parameter(property = "camelSalesforce.pubSubPort", required = true, defaultValue = "7443")
+    Integer pubSubPort;
+
+    @Parameter(property = "camelSalesforce.pubSubOutputDirectory",
+               defaultValue = "${project.build.directory}/generated-sources/camel-salesforce")
+    File outputDirectory;
+
+    @Parameter
+    String[] topics;
+
+    GeneratePubSubExecution execution = new GeneratePubSubExecution();
+
+    @Override
+    protected AbstractSalesforceExecution getSalesforceExecution() {
+        return execution;
+    }
+
+    @Override
+    protected void setup() {
+        super.setup();
+        execution.setTopics(topics);
+        execution.setPubSubHost(pubSubHost);
+        execution.setPubSubPort(pubSubPort);
+        execution.setOutputDirectory(outputDirectory);
+        execution.setup();
+    }
+}
diff --git a/components/camel-salesforce/camel-salesforce-maven-plugin/src/test/java/org/apache/camel/maven/AbstractSalesforceMojoIntegrationTest.java b/components/camel-salesforce/camel-salesforce-maven-plugin/src/test/java/org/apache/camel/maven/AbstractSalesforceMojoIntegrationTest.java
index 2983f6755e1..371d42f7e90 100644
--- a/components/camel-salesforce/camel-salesforce-maven-plugin/src/test/java/org/apache/camel/maven/AbstractSalesforceMojoIntegrationTest.java
+++ b/components/camel-salesforce/camel-salesforce-maven-plugin/src/test/java/org/apache/camel/maven/AbstractSalesforceMojoIntegrationTest.java
@@ -27,7 +27,6 @@ import java.util.Properties;
 
 import org.apache.camel.component.salesforce.SalesforceEndpointConfig;
 import org.apache.camel.component.salesforce.codegen.AbstractSalesforceExecution;
-import org.apache.camel.component.salesforce.internal.client.RestClient;
 import org.apache.maven.plugin.MojoExecutionException;
 import org.apache.maven.plugin.MojoFailureException;
 import org.junit.jupiter.api.Test;
@@ -51,10 +50,10 @@ public class AbstractSalesforceMojoIntegrationTest {
             protected AbstractSalesforceExecution getSalesforceExecution() {
                 return new AbstractSalesforceExecution() {
                     @Override
-                    protected void executeWithClient(RestClient client) {
-                        assertThat(client).isNotNull();
+                    protected void executeWithClient() {
+                        assertThat(getRestClient()).isNotNull();
 
-                        client.getGlobalObjects(NO_HEADERS, (response, headers, exception) -> {
+                        getRestClient().getGlobalObjects(NO_HEADERS, (response, headers, exception) -> {
                             assertThat(exception).isNull();
                         });
                     }
diff --git a/components/camel-salesforce/camel-salesforce-maven-plugin/src/test/java/org/apache/camel/maven/GeneratePubSubMojoIntegrationTest.java b/components/camel-salesforce/camel-salesforce-maven-plugin/src/test/java/org/apache/camel/maven/GeneratePubSubMojoIntegrationTest.java
new file mode 100644
index 00000000000..5649ce5cbac
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-maven-plugin/src/test/java/org/apache/camel/maven/GeneratePubSubMojoIntegrationTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.maven;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import javax.tools.JavaFileObject;
+
+import com.google.testing.compile.Compilation;
+import com.google.testing.compile.Compiler;
+import com.google.testing.compile.JavaFileObjects;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import static org.apache.camel.maven.AbstractSalesforceMojoIntegrationTest.setup;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class GeneratePubSubMojoIntegrationTest {
+
+    private static final String TEST_LOGIN_PROPERTIES = "../test-salesforce-login.properties";
+
+    @TempDir
+    public Path temp;
+
+    @Test
+    public void testExecute() throws Exception {
+        final GeneratePubSubMojo mojo = createMojo();
+
+        // generate code
+        mojo.execute();
+
+        // validate generated code check that it was generated
+        final Path packagePath = temp.resolve("com").resolve("sforce").resolve("eventbus");
+        assertThat(packagePath).as("Package directory was not created").exists();
+
+        // test that the generated sources can be compiled
+        try (Stream<Path> list = Files.list(packagePath)) {
+            final List<JavaFileObject> sources = list.map(p -> {
+                try {
+                    return JavaFileObjects.forResource(p.toUri().toURL());
+                } catch (final MalformedURLException e) {
+                    throw new IllegalArgumentException(e);
+                }
+            }).collect(Collectors.toList());
+            final Compilation compilation = Compiler.javac().compile(sources);
+            assertThat(compilation.status()).isEqualTo(Compilation.Status.SUCCESS);
+        }
+    }
+
+    GeneratePubSubMojo createMojo() throws IOException {
+        final GeneratePubSubMojo mojo = new GeneratePubSubMojo();
+
+        // set login properties
+        setup(mojo);
+
+        // set additional properties specific to this Mojo
+        try (final InputStream stream = new FileInputStream(TEST_LOGIN_PROPERTIES)) {
+            final Properties properties = new Properties();
+            properties.load(stream);
+            mojo.pubSubHost = properties.getProperty("salesforce.pubsub.host");
+            mojo.pubSubPort = Integer.valueOf(properties.getProperty("salesforce.pubsub.port"));
+            mojo.topics = new String[] { "/event/BatchApexErrorEvent" };
+        }
+        mojo.outputDirectory = temp.toFile();
+        return mojo;
+    }
+}
diff --git a/components/camel-salesforce/it/resources/salesforce/objects/CamelEventMessage__e.object b/components/camel-salesforce/it/resources/salesforce/objects/CamelEventMessage__e.object
new file mode 100644
index 00000000000..95f6918cd94
--- /dev/null
+++ b/components/camel-salesforce/it/resources/salesforce/objects/CamelEventMessage__e.object
@@ -0,0 +1,20 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<CustomObject xmlns="http://soap.sforce.com/2006/04/metadata">
+    <deploymentStatus>Deployed</deploymentStatus>
+    <eventType>HighVolume</eventType>
+    <fields>
+        <fullName>Message__c</fullName>
+        <externalId>false</externalId>
+        <isFilteringDisabled>false</isFilteringDisabled>
+        <isNameField>false</isNameField>
+        <isSortingDisabled>false</isSortingDisabled>
+        <label>Message</label>
+        <length>255</length>
+        <required>false</required>
+        <type>Text</type>
+        <unique>false</unique>
+    </fields>
+    <label>Camel Event Message</label>
+    <pluralLabel>Camel Event Messages</pluralLabel>
+    <publishBehavior>PublishAfterCommit</publishBehavior>
+</CustomObject>
diff --git a/components/camel-salesforce/it/resources/salesforce/objects/CamelEventNote__e.object b/components/camel-salesforce/it/resources/salesforce/objects/CamelEventNote__e.object
new file mode 100644
index 00000000000..7597cec8e23
--- /dev/null
+++ b/components/camel-salesforce/it/resources/salesforce/objects/CamelEventNote__e.object
@@ -0,0 +1,20 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<CustomObject xmlns="http://soap.sforce.com/2006/04/metadata">
+    <deploymentStatus>Deployed</deploymentStatus>
+    <eventType>HighVolume</eventType>
+    <fields>
+        <fullName>Note__c</fullName>
+        <externalId>false</externalId>
+        <isFilteringDisabled>false</isFilteringDisabled>
+        <isNameField>false</isNameField>
+        <isSortingDisabled>false</isSortingDisabled>
+        <label>Note</label>
+        <length>255</length>
+        <required>false</required>
+        <type>Text</type>
+        <unique>false</unique>
+    </fields>
+    <label>Camel Event Note</label>
+    <pluralLabel>Camel Events Notes</pluralLabel>
+    <publishBehavior>PublishAfterCommit</publishBehavior>
+</CustomObject>
diff --git a/components/camel-salesforce/it/resources/salesforce/package.xml b/components/camel-salesforce/it/resources/salesforce/package.xml
index de75d4c8160..abf7309126a 100644
--- a/components/camel-salesforce/it/resources/salesforce/package.xml
+++ b/components/camel-salesforce/it/resources/salesforce/package.xml
@@ -85,8 +85,8 @@
         <name>ConnectedApp</name>
     </types>
     <types>
-        <members>ChangeEvents</members>
-        <name>PlatformEventChannel</name>
+        <members>ChangeEvents_AccountChangeEvent</members>
+        <name>PlatformEventChannelMember</name>
     </types>
-    <version>45.0</version>
+    <version>55.0</version>
 </Package>
diff --git a/components/camel-salesforce/it/resources/salesforce/platformEventChannelMembers/ChangeEvents_AccountChangeEvent.platformEventChannelMember b/components/camel-salesforce/it/resources/salesforce/platformEventChannelMembers/ChangeEvents_AccountChangeEvent.platformEventChannelMember
new file mode 100644
index 00000000000..fb2459804e7
--- /dev/null
+++ b/components/camel-salesforce/it/resources/salesforce/platformEventChannelMembers/ChangeEvents_AccountChangeEvent.platformEventChannelMember
@@ -0,0 +1,5 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<PlatformEventChannelMember xmlns="http://soap.sforce.com/2006/04/metadata">
+    <eventChannel>ChangeEvents</eventChannel>
+    <selectedEntity>AccountChangeEvent</selectedEntity>
+</PlatformEventChannelMember>
diff --git a/components/camel-salesforce/it/resources/salesforce/platformEventChannels/ChangeEvents.platformEventChannel b/components/camel-salesforce/it/resources/salesforce/platformEventChannels/ChangeEvents.platformEventChannel
deleted file mode 100644
index e309403c8de..00000000000
--- a/components/camel-salesforce/it/resources/salesforce/platformEventChannels/ChangeEvents.platformEventChannel
+++ /dev/null
@@ -1,24 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<PlatformEventChannel xmlns="http://soap.sforce.com/2006/04/metadata">
-    <channelMembers>
-        <selectedEntity>AccountChangeEvent</selectedEntity>
-    </channelMembers>
-    <channelType>data</channelType>
-    <label>ChangeEvents</label>
-</PlatformEventChannel>
diff --git a/components/camel-salesforce/it/resources/salesforce/tabs/Invoice__c.tab b/components/camel-salesforce/it/resources/salesforce/tabs/Invoice__c.tab
index 99890936abe..9c82dab8d8a 100644
--- a/components/camel-salesforce/it/resources/salesforce/tabs/Invoice__c.tab
+++ b/components/camel-salesforce/it/resources/salesforce/tabs/Invoice__c.tab
@@ -17,6 +17,5 @@
 -->
 <CustomTab xmlns="http://soap.sforce.com/2006/04/metadata">
     <customObject>true</customObject>
-    <mobileReady>false</mobileReady>
     <motif>Custom18: Form</motif>
 </CustomTab>
diff --git a/components/camel-salesforce/it/resources/salesforce/tabs/Merchandise__c.tab b/components/camel-salesforce/it/resources/salesforce/tabs/Merchandise__c.tab
index 14f3543a96f..1f15667a97e 100644
--- a/components/camel-salesforce/it/resources/salesforce/tabs/Merchandise__c.tab
+++ b/components/camel-salesforce/it/resources/salesforce/tabs/Merchandise__c.tab
@@ -17,6 +17,5 @@
 -->
 <CustomTab xmlns="http://soap.sforce.com/2006/04/metadata">
     <customObject>true</customObject>
-    <mobileReady>false</mobileReady>
     <motif>Custom21: Computer</motif>
 </CustomTab>
diff --git a/components/camel-salesforce/pom.xml b/components/camel-salesforce/pom.xml
index 234ec01b234..26a60ac67ef 100644
--- a/components/camel-salesforce/pom.xml
+++ b/components/camel-salesforce/pom.xml
@@ -40,6 +40,14 @@
 
     <properties>
         <salesforce.component.root>${project.basedir}</salesforce.component.root>
+
+        <!-- Exclude camel-salesforce code until checkstyle supports Java 17 -->
+        <sourcecheckExcludes>
+            **/*.java
+        </sourcecheckExcludes>
+        <sourcecheckExcludesComma>
+            ${sourcecheckExcludes},
+        </sourcecheckExcludesComma>
     </properties>
 
 </project>