Posted to commits@camel.apache.org by "stn1slv (via GitHub)" <gi...@apache.org> on 2024/03/27 16:01:16 UTC

[PR] CAMEL-20625: Create a camel-google-pubsub-lite component [camel]

stn1slv opened a new pull request, #13630:
URL: https://github.com/apache/camel/pull/13630

   # Description
   
   <!--
   - Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
   -->
   
   # Target
   
   - [x] I checked that the commit is targeting the correct branch (note that Camel 3 uses `camel-3.x`, whereas Camel 4 uses the `main` branch)
   
   # Tracking
   - [x] If this is a large change, bug fix, or code improvement, I checked there is a [JIRA issue](https://issues.apache.org/jira/browse/CAMEL) filed for the change (usually before you start working on it).
   
   <!--
   # *Note*: trivial changes such as typos, minor documentation fixes, and other small items do not require a JIRA issue. In this case your pull request should address just this issue, without pulling in other changes.
   -->
   
   # Apache Camel coding standards and style
   
   - [x] I checked that each commit in the pull request has a meaningful subject line and body.
   
   <!--
   If you're unsure, you can format the pull request title like `[CAMEL-XXX] Fixes bug in camel-file component`, where you replace `CAMEL-XXX` with the appropriate JIRA issue.
   -->
   
   - [x] I have run `mvn clean install -DskipTests` locally and I have committed all auto-generated changes
   
   <!--
   You can run the aforementioned command in your module so that the build auto-formats your code. This will also be verified as part of the checks, and your PR may be rejected if there are uncommitted changes after running `mvn clean install -DskipTests`.
   
   You can learn more about the contribution guidelines at https://github.com/apache/camel/blob/main/CONTRIBUTING.md
   -->
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] CAMEL-20625: Create a camel-google-pubsub-lite component [camel]

Posted by "stn1slv (via GitHub)" <gi...@apache.org>.
stn1slv commented on code in PR #13630:
URL: https://github.com/apache/camel/pull/13630#discussion_r1542457420


##########
components/camel-google/camel-google-pubsub-lite/.gitignore:
##########
@@ -0,0 +1,7 @@
+.idea

Review Comment:
   Fixed in 8bfe10f



##########
components/camel-google/camel-google-pubsub-lite/src/main/java/org/apache/camel/component/google/pubsublite/GooglePubsubLiteEndpoint.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.google.pubsublite;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.camel.*;
+import org.apache.camel.component.google.pubsublite.serializer.DefaultGooglePubsubSerializer;
+import org.apache.camel.component.google.pubsublite.serializer.GooglePubsubSerializer;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
+import org.apache.camel.support.DefaultEndpoint;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Send and receive messages to/from Google Cloud Platform PubSub Lite Service.
+ * <p/>
+ * Built on top of the Google Cloud Pub/Sub Lite libraries.
+ */
+@UriEndpoint(firstVersion = "4.5.0", scheme = "google-pubsub-lite", title = "Google Pubsub Lite",
+             syntax = "google-pubsub-lite:projectId:location:destinationName",
+             category = { Category.CLOUD, Category.MESSAGING },
+             headersClass = GooglePubsubLiteConstants.class)
+public class GooglePubsubLiteEndpoint extends DefaultEndpoint {
+
+    private Logger log;
+
+    @UriPath(label = "common", description = "The Google Cloud PubSub Lite Project Id")
+    @Metadata(required = true)
+    private Long projectId;
+
+    @UriPath(label = "common", description = "The Google Cloud PubSub Lite location")
+    @Metadata(required = true)
+    private String location;
+
+    @UriPath(label = "common",
+             description = "The Destination Name. For the consumer this will be the subscription name, while for the producer this will be the topic name.")
+    @Metadata(required = true)
+    private String destinationName;
+
+    @UriParam(label = "common",
+              description = "The Service account key that can be used as credentials for the PubSub publisher/subscriber. It can be loaded by default from "
+                            + " classpath, but you can prefix with classpath:, file:, or http: to load the resource from different systems.")
+    private String serviceAccountKey;
+
+    @UriParam(name = "loggerId", description = "Logger ID to use when a match to the parent route required")
+    private String loggerId;
+
+    @UriParam(label = "consumer", name = "concurrentConsumers",
+              description = "The number of parallel streams consuming from the subscription",
+              defaultValue = "1")
+    private Integer concurrentConsumers = 1;
+
+    @UriParam(label = "consumer", name = "maxMessagesPerPoll",
+              description = "The max number of messages to receive from the server in a single API call", defaultValue = "1")
+    private Integer maxMessagesPerPoll = 1;
+
+    @UriParam(label = "consumer", defaultValue = "AUTO", enums = "AUTO,NONE",
+              description = "AUTO = exchange gets ack'ed/nack'ed on completion. NONE = downstream process has to ack/nack explicitly")
+    private GooglePubsubLiteConstants.AckMode ackMode = GooglePubsubLiteConstants.AckMode.AUTO;
+
+    @UriParam(label = "consumer", name = "maxAckExtensionPeriod",
+              description = "Set the maximum period a message ack deadline will be extended. Value in seconds",
+              defaultValue = "3600")
+    private int maxAckExtensionPeriod = 3600;
+
+    @UriParam(description = "Pub/Sub endpoint to use. Required when using message ordering, and ensures that messages are received in order even when multiple publishers are used",
+              label = "producer,advanced")
+    private String pubsubEndpoint;
+
+    @UriParam(name = "serializer",
+              description = "A custom GooglePubsubLiteSerializer to use for serializing message payloads in the producer",
+              label = "producer,advanced")
+    @Metadata(autowired = true)
+    private GooglePubsubSerializer serializer;
+
+    public GooglePubsubLiteEndpoint(String uri, Component component) {
+        super(uri, component);
+
+        if (!(component instanceof GooglePubsubLiteComponent)) {

Review Comment:
   Fixed in 8bfe10f
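
The `@UriEndpoint` annotation in this hunk declares the syntax `google-pubsub-lite:projectId:location:destinationName`, with `projectId` typed as `Long`. As a rough illustration of how such a URI breaks down, here is a minimal, dependency-free sketch. The class `PubsubLiteUriParts` and its `parse` method are hypothetical names introduced for this example only; they are not part of the component, which presumably does its own parsing when it creates the endpoint.

```java
import java.util.Objects;

/** Hypothetical holder for the three path segments of a google-pubsub-lite URI. */
final class PubsubLiteUriParts {
    final long projectId;
    final String location;
    final String destinationName;

    PubsubLiteUriParts(long projectId, String location, String destinationName) {
        this.projectId = projectId;
        this.location = Objects.requireNonNull(location);
        this.destinationName = Objects.requireNonNull(destinationName);
    }

    /** Splits "google-pubsub-lite:projectId:location:destinationName[?options]". */
    static PubsubLiteUriParts parse(String uri) {
        String scheme = "google-pubsub-lite:";
        if (!uri.startsWith(scheme)) {
            throw new IllegalArgumentException("Unexpected scheme in: " + uri);
        }
        String remaining = uri.substring(scheme.length());
        // Drop any query options such as ?ackMode=NONE before splitting the path
        int query = remaining.indexOf('?');
        if (query >= 0) {
            remaining = remaining.substring(0, query);
        }
        String[] parts = remaining.split(":", 3);
        if (parts.length != 3 || parts[1].isEmpty() || parts[2].isEmpty()) {
            throw new IllegalArgumentException(
                    "Expected google-pubsub-lite:projectId:location:destinationName but got: " + uri);
        }
        // projectId is declared as Long on the endpoint, so it must be numeric here
        return new PubsubLiteUriParts(Long.parseLong(parts[0]), parts[1], parts[2]);
    }
}
```

For a consumer the third segment would be the subscription name and for a producer the topic name, as the `destinationName` description states.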





Re: [PR] CAMEL-20625: Create a camel-google-pubsub-lite component [camel]

Posted by "stn1slv (via GitHub)" <gi...@apache.org>.
stn1slv commented on PR #13630:
URL: https://github.com/apache/camel/pull/13630#issuecomment-2025342006

   > Ah, it's in the JIRA. If this information is missing in the docs, then it would be good to add it there.
   
   I'll add it to the docs




Re: [PR] CAMEL-20625: Create a camel-google-pubsub-lite component [camel]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #13630:
URL: https://github.com/apache/camel/pull/13630#issuecomment-2023139998

   :star2: Thank you for your contribution to the Apache Camel project! :star2: 
   
   :robot: CI automation will test this PR automatically.
   
   :camel: Apache Camel Committers, please review the following items:
   
   * First-time contributors **require MANUAL approval** for the GitHub Actions to run
   
   * You can use the command `/component-test (camel-)component-name1 (camel-)component-name2..` to request a test from the test bot.
   
   * You can label PRs using `build-all`, `build-dependents`, `skip-tests` and `test-dependents` to fine-tune the checks executed by this PR.
   
   * Build and test logs are available in the Summary page. **Only** [Apache Camel committers](https://camel.apache.org/community/team/#committers) have access to the summary. 
   
   * :warning: Be careful when sharing logs. Review their contents before sharing them publicly.




Re: [PR] CAMEL-20625: Create a camel-google-pubsub-lite component [camel]

Posted by "davsclaus (via GitHub)" <gi...@apache.org>.
davsclaus merged PR #13630:
URL: https://github.com/apache/camel/pull/13630




Re: [PR] CAMEL-20625: Create a camel-google-pubsub-lite component [camel]

Posted by "stn1slv (via GitHub)" <gi...@apache.org>.
stn1slv commented on code in PR #13630:
URL: https://github.com/apache/camel/pull/13630#discussion_r1542456791


##########
components/camel-google/camel-google-pubsub-lite/src/main/java/org/apache/camel/component/google/pubsublite/GooglePubsubLiteComponent.java:
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.google.pubsublite;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.auth.oauth2.ServiceAccountCredentials;
+import com.google.cloud.pubsub.v1.MessageReceiver;
+import com.google.cloud.pubsub.v1.stub.PublisherStubSettings;
+import com.google.cloud.pubsublite.SubscriptionPath;
+import com.google.cloud.pubsublite.TopicPath;
+import com.google.cloud.pubsublite.cloudpubsub.*;

Review Comment:
   Fixed in [8bfe10f](https://github.com/apache/camel/pull/13630/commits/8bfe10f92a9fd68d9d9fa417c5e906aca3b20d7a)





Re: [PR] CAMEL-20625: Create a camel-google-pubsub-lite component [camel]

Posted by "stn1slv (via GitHub)" <gi...@apache.org>.
stn1slv commented on PR #13630:
URL: https://github.com/apache/camel/pull/13630#issuecomment-2024606515

   @oscerd, thanks for the review and feedback. 
   Addressed findings in 8bfe10f




Re: [PR] CAMEL-20625: Create a camel-google-pubsub-lite component [camel]

Posted by "stn1slv (via GitHub)" <gi...@apache.org>.
stn1slv commented on code in PR #13630:
URL: https://github.com/apache/camel/pull/13630#discussion_r1542458053


##########
components/camel-google/camel-google-pubsub-lite/src/main/java/org/apache/camel/component/google/pubsublite/GooglePubsubLiteComponent.java:
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.google.pubsublite;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.auth.oauth2.ServiceAccountCredentials;
+import com.google.cloud.pubsub.v1.MessageReceiver;
+import com.google.cloud.pubsub.v1.stub.PublisherStubSettings;
+import com.google.cloud.pubsublite.SubscriptionPath;
+import com.google.cloud.pubsublite.TopicPath;
+import com.google.cloud.pubsublite.cloudpubsub.*;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import org.apache.camel.Endpoint;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.annotations.Component;
+import org.apache.camel.support.DefaultComponent;
+import org.apache.camel.support.ResourceHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents the component that manages {@link GooglePubsubLiteEndpoint}.
+ */
+@Component("google-pubsub-lite")
+public class GooglePubsubLiteComponent extends DefaultComponent {
+    private static final Logger LOG = LoggerFactory.getLogger(GooglePubsubLiteComponent.class);
+
+    @Metadata(label = "common",
+              description = "The Service account key that can be used as credentials for the PubSub Lite publisher/subscriber. It can be loaded by default from "
+                            + " classpath, but you can prefix with classpath:, file:, or http: to load the resource from different systems.")
+    private String serviceAccountKey;
+
+    @Metadata(
+              label = "producer",
+              description = "Maximum number of producers to cache. This could be increased if you have producers for lots of different topics.")
+    private int publisherCacheSize = 100;
+
+    @Metadata(
+              label = "producer",
+              description = "How many milliseconds should each producer stay alive in the cache.")
+    private int publisherCacheTimeout = 180000;
+
+    @Metadata(
+              label = "consumer",
+              description = "How many milliseconds should each producer stay alive in the cache. " +
+                            "Must be greater than the allowed size of the largest message (1 MiB).")
+    private long consumerBytesOutstanding = 10 * 1024 * 1024;
+
+    @Metadata(
+              label = "consumer",
+              description = "The number of messages that may be outstanding to the client. Must be >0.")
+    private long consumerMessagesOutstanding = 1000;
+
+    @Metadata(
+              label = "advanced",
+              description = "How many milliseconds should a producer be allowed to terminate.")
+    private int publisherTerminationTimeout = 60000;
+
+    private RemovalListener<String, Publisher> removalListener = removal -> {
+        Publisher publisher = removal.getValue();
+        if (publisher == null) {

Review Comment:
   Fixed in [8bfe10f](https://github.com/apache/camel/pull/13630/commits/8bfe10f92a9fd68d9d9fa417c5e906aca3b20d7a)
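
The option descriptions in this hunk state two constraints on the consumer settings: `consumerBytesOutstanding` (default 10 MiB) must be greater than the largest allowed message size of 1 MiB, and `consumerMessagesOutstanding` (default 1000) must be greater than zero. A minimal sketch of those two checks follows. The class `ConsumerFlowControlCheck` is a hypothetical helper written for illustration; whether the component or the underlying client library performs an equivalent check is not shown in this thread.

```java
/** Hypothetical validator for the consumer flow-control settings shown in the diff. */
final class ConsumerFlowControlCheck {
    /** Largest allowed message size named in the option description: 1 MiB. */
    static final long MAX_MESSAGE_BYTES = 1024L * 1024L;

    static void validate(long bytesOutstanding, long messagesOutstanding) {
        // The byte budget must leave room for at least one maximum-size message
        if (bytesOutstanding <= MAX_MESSAGE_BYTES) {
            throw new IllegalArgumentException(
                    "consumerBytesOutstanding must be greater than " + MAX_MESSAGE_BYTES + " bytes");
        }
        if (messagesOutstanding <= 0) {
            throw new IllegalArgumentException("consumerMessagesOutstanding must be > 0");
        }
    }
}
```

With the defaults from the diff, `ConsumerFlowControlCheck.validate(10L * 1024 * 1024, 1000L)` passes without throwing.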





Re: [PR] CAMEL-20625: Create a camel-google-pubsub-lite component [camel]

Posted by "davsclaus (via GitHub)" <gi...@apache.org>.
davsclaus commented on PR #13630:
URL: https://github.com/apache/camel/pull/13630#issuecomment-2025304523

   Can you tell us more about the difference between this `lite` component and the regular Pub/Sub in Google?
   If so, that would be good to have in the docs.




Re: [PR] CAMEL-20625: Create a camel-google-pubsub-lite component [camel]

Posted by "stn1slv (via GitHub)" <gi...@apache.org>.
stn1slv commented on PR #13630:
URL: https://github.com/apache/camel/pull/13630#issuecomment-2044641075

   The doc for `consumerBytesOutstanding` was fixed in https://github.com/apache/camel/pull/13660




Re: [PR] CAMEL-20625: Create a camel-google-pubsub-lite component [camel]

Posted by "stn1slv (via GitHub)" <gi...@apache.org>.
stn1slv commented on code in PR #13630:
URL: https://github.com/apache/camel/pull/13630#discussion_r1542457420


##########
components/camel-google/camel-google-pubsub-lite/.gitignore:
##########
@@ -0,0 +1,7 @@
+.idea

Review Comment:
   Fixed in [8bfe10f](https://github.com/apache/camel/pull/13630/commits/8bfe10f92a9fd68d9d9fa417c5e906aca3b20d7a)



##########
components/camel-google/camel-google-pubsub-lite/src/main/java/org/apache/camel/component/google/pubsublite/GooglePubsubLiteProducer.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.google.pubsublite;
+
+import java.util.List;
+import java.util.Map;
+
+import com.google.api.core.ApiFuture;
+import com.google.cloud.pubsublite.cloudpubsub.Publisher;
+import com.google.common.base.Strings;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
+import org.apache.camel.Exchange;
+import org.apache.camel.support.DefaultProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.camel.component.google.pubsublite.GooglePubsubLiteConstants.ATTRIBUTES;
+import static org.apache.camel.component.google.pubsublite.GooglePubsubLiteConstants.ORDERING_KEY;
+import static org.apache.camel.component.google.pubsublite.GooglePubsubLiteConstants.RESERVED_GOOGLE_CLIENT_ATTRIBUTE_PREFIX;
+
+/**
+ * Generic PubSub Lite Producer
+ */
+public class GooglePubsubLiteProducer extends DefaultProducer {
+
+    public Logger logger;
+
+    public GooglePubsubLiteProducer(GooglePubsubLiteEndpoint endpoint) {
+        super(endpoint);
+
+        String loggerId = endpoint.getLoggerId();
+
+        if (Strings.isNullOrEmpty(loggerId)) {
+            loggerId = this.getClass().getName();
+        }
+
+        logger = LoggerFactory.getLogger(loggerId);
+    }
+
+    /**
+     * The incoming message is expected to be either - a List of Exchanges (aggregated) - an Exchange
+     */
+    @Override
+    public void process(Exchange exchange) throws Exception {
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("uploader thread/id: {} / {}. api call completed.", Thread.currentThread().getId(),
+                    exchange.getExchangeId());
+        }
+
+        if (exchange.getIn().getBody() instanceof List) {
+            boolean groupedExchanges = false;
+            for (Object body : exchange.getIn().getBody(List.class)) {
+                if (body instanceof Exchange) {
+                    send((Exchange) body);
+                    groupedExchanges = true;
+                }
+            }
+            if (!groupedExchanges) {
+                send(exchange);
+            }
+        } else {
+            send(exchange);
+        }
+    }
+
+    private void send(Exchange exchange) throws Exception {
+
+        GooglePubsubLiteEndpoint endpoint = (GooglePubsubLiteEndpoint) getEndpoint();
+        String topicName = String.format("projects/%s/locations/%s/topics/%s", endpoint.getProjectId(), endpoint.getLocation(),
+                endpoint.getDestinationName());
+
+        Publisher publisher = endpoint.getComponent().getPublisher(topicName, endpoint);
+
+        Object body = exchange.getIn().getBody();
+        ByteString byteString;
+
+        if (body instanceof String) {
+            byteString = ByteString.copyFromUtf8((String) body);
+        } else if (body instanceof byte[]) {
+            byteString = ByteString.copyFrom((byte[]) body);
+        } else {
+            byteString = ByteString.copyFrom(endpoint.getSerializer().serialize(body));
+        }
+
+        PubsubMessage.Builder messageBuilder = PubsubMessage.newBuilder().setData(byteString);
+        Map<String, String> attributes = exchange.getIn().getHeader(ATTRIBUTES, Map.class);
+        if (attributes != null) {
+            for (Map.Entry<String, String> attribute : attributes.entrySet()) {
+                if (!attribute.getKey().startsWith(RESERVED_GOOGLE_CLIENT_ATTRIBUTE_PREFIX)) {
+                    messageBuilder.putAttributes(attribute.getKey(), attribute.getValue());
+                }
+            }
+        }
+        String orderingKey = exchange.getIn().getHeader(ORDERING_KEY, String.class);
+        if (orderingKey != null) {
+            messageBuilder.setOrderingKey(orderingKey);
+        }
+
+        PubsubMessage message = messageBuilder.build();
+
+        ApiFuture<String> messageIdFuture = publisher.publish(message);
+        exchange.getIn().setHeader(GooglePubsubLiteConstants.MESSAGE_ID, messageIdFuture.get());

Review Comment:
   Fixed in [8bfe10f](https://github.com/apache/camel/pull/13630/commits/8bfe10f92a9fd68d9d9fa417c5e906aca3b20d7a)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] CAMEL-20625: Create a camel-google-pubsub-lite component [camel]

Posted by "stn1slv (via GitHub)" <gi...@apache.org>.
stn1slv commented on PR #13630:
URL: https://github.com/apache/camel/pull/13630#issuecomment-2024884192

   @davsclaus, apologies for such a basic query: I attempted to build the docs but didn't see any new changes. I'm unclear whether I need to copy files over manually, or whether I overlooked something.
   
   ```bash
   $ cd docs
   $ mvn install
   Running `/Users/Stanislav_Deviatov/src/github/camel/mvnw`...
   [INFO] Scanning for projects...
   [INFO] Loaded 7 groupId for remote repository atlassian
   [INFO] ------------------------------------------------------------------------
   [INFO] Detecting the operating system and CPU architecture
   [INFO] ------------------------------------------------------------------------
   [INFO] os.detected.name: osx
   [INFO] os.detected.arch: x86_64
   [INFO] os.detected.bitness: 64
   [INFO] os.detected.version: 14.4
   [INFO] os.detected.version.major: 14
   [INFO] os.detected.version.minor: 4
   [INFO] os.detected.classifier: osx-x86_64
   [INFO]
   [INFO] -----------------------< org.apache.camel:docs >------------------------
   [INFO] Building Camel :: Docs 4.5.0-SNAPSHOT
   [INFO]   from pom.xml
   [INFO] --------------------------------[ pom ]---------------------------------
   [INFO]
   [INFO] --- license:4.3:format (license-format) @ docs ---
   [INFO] Updating license headers...
   [INFO]
   [INFO] --- formatter:2.23.0:format (format) @ docs ---
   [INFO]
   [INFO] --- impsort:1.9.0:sort (sort-imports) @ docs ---
   [INFO] Processed 0 files in 00:00.004 (Already Sorted: 0, Needed Sorting: 0)
   [INFO]
   [INFO] --- resources:3.3.1:copy-resources (copy-maven-plugin-doc) @ docs ---
   [INFO] Copying 1 resource from ../tooling/maven/camel-maven-plugin/src/main/docs to user-manual/modules/ROOT/pages
   [INFO] Copying 1 resource from ../catalog/camel-report-maven-plugin/src/main/docs to user-manual/modules/ROOT/pages
   [INFO]
   [INFO] --- frontend:1.15.0:install-node-and-yarn (install node and yarn) @ docs ---
   [INFO] Node v16.17.1 is already installed.
   [INFO] Yarn Berry 3.2.3 is installed.
   [INFO]
   [INFO] --- frontend:1.15.0:yarn (yarn-gulp) @ docs ---
   [INFO] Skipping execution.
   [INFO]
   [INFO] --- camel-package:4.5.0-SNAPSHOT:xref-check (xref-check) @ docs ---
   [INFO]
   [INFO] --- flatten:1.6.0:flatten (default-cli) @ docs ---
   [INFO] Generating flattened POM of project org.apache.camel:docs:pom:4.5.0-SNAPSHOT...
   [INFO]
   [INFO] --- install:3.1.1:install (default-install) @ docs ---
   [INFO] Installing /Users/Stanislav_Deviatov/src/github/camel/docs/target/docs-4.5.0-SNAPSHOT.pom to /Users/Stanislav_Deviatov/.m2/repository/org/apache/camel/docs/4.5.0-SNAPSHOT/docs-4.5.0-SNAPSHOT.pom
   [INFO] ------------------------------------------------------------------------
   [INFO] BUILD SUCCESS
   [INFO] ------------------------------------------------------------------------
   [INFO] Total time:  14.482 s
   [INFO] Finished at: 2024-03-28T11:33:05+01:00
   [INFO] ------------------------------------------------------------------------
   $ git status
   On branch CAMEL-20625
   Your branch is up to date with 'origin/CAMEL-20625'.
   
   nothing to commit, working tree clean
   ```




Re: [PR] CAMEL-20625: Create a camel-google-pubsub-lite component [camel]

Posted by "stn1slv (via GitHub)" <gi...@apache.org>.
stn1slv commented on code in PR #13630:
URL: https://github.com/apache/camel/pull/13630#discussion_r1542457517


##########
components/camel-google/camel-google-pubsub-lite/src/main/java/org/apache/camel/component/google/pubsublite/GooglePubsubLiteProducer.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.google.pubsublite;
+
+import java.util.List;
+import java.util.Map;
+
+import com.google.api.core.ApiFuture;
+import com.google.cloud.pubsublite.cloudpubsub.Publisher;
+import com.google.common.base.Strings;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
+import org.apache.camel.Exchange;
+import org.apache.camel.support.DefaultProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.camel.component.google.pubsublite.GooglePubsubLiteConstants.ATTRIBUTES;
+import static org.apache.camel.component.google.pubsublite.GooglePubsubLiteConstants.ORDERING_KEY;
+import static org.apache.camel.component.google.pubsublite.GooglePubsubLiteConstants.RESERVED_GOOGLE_CLIENT_ATTRIBUTE_PREFIX;
+
+/**
+ * Generic PubSub Lite Producer
+ */
+public class GooglePubsubLiteProducer extends DefaultProducer {
+
+    public Logger logger;
+
+    public GooglePubsubLiteProducer(GooglePubsubLiteEndpoint endpoint) {
+        super(endpoint);
+
+        String loggerId = endpoint.getLoggerId();
+
+        if (Strings.isNullOrEmpty(loggerId)) {
+            loggerId = this.getClass().getName();
+        }
+
+        logger = LoggerFactory.getLogger(loggerId);
+    }
+
+    /**
+     * The incoming message is expected to be either - a List of Exchanges (aggregated) - an Exchange
+     */
+    @Override
+    public void process(Exchange exchange) throws Exception {
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("uploader thread/id: {} / {}. api call completed.", Thread.currentThread().getId(),
+                    exchange.getExchangeId());
+        }
+
+        if (exchange.getIn().getBody() instanceof List) {
+            boolean groupedExchanges = false;
+            for (Object body : exchange.getIn().getBody(List.class)) {
+                if (body instanceof Exchange) {
+                    send((Exchange) body);
+                    groupedExchanges = true;
+                }
+            }
+            if (!groupedExchanges) {
+                send(exchange);
+            }
+        } else {
+            send(exchange);
+        }
+    }
+
+    private void send(Exchange exchange) throws Exception {
+
+        GooglePubsubLiteEndpoint endpoint = (GooglePubsubLiteEndpoint) getEndpoint();
+        String topicName = String.format("projects/%s/locations/%s/topics/%s", endpoint.getProjectId(), endpoint.getLocation(),
+                endpoint.getDestinationName());
+
+        Publisher publisher = endpoint.getComponent().getPublisher(topicName, endpoint);
+
+        Object body = exchange.getIn().getBody();
+        ByteString byteString;
+
+        if (body instanceof String) {
+            byteString = ByteString.copyFromUtf8((String) body);
+        } else if (body instanceof byte[]) {
+            byteString = ByteString.copyFrom((byte[]) body);
+        } else {
+            byteString = ByteString.copyFrom(endpoint.getSerializer().serialize(body));
+        }
+
+        PubsubMessage.Builder messageBuilder = PubsubMessage.newBuilder().setData(byteString);
+        Map<String, String> attributes = exchange.getIn().getHeader(ATTRIBUTES, Map.class);
+        if (attributes != null) {
+            for (Map.Entry<String, String> attribute : attributes.entrySet()) {
+                if (!attribute.getKey().startsWith(RESERVED_GOOGLE_CLIENT_ATTRIBUTE_PREFIX)) {
+                    messageBuilder.putAttributes(attribute.getKey(), attribute.getValue());
+                }
+            }
+        }
+        String orderingKey = exchange.getIn().getHeader(ORDERING_KEY, String.class);
+        if (orderingKey != null) {
+            messageBuilder.setOrderingKey(orderingKey);
+        }
+
+        PubsubMessage message = messageBuilder.build();
+
+        ApiFuture<String> messageIdFuture = publisher.publish(message);
+        exchange.getIn().setHeader(GooglePubsubLiteConstants.MESSAGE_ID, messageIdFuture.get());

Review Comment:
   Fixed in 8bfe10f





Re: [PR] CAMEL-20625: Create a camel-google-pubsub-lite component [camel]

Posted by "davsclaus (via GitHub)" <gi...@apache.org>.
davsclaus commented on PR #13630:
URL: https://github.com/apache/camel/pull/13630#issuecomment-2024724635

   Maybe you can build the docs folder and add the changed files to the PR so the GH action can run:
   
   	docs/components/modules/ROOT/examples/json/google-pubsub-lite.json
   	docs/components/modules/ROOT/pages/google-pubsub-lite-component.adoc
   
   




Re: [PR] CAMEL-20625: Create a camel-google-pubsub-lite component [camel]

Posted by "davsclaus (via GitHub)" <gi...@apache.org>.
davsclaus commented on PR #13630:
URL: https://github.com/apache/camel/pull/13630#issuecomment-2026937895

   okay pushed




Re: [PR] CAMEL-20625: Create a camel-google-pubsub-lite component [camel]

Posted by "davsclaus (via GitHub)" <gi...@apache.org>.
davsclaus commented on PR #13630:
URL: https://github.com/apache/camel/pull/13630#issuecomment-2025315868

   > Can you tell more about the difference between this `lite` vs the regular pubsub in Google? And if so, that would be good to have in the docs
   
   Ah, it's in the JIRA. If this information is missing from the docs, it would be good to add it there.




Re: [PR] CAMEL-20625: Create a camel-google-pubsub-lite component [camel]

Posted by "davsclaus (via GitHub)" <gi...@apache.org>.
davsclaus commented on PR #13630:
URL: https://github.com/apache/camel/pull/13630#issuecomment-2026921423

   It's a bit of a mess to get this PR clean due to the 4.5 -> 4.6 release and the changes on main. So let me clean it up, and when that is done you can send new PRs if more is needed.




Re: [PR] CAMEL-20625: Create a camel-google-pubsub-lite component [camel]

Posted by "davsclaus (via GitHub)" <gi...@apache.org>.
davsclaus commented on PR #13630:
URL: https://github.com/apache/camel/pull/13630#issuecomment-2026936857

   Okay, I am almost ready and will push fixes soon.
   
   I noticed that the doc for `consumerBytesOutstanding` in the component is wrong. You are welcome to send a PR to fix that once I am done.
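   
   For context, here is a sketch of what the option appears to control. It assumes, from the option name, its 10 MiB default, and the second sentence of the current description, that the value is a byte limit rather than a cache timeout. `BytesOutstandingCheck` and its members are hypothetical names and not part of the component.
   
   ```java
   // Hypothetical sketch; not code from the component.
   final class BytesOutstandingCheck {

       // Largest allowed message size named in the option description (1 MiB).
       static final long MAX_MESSAGE_BYTES = 1024L * 1024L;

       // Same value as the component field default: 10 * 1024 * 1024.
       static final long DEFAULT_BYTES_OUTSTANDING = 10L * 1024L * 1024L;

       private BytesOutstandingCheck() {
       }

       // Returns the value unchanged when it exceeds the largest message size,
       // otherwise throws IllegalArgumentException.
       static long requireValid(long bytesOutstanding) {
           if (bytesOutstanding <= MAX_MESSAGE_BYTES) {
               throw new IllegalArgumentException(
                       "consumerBytesOutstanding must be greater than " + MAX_MESSAGE_BYTES + " bytes");
           }
           return bytesOutstanding;
       }
   }
   ```
   
   Under that assumption, a corrected description would state the limit in bytes, mirroring the wording of `consumerMessagesOutstanding`.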




Re: [PR] CAMEL-20625: Create a camel-google-pubsub-lite component [camel]

Posted by "oscerd (via GitHub)" <gi...@apache.org>.
oscerd commented on code in PR #13630:
URL: https://github.com/apache/camel/pull/13630#discussion_r1542340660


##########
components/camel-google/camel-google-pubsub-lite/src/main/java/org/apache/camel/component/google/pubsublite/GooglePubsubLiteProducer.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.google.pubsublite;
+
+import java.util.List;
+import java.util.Map;
+
+import com.google.api.core.ApiFuture;
+import com.google.cloud.pubsublite.cloudpubsub.Publisher;
+import com.google.common.base.Strings;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
+import org.apache.camel.Exchange;
+import org.apache.camel.support.DefaultProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.camel.component.google.pubsublite.GooglePubsubLiteConstants.ATTRIBUTES;
+import static org.apache.camel.component.google.pubsublite.GooglePubsubLiteConstants.ORDERING_KEY;
+import static org.apache.camel.component.google.pubsublite.GooglePubsubLiteConstants.RESERVED_GOOGLE_CLIENT_ATTRIBUTE_PREFIX;
+
+/**
+ * Generic PubSub Lite Producer
+ */
+public class GooglePubsubLiteProducer extends DefaultProducer {
+
+    public Logger logger;
+
+    public GooglePubsubLiteProducer(GooglePubsubLiteEndpoint endpoint) {
+        super(endpoint);
+
+        String loggerId = endpoint.getLoggerId();
+
+        if (Strings.isNullOrEmpty(loggerId)) {
+            loggerId = this.getClass().getName();
+        }
+
+        logger = LoggerFactory.getLogger(loggerId);
+    }
+
+    /**
+     * The incoming message is expected to be either - a List of Exchanges (aggregated) - an Exchange
+     */
+    @Override
+    public void process(Exchange exchange) throws Exception {
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("uploader thread/id: {} / {}. api call completed.", Thread.currentThread().getId(),
+                    exchange.getExchangeId());
+        }
+
+        if (exchange.getIn().getBody() instanceof List) {
+            boolean groupedExchanges = false;
+            for (Object body : exchange.getIn().getBody(List.class)) {
+                if (body instanceof Exchange) {
+                    send((Exchange) body);
+                    groupedExchanges = true;
+                }
+            }
+            if (!groupedExchanges) {
+                send(exchange);
+            }
+        } else {
+            send(exchange);
+        }
+    }
+
+    private void send(Exchange exchange) throws Exception {
+
+        GooglePubsubLiteEndpoint endpoint = (GooglePubsubLiteEndpoint) getEndpoint();
+        String topicName = String.format("projects/%s/locations/%s/topics/%s", endpoint.getProjectId(), endpoint.getLocation(),
+                endpoint.getDestinationName());
+
+        Publisher publisher = endpoint.getComponent().getPublisher(topicName, endpoint);
+
+        Object body = exchange.getIn().getBody();
+        ByteString byteString;
+
+        if (body instanceof String) {
+            byteString = ByteString.copyFromUtf8((String) body);
+        } else if (body instanceof byte[]) {
+            byteString = ByteString.copyFrom((byte[]) body);
+        } else {
+            byteString = ByteString.copyFrom(endpoint.getSerializer().serialize(body));
+        }
+
+        PubsubMessage.Builder messageBuilder = PubsubMessage.newBuilder().setData(byteString);
+        Map<String, String> attributes = exchange.getIn().getHeader(ATTRIBUTES, Map.class);
+        if (attributes != null) {
+            for (Map.Entry<String, String> attribute : attributes.entrySet()) {
+                if (!attribute.getKey().startsWith(RESERVED_GOOGLE_CLIENT_ATTRIBUTE_PREFIX)) {
+                    messageBuilder.putAttributes(attribute.getKey(), attribute.getValue());
+                }
+            }
+        }
+        String orderingKey = exchange.getIn().getHeader(ORDERING_KEY, String.class);
+        if (orderingKey != null) {
+            messageBuilder.setOrderingKey(orderingKey);
+        }
+
+        PubsubMessage message = messageBuilder.build();
+
+        ApiFuture<String> messageIdFuture = publisher.publish(message);
+        exchange.getIn().setHeader(GooglePubsubLiteConstants.MESSAGE_ID, messageIdFuture.get());

Review Comment:
   Use `getMessage()` instead of `getIn()`.



##########
components/camel-google/camel-google-pubsub-lite/src/main/java/org/apache/camel/component/google/pubsublite/GooglePubsubLiteComponent.java:
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.google.pubsublite;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.auth.oauth2.ServiceAccountCredentials;
+import com.google.cloud.pubsub.v1.MessageReceiver;
+import com.google.cloud.pubsub.v1.stub.PublisherStubSettings;
+import com.google.cloud.pubsublite.SubscriptionPath;
+import com.google.cloud.pubsublite.TopicPath;
+import com.google.cloud.pubsublite.cloudpubsub.*;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import org.apache.camel.Endpoint;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.annotations.Component;
+import org.apache.camel.support.DefaultComponent;
+import org.apache.camel.support.ResourceHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents the component that manages {@link GooglePubsubLiteEndpoint}.
+ */
+@Component("google-pubsub-lite")
+public class GooglePubsubLiteComponent extends DefaultComponent {
+    private static final Logger LOG = LoggerFactory.getLogger(GooglePubsubLiteComponent.class);
+
+    @Metadata(label = "common",
+              description = "The Service account key that can be used as credentials for the PubSub Lite publisher/subscriber. It can be loaded by default from "
+                            + " classpath, but you can prefix with classpath:, file:, or http: to load the resource from different systems.")
+    private String serviceAccountKey;
+
+    @Metadata(
+              label = "producer",
+              description = "Maximum number of producers to cache. This could be increased if you have producers for lots of different topics.")
+    private int publisherCacheSize = 100;
+
+    @Metadata(
+              label = "producer",
+              description = "How many milliseconds should each producer stay alive in the cache.")
+    private int publisherCacheTimeout = 180000;
+
+    @Metadata(
+              label = "consumer",
+              description = "How many milliseconds should each producer stay alive in the cache. " +
+                            "Must be greater than the allowed size of the largest message (1 MiB).")
+    private long consumerBytesOutstanding = 10 * 1024 * 1024;
+
+    @Metadata(
+              label = "consumer",
+              description = "The number of messages that may be outstanding to the client. Must be >0.")
+    private long consumerMessagesOutstanding = 1000;
+
+    @Metadata(
+              label = "advanced",
+              description = "How many milliseconds should a producer be allowed to terminate.")
+    private int publisherTerminationTimeout = 60000;
+
+    private RemovalListener<String, Publisher> removalListener = removal -> {
+        Publisher publisher = removal.getValue();
+        if (publisher == null) {

Review Comment:
   To check for null, you can use Camel's `ObjectHelper`.



##########
components/camel-google/camel-google-pubsub-lite/src/main/java/org/apache/camel/component/google/pubsublite/GooglePubsubLiteComponent.java:
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.google.pubsublite;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.auth.oauth2.ServiceAccountCredentials;
+import com.google.cloud.pubsub.v1.MessageReceiver;
+import com.google.cloud.pubsub.v1.stub.PublisherStubSettings;
+import com.google.cloud.pubsublite.SubscriptionPath;
+import com.google.cloud.pubsublite.TopicPath;
+import com.google.cloud.pubsublite.cloudpubsub.*;

Review Comment:
   Please don't use star imports.
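   
   For illustration, the explicit-import style asked for here looks like the sketch below. It uses only JDK classes so that it stays self-contained; `ExplicitImportsExample` and `awaitMessageId` are hypothetical names, not code from this PR. Each type is imported by name instead of through `java.util.concurrent.*`.
   
   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;

   // Hypothetical helper: every type is imported by name, no wildcard import.
   final class ExplicitImportsExample {

       private ExplicitImportsExample() {
       }

       // Waits up to timeoutMillis for the future to complete and returns its value.
       static String awaitMessageId(CompletableFuture<String> future, long timeoutMillis)
               throws ExecutionException, InterruptedException, TimeoutException {
           return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
       }
   }
   ```
   
   The same approach would apply to the `com.google.cloud.pubsublite.cloudpubsub.*` import: list only the classes the component actually references.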



##########
components/camel-google/camel-google-pubsub-lite/.gitignore:
##########
@@ -0,0 +1,7 @@
+.idea

Review Comment:
   This file could be removed.



##########
components/camel-google/camel-google-pubsub-lite/src/main/java/org/apache/camel/component/google/pubsublite/GooglePubsubLiteEndpoint.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.google.pubsublite;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.camel.*;
+import org.apache.camel.component.google.pubsublite.serializer.DefaultGooglePubsubSerializer;
+import org.apache.camel.component.google.pubsublite.serializer.GooglePubsubSerializer;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
+import org.apache.camel.support.DefaultEndpoint;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Send and receive messages to/from Google Cloud Platform PubSub Lite Service.
+ * <p/>
+ * Built on top of the Google Cloud Pub/Sub Lite libraries.
+ */
+@UriEndpoint(firstVersion = "4.5.0", scheme = "google-pubsub-lite", title = "Google Pubsub Lite",
+             syntax = "google-pubsub-lite:projectId:location:destinationName",
+             category = { Category.CLOUD, Category.MESSAGING },
+             headersClass = GooglePubsubLiteConstants.class)
+public class GooglePubsubLiteEndpoint extends DefaultEndpoint {
+
+    private Logger log;
+
+    @UriPath(label = "common", description = "The Google Cloud PubSub Lite Project Id")
+    @Metadata(required = true)
+    private Long projectId;
+
+    @UriPath(label = "common", description = "The Google Cloud PubSub Lite location")
+    @Metadata(required = true)
+    private String location;
+
+    @UriPath(label = "common",
+             description = "The Destination Name. For the consumer this will be the subscription name, while for the producer this will be the topic name.")
+    @Metadata(required = true)
+    private String destinationName;
+
+    @UriParam(label = "common",
+              description = "The Service account key that can be used as credentials for the PubSub publisher/subscriber. It can be loaded by default from "
+                            + " classpath, but you can prefix with classpath:, file:, or http: to load the resource from different systems.")
+    private String serviceAccountKey;
+
+    @UriParam(name = "loggerId", description = "Logger ID to use when a match to the parent route required")
+    private String loggerId;
+
+    @UriParam(label = "consumer", name = "concurrentConsumers",
+              description = "The number of parallel streams consuming from the subscription",
+              defaultValue = "1")
+    private Integer concurrentConsumers = 1;
+
+    @UriParam(label = "consumer", name = "maxMessagesPerPoll",
+              description = "The max number of messages to receive from the server in a single API call", defaultValue = "1")
+    private Integer maxMessagesPerPoll = 1;
+
+    @UriParam(label = "consumer", defaultValue = "AUTO", enums = "AUTO,NONE",
+              description = "AUTO = exchange gets ack'ed/nack'ed on completion. NONE = downstream process has to ack/nack explicitly")
+    private GooglePubsubLiteConstants.AckMode ackMode = GooglePubsubLiteConstants.AckMode.AUTO;
+
+    @UriParam(label = "consumer", name = "maxAckExtensionPeriod",
+              description = "Set the maximum period a message ack deadline will be extended. Value in seconds",
+              defaultValue = "3600")
+    private int maxAckExtensionPeriod = 3600;
+
+    @UriParam(description = "Pub/Sub endpoint to use. Required when using message ordering, and ensures that messages are received in order even when multiple publishers are used",
+              label = "producer,advanced")
+    private String pubsubEndpoint;
+
+    @UriParam(name = "serializer",
+              description = "A custom GooglePubsubLiteSerializer to use for serializing message payloads in the producer",
+              label = "producer,advanced")
+    @Metadata(autowired = true)
+    private GooglePubsubSerializer serializer;
+
+    public GooglePubsubLiteEndpoint(String uri, Component component) {
+        super(uri, component);
+
+        if (!(component instanceof GooglePubsubLiteComponent)) {

Review Comment:
   I think this is a bit overkill; we could avoid this check.





Re: [PR] CAMEL-20625: Create a camel-google-pubsub-lite component [camel]

Posted by "stn1slv (via GitHub)" <gi...@apache.org>.
stn1slv commented on code in PR #13630:
URL: https://github.com/apache/camel/pull/13630#discussion_r1542457092


##########
components/camel-google/camel-google-pubsub-lite/src/main/java/org/apache/camel/component/google/pubsublite/GooglePubsubLiteEndpoint.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.google.pubsublite;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.camel.*;
+import org.apache.camel.component.google.pubsublite.serializer.DefaultGooglePubsubSerializer;
+import org.apache.camel.component.google.pubsublite.serializer.GooglePubsubSerializer;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
+import org.apache.camel.support.DefaultEndpoint;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Send and receive messages to/from Google Cloud Platform PubSub Lite Service.
+ * <p/>
+ * Built on top of the Google Cloud Pub/Sub Lite libraries.
+ */
+@UriEndpoint(firstVersion = "4.5.0", scheme = "google-pubsub-lite", title = "Google Pubsub Lite",
+             syntax = "google-pubsub-lite:projectId:location:destinationName",
+             category = { Category.CLOUD, Category.MESSAGING },
+             headersClass = GooglePubsubLiteConstants.class)
+public class GooglePubsubLiteEndpoint extends DefaultEndpoint {
+
+    private Logger log;
+
+    @UriPath(label = "common", description = "The Google Cloud PubSub Lite Project Id")
+    @Metadata(required = true)
+    private Long projectId;
+
+    @UriPath(label = "common", description = "The Google Cloud PubSub Lite location")
+    @Metadata(required = true)
+    private String location;
+
+    @UriPath(label = "common",
+             description = "The Destination Name. For the consumer this will be the subscription name, while for the producer this will be the topic name.")
+    @Metadata(required = true)
+    private String destinationName;
+
+    @UriParam(label = "common",
+              description = "The Service account key that can be used as credentials for the PubSub publisher/subscriber. It can be loaded by default from "
+                            + " classpath, but you can prefix with classpath:, file:, or http: to load the resource from different systems.")
+    private String serviceAccountKey;
+
+    @UriParam(name = "loggerId", description = "Logger ID to use when a match to the parent route required")
+    private String loggerId;
+
+    @UriParam(label = "consumer", name = "concurrentConsumers",
+              description = "The number of parallel streams consuming from the subscription",
+              defaultValue = "1")
+    private Integer concurrentConsumers = 1;
+
+    @UriParam(label = "consumer", name = "maxMessagesPerPoll",
+              description = "The max number of messages to receive from the server in a single API call", defaultValue = "1")
+    private Integer maxMessagesPerPoll = 1;
+
+    @UriParam(label = "consumer", defaultValue = "AUTO", enums = "AUTO,NONE",
+              description = "AUTO = exchange gets ack'ed/nack'ed on completion. NONE = downstream process has to ack/nack explicitly")
+    private GooglePubsubLiteConstants.AckMode ackMode = GooglePubsubLiteConstants.AckMode.AUTO;
+
+    @UriParam(label = "consumer", name = "maxAckExtensionPeriod",
+              description = "Set the maximum period a message ack deadline will be extended. Value in seconds",
+              defaultValue = "3600")
+    private int maxAckExtensionPeriod = 3600;
+
+    @UriParam(description = "Pub/Sub endpoint to use. Required when using message ordering, and ensures that messages are received in order even when multiple publishers are used",
+              label = "producer,advanced")
+    private String pubsubEndpoint;
+
+    @UriParam(name = "serializer",
+              description = "A custom GooglePubsubLiteSerializer to use for serializing message payloads in the producer",
+              label = "producer,advanced")
+    @Metadata(autowired = true)
+    private GooglePubsubSerializer serializer;
+
+    public GooglePubsubLiteEndpoint(String uri, Component component) {
+        super(uri, component);
+
+        if (!(component instanceof GooglePubsubLiteComponent)) {

Review Comment:
   Fixed in [8bfe10f](https://github.com/apache/camel/pull/13630/commits/8bfe10f92a9fd68d9d9fa417c5e906aca3b20d7a)





Re: [PR] CAMEL-20625: Create a camel-google-pubsub-lite component [camel]

Posted by "davsclaus (via GitHub)" <gi...@apache.org>.
davsclaus commented on PR #13630:
URL: https://github.com/apache/camel/pull/13630#issuecomment-2024725081

   cd camel/docs
   mvn install
   
   and then you should have some changed files you can add to this PR




Re: [PR] CAMEL-20625: Create a camel-google-pubsub-lite component [camel]

Posted by "stn1slv (via GitHub)" <gi...@apache.org>.
stn1slv commented on PR #13630:
URL: https://github.com/apache/camel/pull/13630#issuecomment-2026946070

   Thanks, I'll fix the doc for `consumerBytesOutstanding`

