Posted to github@beam.apache.org by "Amraneze (via GitHub)" <gi...@apache.org> on 2023/03/23 14:33:23 UTC

[GitHub] [beam] Amraneze opened a new pull request, #25945: [#25887] fix(JmsIO): issue with multiple connection open #25887

Amraneze opened a new pull request, #25945:
URL: https://github.com/apache/beam/pull/25945

   - Fix the issue of multiple connections being opened for each bundle
   - Add integration test using jms-qpid for JmsIO
   
   Fixes #25887
   
   Related to PR#25886
   
   This PR handles two points:
   - The pipeline opened *6k* ports on each VM; with *18* workers, that was *108k* open ports. After investigating, we found that creating a connection in startBundle is not a good approach, especially because the client we use, [qpid-jms-client](https://github.com/apache/qpid-jms/tree/main/qpid-jms-client), opens one connection per CPU: with 6 CPUs, the JMS client opens 6 connections. The solution we tested is to create the connection only in the setup cycle and to recreate it during the process-element cycle if it has failed.
   - We added integration tests for the [qpid-jms-client](https://github.com/apache/qpid-jms/tree/main/qpid-jms-client) client. We want to have multiple tests for each JMS client.
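   
   To make the difference between the two lifecycles concrete, here is a minimal, self-contained sketch that counts how many connections are opened when a writer connects in every bundle versus once at setup. `FakeConnection`, `LifecycleSketch`, and the method names are hypothetical stand-ins for illustration; they are not the JmsIO or Beam API, and the sketch does not model the per-CPU behaviour of the qpid client.
   
   ```java
   import java.util.concurrent.atomic.AtomicInteger;
   
   /** Hypothetical stand-in for a JMS connection; it only counts how often one is opened. */
   class FakeConnection implements AutoCloseable {
     static final AtomicInteger OPENED = new AtomicInteger();
   
     FakeConnection() {
       OPENED.incrementAndGet();
     }
   
     @Override
     public void close() {
       // Nothing to release in this sketch.
     }
   }
   
   class LifecycleSketch {
     /** Opens and closes a connection for every bundle (the problematic pattern). */
     static int connectPerBundle(int bundles) {
       FakeConnection.OPENED.set(0);
       for (int i = 0; i < bundles; i++) {
         try (FakeConnection connection = new FakeConnection()) {
           // process the elements of this bundle with `connection`
         }
       }
       return FakeConnection.OPENED.get();
     }
   
     /** Opens one connection at setup and reuses it for every bundle. */
     static int connectAtSetup(int bundles) {
       FakeConnection.OPENED.set(0);
       try (FakeConnection connection = new FakeConnection()) { // setup
         for (int i = 0; i < bundles; i++) {
           // process the elements of this bundle with `connection`
         }
       } // teardown
       return FakeConnection.OPENED.get();
     }
   
     public static void main(String[] args) {
       System.out.println("per bundle: " + connectPerBundle(1000));
       System.out.println("at setup:   " + connectAtSetup(1000));
     }
   }
   ```
   
   In this toy model the per-bundle variant opens as many connections as there are bundles, while the setup variant opens one for the lifetime of the instance.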
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [x] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
    - [x] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/get-started-contributing/#make-the-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Abacn merged pull request #25945: [#25887] fix(JmsIO): issue with multiple connection open #25887

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn merged PR #25945:
URL: https://github.com/apache/beam/pull/25945




[GitHub] [beam] Abacn commented on pull request #25945: [#25887] fix(JmsIO): issue with multiple connection open #25887

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on PR #25945:
URL: https://github.com/apache/beam/pull/25945#issuecomment-1496430695

   Generally, the backoff retry handles intermittent errors (those where an in-place retry can succeed).
   
   If your concern is persistent errors (a dead connection), those should be handled by the runner. That is, when the exception propagates, the runner will create a new work item, which includes initializing a new DoFn instance, to retry the bundle.
   
   Handling all errors within the DoFn works in theory, but it is not encouraged because it adds complexity to the implementation.
   
   If republish needs more testing, can we aim for now to fix the original issue (high port occupancy), which was a regression in 2.46.0?
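   
   The split described here can be sketched as follows: retry in place with a growing delay for intermittent errors, and rethrow once the attempts are exhausted so that the caller (in Beam, the runner) can retry the bundle with a fresh instance. This is a minimal sketch under stated assumptions; `BackoffRetrySketch` and its parameters are hypothetical and are not the backoff utilities that JmsIO actually uses.
   
   ```java
   import java.util.function.Supplier;
   
   class BackoffRetrySketch {
     /**
      * Calls `action` up to `maxAttempts` times, doubling the sleep between attempts.
      * The last failure is rethrown so that the caller can decide what to do with a
      * persistent error.
      */
     static <T> T retry(Supplier<T> action, int maxAttempts, long initialDelayMillis) {
       long delay = initialDelayMillis;
       for (int attempt = 1; ; attempt++) {
         try {
           return action.get();
         } catch (RuntimeException e) {
           if (attempt >= maxAttempts) {
             throw e; // persistent error: give up and let the caller handle it
           }
           try {
             Thread.sleep(delay);
           } catch (InterruptedException interrupted) {
             Thread.currentThread().interrupt();
             throw e; // stop retrying when the thread is interrupted
           }
           delay *= 2;
         }
       }
     }
   }
   ```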




[GitHub] [beam] codecov[bot] commented on pull request #25945: [#25887] fix(JmsIO): issue with multiple connection open #25887

Posted by "codecov[bot] (via GitHub)" <gi...@apache.org>.
codecov[bot] commented on PR #25945:
URL: https://github.com/apache/beam/pull/25945#issuecomment-1481354188

   ## [Codecov](https://codecov.io/gh/apache/beam/pull/25945?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#25945](https://codecov.io/gh/apache/beam/pull/25945?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (7136123) into [master](https://codecov.io/gh/apache/beam/commit/99202b237e364bf77f40b6da0ec22cb7b17c37d0?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (99202b2) will **not change** coverage.
   > The diff coverage is `n/a`.
   
   ```diff
   @@           Coverage Diff           @@
   ##           master   #25945   +/-   ##
   =======================================
     Coverage   71.41%   71.41%           
   =======================================
     Files         778      778           
     Lines      102430   102430           
   =======================================
     Hits        73152    73152           
     Misses      27823    27823           
     Partials     1455     1455           
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | python | `79.95% <ø> (ø)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   [see 10 files with indirect coverage changes](https://codecov.io/gh/apache/beam/pull/25945/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   




[GitHub] [beam] Abacn commented on pull request #25945: [#25887] fix(JmsIO): issue with multiple connection open #25887

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on PR #25945:
URL: https://github.com/apache/beam/pull/25945#issuecomment-1481576121

   > And about the creating of connection, should we keep it at setup or startBundle ?
   
   Create it at setup and destroy it in tearDown, as in the original implementation. Then only one port will be used per worker thread.




[GitHub] [beam] Abacn commented on pull request #25945: [#25887] fix(JmsIO): issue with multiple connection open #25887

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on PR #25945:
URL: https://github.com/apache/beam/pull/25945#issuecomment-1496440139

   For the comment https://github.com/apache/beam/pull/25945#issuecomment-1496417672
   
   > 1 - inside of the exception listener
   
   Is there a potential race condition when operating on the connection inside connection.exceptionListener?
   
   > 2 - inside of the loop
   
   This looks good at first glance, though I still suggest improving one thing at a time, to get at least a stable version first.
   
   Also, I realized that close and connect need to be marked as synchronized.
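   
   A minimal sketch of why the two methods need the same lock: a JMS exception listener is typically invoked on a thread owned by the provider, so `close()` can run concurrently with a `connect()` issued from the processing thread. The class below is a hypothetical stand-in (a plain `Object` plays the role of the producer), not the JmsIO code.
   
   ```java
   import java.util.concurrent.atomic.AtomicInteger;
   
   class GuardedConnectionSketch {
     private final AtomicInteger opened = new AtomicInteger();
     private Object producer; // stand-in for javax.jms.MessageProducer
   
     /** Opens the producer only if there is none; safe to call from several threads. */
     synchronized void connect() {
       if (producer == null) {
         producer = new Object();
         opened.incrementAndGet();
       }
     }
   
     /** Drops the producer so that the next connect() recreates it. */
     synchronized void close() {
       producer = null;
     }
   
     int openedCount() {
       return opened.get();
     }
   
     /** Calls connect() from `threads` threads at once and returns how many producers were opened. */
     static int openedAfterConcurrentConnect(int threads) {
       GuardedConnectionSketch holder = new GuardedConnectionSketch();
       Thread[] workers = new Thread[threads];
       for (int i = 0; i < threads; i++) {
         workers[i] = new Thread(holder::connect);
         workers[i].start();
       }
       for (Thread worker : workers) {
         try {
           worker.join();
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
         }
       }
       return holder.openedCount();
     }
   }
   ```
   
   Because the null check and the assignment happen under one lock, concurrent callers cannot both observe `producer == null` and open two producers.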




[GitHub] [beam] Amraneze commented on a diff in pull request #25945: [#25887] fix(JmsIO): issue with multiple connection open #25887

Posted by "Amraneze (via GitHub)" <gi...@apache.org>.
Amraneze commented on code in PR #25945:
URL: https://github.com/apache/beam/pull/25945#discussion_r1157566021


##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -1026,7 +1027,7 @@ public void start() throws JMSException {
         }

Review Comment:
   We added the flag so that, when a connection fails, a new connection is created based on the exceptionListener. Do you think it would be better to check whether the producer is null?
   
   ````java
   if (producer == null) {
     // open connection
     connection.setExceptionListener(exception -> {
       connectionCounter.inc();
       // If there is an issue with the connection, close the session, connection
       // and producer so that they can be recreated.
       close();
     });
     // create new producer
   }
   ````





[GitHub] [beam] Amraneze commented on pull request #25945: [#25887] fix(JmsIO): issue with multiple connection open #25887

Posted by "Amraneze (via GitHub)" <gi...@apache.org>.
Amraneze commented on PR #25945:
URL: https://github.com/apache/beam/pull/25945#issuecomment-1481390556

   @Abacn I had an issue with source compilation. The client that I used, [qpid-jms-client](https://github.com/apache/qpid-jms/tree/main/qpid-jms-client), is compiled with *JDK 11*, but the tests run on *JDK 8*. I downgraded the client to version [0.61.0](https://mvnrepository.com/artifact/org.apache.qpid/qpid-jms-client/0.61.0), which is compiled with *JDK 8* (commit [73bb7f8](https://github.com/apache/qpid-jms/commit/73bb7f8cecfa055f2a1c9c903233c12bc168c33c)).




[GitHub] [beam] Abacn commented on a diff in pull request #25945: [#25887] fix(JmsIO): issue with multiple connection open #25887

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on code in PR #25945:
URL: https://github.com/apache/beam/pull/25945#discussion_r1158645353


##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOIT.java:
##########
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jms;
+
+import static org.apache.beam.sdk.io.jms.CommonJms.PASSWORD;
+import static org.apache.beam.sdk.io.jms.CommonJms.QUEUE;
+import static org.apache.beam.sdk.io.jms.CommonJms.USERNAME;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.cloud.Timestamp;

Review Comment:
   IIUC there is no need for a Google Cloud dependency here; this can switch to Joda-Time or java.time.
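   
   For reference, timestamps and elapsed time can be handled with `java.time` from the JDK, with no extra dependency. How the test actually uses `com.google.cloud.Timestamp` is not shown in this thread, so the helper below is a hypothetical illustration only.
   
   ```java
   import java.time.Duration;
   import java.time.Instant;
   
   class TimestampSketch {
     /** Returns the whole seconds elapsed between two instants. */
     static long elapsedSeconds(Instant start, Instant end) {
       return Duration.between(start, end).getSeconds();
     }
   
     public static void main(String[] args) {
       Instant start = Instant.now();
       Instant end = start.plusSeconds(90);
       System.out.println(elapsedSeconds(start, end));
     }
   }
   ```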





[GitHub] [beam] Abacn commented on a diff in pull request #25945: [#25887] fix(JmsIO): issue with multiple connection open #25887

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on code in PR #25945:
URL: https://github.com/apache/beam/pull/25945#discussion_r1157517762


##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -1026,7 +1027,7 @@ public void start() throws JMSException {
         }

Review Comment:
   You could replace the direct assignment of producer with startProducer(), to make it clear that the producer is opened in a single code path.
   
   Also, I see that "isProducerNeedsToBeCreated" is removed in several places and connect() is only called in the DoFn's setup. Can we get rid of this flag now?



##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -1043,25 +1044,35 @@ public void publishMessage(T input) throws JMSException, JmsIOException {
         }
       }
 
-      public void close() throws JMSException {
-        isProducerNeedsToBeCreated = true;
+      void startProducer() throws JMSException {
+        this.producer = this.session.createProducer(null);
+      }
+
+      void closeProducer() throws JMSException {
         if (producer != null) {
           producer.close();
           producer = null;
         }
-        if (session != null) {
-          session.close();
-          session = null;
-        }
-        if (connection != null) {
-          try {
-            // If the connection failed, stopping the connection will throw a JMSException
-            connection.stop();
-          } catch (JMSException exception) {
-            LOG.warn("The connection couldn't be closed", exception);
+      }
+
+      void close() {
+        try {
+          if (producer != null) {

Review Comment:
   You could call closeProducer() here, so that the producer is opened and closed through a single code path.
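   
   A minimal sketch of the single-code-path idea, using plain objects as hypothetical stand-ins for the JMS session and producer: `close()` delegates to `closeProducer()`, and whether a producer has to be created is derived from the field itself instead of a separate flag that can drift out of sync.
   
   ```java
   class ProducerLifecycleSketch {
     private Object session = new Object(); // stand-in for javax.jms.Session
     private Object producer; // stand-in for javax.jms.MessageProducer
   
     /** The only place where the producer is created. */
     void startProducer() {
       if (session == null) {
         throw new IllegalStateException("session is closed");
       }
       producer = new Object();
     }
   
     /** The only place where the producer is released. */
     void closeProducer() {
       producer = null;
     }
   
     /** Releases everything, reusing closeProducer() instead of duplicating it. */
     void close() {
       closeProducer();
       session = null;
     }
   
     /** Derived from the field, so it cannot disagree with the producer's real state. */
     boolean producerNeedsToBeCreated() {
       return producer == null;
     }
   }
   ```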





[GitHub] [beam] Abacn commented on pull request #25945: [#25887] fix(JmsIO): issue with multiple connection open #25887

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on PR #25945:
URL: https://github.com/apache/beam/pull/25945#issuecomment-1481403579

   Also, integration tests go in JmsIOIT, which has its own Gradle task. The guidelines on the Beam website may help with designing and implementing integration tests: https://beam.apache.org/documentation/io/io-standards/#integration-tests




[GitHub] [beam] Amraneze commented on pull request #25945: [#25887] fix(JmsIO): issue with multiple connection open #25887

Posted by "Amraneze (via GitHub)" <gi...@apache.org>.
Amraneze commented on PR #25945:
URL: https://github.com/apache/beam/pull/25945#issuecomment-1481414954

   > The forward fix is heavy. To avoid introducing new risk is it possible to have a lightweight fix at first? For example, Does only close the producer and session at finishBundle (but keep session and connection open) mitigate the pressure to the server? Per [documentation](https://docs.oracle.com/cd/E19798-01/821-1841/bncem/index.html) "A connection could represent an open TCP/IP socket between a client and a provider service daemon." So the port use should be per connection base.
   
   And about the creating of connection, should we keep it at setup or startBundle ?
   
   > Connection Pool is a feature request and can be considered later.
   
   Yes, it would be nice to have.
   




[GitHub] [beam] github-actions[bot] commented on pull request #25945: [#25887] fix(JmsIO): issue with multiple connection open #25887

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #25945:
URL: https://github.com/apache/beam/pull/25945#issuecomment-1491837467

   Reminder, please take a look at this pr: @kennknowles @damccorm @ahmedabu98 




[GitHub] [beam] Abacn commented on a diff in pull request #25945: [#25887] fix(JmsIO): issue with multiple connection open #25887

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on code in PR #25945:
URL: https://github.com/apache/beam/pull/25945#discussion_r1157571130


##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -1026,7 +1027,7 @@ public void start() throws JMSException {
         }

Review Comment:
   Either is fine; if the flag is used, it should be kept consistent. For now, if closeProducer() is called, the producer is closed and set to null, but this flag is still true.





[GitHub] [beam] Abacn commented on a diff in pull request #25945: [#25887] fix(JmsIO): issue with multiple connection open #25887

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on code in PR #25945:
URL: https://github.com/apache/beam/pull/25945#discussion_r1158648871


##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOIT.java:
##########
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jms;
+
+import static org.apache.beam.sdk.io.jms.CommonJms.PASSWORD;
+import static org.apache.beam.sdk.io.jms.CommonJms.QUEUE;
+import static org.apache.beam.sdk.io.jms.CommonJms.USERNAME;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.cloud.Timestamp;

Review Comment:
   Let me see if I can fix it myself, then I will merge.





[GitHub] [beam] Abacn commented on a diff in pull request #25945: [#25887] fix(JmsIO): issue with multiple connection open #25887

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on code in PR #25945:
URL: https://github.com/apache/beam/pull/25945#discussion_r1157571130


##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -1026,7 +1027,7 @@ public void start() throws JMSException {
         }

Review Comment:
   Either is fine; if the flag is used, it should be kept consistent. For now, if closeProducer() is called, the producer is closed and set to null, but isProducerNeedsToBeCreated is still false.





[GitHub] [beam] Amraneze commented on pull request #25945: [#25887] fix(JmsIO): issue with multiple connection open #25887

Posted by "Amraneze (via GitHub)" <gi...@apache.org>.
Amraneze commented on PR #25945:
URL: https://github.com/apache/beam/pull/25945#issuecomment-1496417672

   > > > Thanks, just have a few comments about clean up. I think we can still get it in by the 2.47.0 cut which is tomorrow. Have you tested in your environment that this PR resolves the port occupying issue?
   > > 
   > > When the version 2.47.0 will be published? So I can know if I will have time to test in my environment. For the issue with new connection it was fixed.
   > 
   > Release cut is tomorrow, Apr 5th. Release date targeted to early May. 
   
   I'm not really confident about it. I haven't tested the republish functionality, which is a major thing for us. The problem I can see is that, when a connection fails, the DoFn thread will retry the bundle X times over a duration Y, but nothing recreates the connection. I can suggest two possible ways to do it:
   
   1 - inside of the exception listener 
   
   ````java
   connection.setExceptionListener(exception -> {
     failedConnectionCounter.inc();
     // Make sure to free the failed connection 
     close();
     // Recreate a new connection with a new producer
     connect();
   });
   ````
   
   2 - inside of the loop 
   
   ````java
   connection.setExceptionListener(exception -> {
     failedConnectionCounter.inc();
     // Make sure to free the failed connection 
     close();
   });
   ...
   void connect () {
      if (producer == null) {
      ....
      }
   }
   ...
   
   void publishMessage() {
    while(true) {
     //Recreate a connection if the producer is null
     connect();
     ...
    }
   }
   ````
   
   What do you think about this?
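   
   The second option can be sketched end to end as follows. This is a minimal, self-contained illustration under stated assumptions: a plain `Object` stands in for the producer, failures are injected through a hypothetical `failuresToInject` counter, the `catch` block plays the role of the exception listener, and the loop is bounded by `maxAttempts` instead of `while(true)`. None of these names come from JmsIO.
   
   ```java
   class ReconnectingPublisherSketch {
     private Object producer; // stand-in for the JMS producer
     private int connects;
     private int failuresToInject;
   
     ReconnectingPublisherSketch(int failuresToInject) {
       this.failuresToInject = failuresToInject;
     }
   
     /** Creates the producer only if close() dropped it (or it was never created). */
     synchronized void connect() {
       if (producer == null) {
         producer = new Object();
         connects++;
       }
     }
   
     /** Frees the failed connection so that the next connect() recreates it. */
     synchronized void close() {
       producer = null;
     }
   
     /** Tries to publish up to maxAttempts times, reconnecting lazily before each attempt. */
     boolean publishMessage(String message, int maxAttempts) {
       for (int attempt = 1; attempt <= maxAttempts; attempt++) {
         connect();
         try {
           send(message);
           return true;
         } catch (IllegalStateException e) {
           close(); // what the exception listener would do on a failed connection
         }
       }
       return false;
     }
   
     /** Simulated send that fails while there are failures left to inject. */
     private void send(String message) {
       if (failuresToInject > 0) {
         failuresToInject--;
         throw new IllegalStateException("connection lost while sending " + message);
       }
     }
   
     int connects() {
       return connects;
     }
   }
   ```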




[GitHub] [beam] Abacn commented on a diff in pull request #25945: [#25887] fix(JmsIO): issue with multiple connection open #25887

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on code in PR #25945:
URL: https://github.com/apache/beam/pull/25945#discussion_r1156196098


##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/CommonJms.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jms;
+
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import javax.jms.BytesMessage;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.security.AuthenticationUser;
+import org.apache.activemq.security.SimpleAuthenticationPlugin;
+import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.amqp.AmqpTransportFactory;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.junit.runners.Parameterized;
+
+public class CommonJms implements Serializable {

Review Comment:
   Though this is a test fixture, a Javadoc comment would be appreciated.



##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -92,70 +87,49 @@
 import org.apache.beam.sdk.transforms.SerializableBiFunction;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
+import org.apache.qpid.jms.JmsAcknowledgeCallback;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.jms.message.JmsTextMessage;
 import org.joda.time.Duration;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
+import org.junit.runners.Parameterized;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** Tests of {@link JmsIO}. */
-@RunWith(JUnit4.class)
+@RunWith(Parameterized.class)
 @SuppressWarnings({
   "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
 })
-public class JmsIOTest {
-
-  private static final String BROKER_URL = "vm://localhost";
-
-  private static final String USERNAME = "test_user";
-  private static final String PASSWORD = "test_password";
-  private static final String QUEUE = "test_queue";
-  private static final String TOPIC = "test_topic";
-
-  private BrokerService broker;
-  private ConnectionFactory connectionFactory;
-  private ConnectionFactory connectionFactoryWithSyncAcksAndWithoutPrefetch;
-
-  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+public class JmsIOTest extends CommonJms {

Review Comment:
   Can we make JmsIOTest and JmsIOIT each own a CommonJms instance to handle connection-related things, instead of extending it? No polymorphism is involved here, and inheritance can make the test class implementation complicated, imo.
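   
   A minimal sketch of the composition approach, with hypothetical names (`BrokerFixture`, `ComposedTestSketch`) standing in for CommonJms and the test classes; the real fixture starts an embedded broker, which is reduced to a boolean here.
   
   ```java
   /** Hypothetical fixture that owns everything connection related. */
   class BrokerFixture {
     private final String brokerUrl;
     private boolean running;
   
     BrokerFixture(String brokerUrl) {
       this.brokerUrl = brokerUrl;
     }
   
     void start() {
       running = true; // the real fixture would start the broker here
     }
   
     void stop() {
       running = false; // the real fixture would stop the broker here
     }
   
     boolean isRunning() {
       return running;
     }
   
     String brokerUrl() {
       return brokerUrl;
     }
   }
   
   /** The test class holds a fixture instead of extending it. */
   class ComposedTestSketch {
     private final BrokerFixture fixture = new BrokerFixture("vm://localhost");
   
     void setUp() {
       fixture.start();
     }
   
     void tearDown() {
       fixture.stop();
     }
   
     BrokerFixture fixture() {
       return fixture;
     }
   }
   ```
   
   With this shape, a unit test and an integration test can each construct their own fixture with different settings without sharing a base class.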



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Amraneze commented on pull request #25945: [#25887] fix(JmsIO): issue with multiple connection open #25887

Posted by "Amraneze (via GitHub)" <gi...@apache.org>.
Amraneze commented on PR #25945:
URL: https://github.com/apache/beam/pull/25945#issuecomment-1481324978

   @Abacn I pushed this PR to fix the issue and to add more integration tests. We are also working on a connection pool to manage the connections that we need, something like this:
   
   ```java
    private static class JmsConnectionPool<T> implements Serializable {
   
     private static final long serialVersionUID = 1L;
     private static final int DEFAULT_MAX_POOL_SIZE = 10;
     private static final int DEFAULT_INITIAL_POOL_SIZE = 20;
   
     private JmsIO.Write<T> spec;
     private final int maxPoolSize;
     private final int initialPoolSize;
     private List<JmsConnection<T>> jmsConnectionPool;
     private List<JmsConnection<T>> usedJmsConnections = new ArrayList<>();
   
     JmsConnectionPool(JmsIO.Write<T> spec, List<JmsConnection<T>> jmsConnectionPool) {
       this.spec = spec;
       this.jmsConnectionPool = jmsConnectionPool;
       this.maxPoolSize = Optional.ofNullable(spec.getMaxPoolSize()).orElse(DEFAULT_MAX_POOL_SIZE);
       this.initialPoolSize = Optional.ofNullable(spec.getInitialPoolSize()).orElse(DEFAULT_INITIAL_POOL_SIZE);
     }
   
     static <T> JmsConnectionPool<T> create(JmsIO.Write<T> spec) {
       int initialPoolSize = Optional.ofNullable(spec.getInitialPoolSize()).orElse(DEFAULT_INITIAL_POOL_SIZE);
       List<JmsConnection<T>> jmsConnectionPool = new ArrayList<>(initialPoolSize);
       for (int i = 0; i < initialPoolSize; i++) {
         jmsConnectionPool.add(new JmsConnection<>(spec));
       }
       return new JmsConnectionPool<>(spec, jmsConnectionPool);
     }
   
     JmsConnection<T> getConnection() throws JmsIOException {
       if (jmsConnectionPool.isEmpty()) {
         if (usedJmsConnections.size() < maxPoolSize) {
           jmsConnectionPool.add(new JmsConnection<>(spec));
         } else {
           throw new JmsIOException("Maximum pool connection size has been reached");
         }
       }
   
       JmsConnection<T> jmsConnection = jmsConnectionPool
               .remove(jmsConnectionPool.size() - 1);
   
       usedJmsConnections.add(jmsConnection);
       return jmsConnection;
     }
   
     public boolean releaseConnection(JmsConnection<T> jmsConnection) {
       jmsConnectionPool.add(jmsConnection);
       return usedJmsConnections.remove(jmsConnection);
     }
   
     public boolean closeConnection(JmsConnection<T> jmsConnection) {
       jmsConnection.close();
       jmsConnectionPool.remove(jmsConnection);
       return usedJmsConnections.remove(jmsConnection);
     }
   
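     public void shutdown() throws JMSException {
       // Iterate over a copy: releaseConnection removes from usedJmsConnections, and
       // modifying a list inside its own forEach throws ConcurrentModificationException.
       new ArrayList<>(usedJmsConnections).forEach(this::releaseConnection);
       for (JmsConnection<T> jmsConnection : jmsConnectionPool) {
         jmsConnection.close();
       }
       jmsConnectionPool.clear();
     }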
   }
   ```
   
   The function `closeConnection` will be called inside of `this.connection.setExceptionListener`. What do you think about it? We also want to avoid creating latency or issues for other projects using JmsIO.
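   
   Since an exception listener is typically invoked on a provider thread, the pool would be touched from more than one thread. Below is a rough sketch of a synchronized variant, not the implementation in this PR: `FakeConnection` is a hypothetical stand-in for `JmsConnection`, and all method names are assumptions made for illustration.
   
   ```java
   import java.util.ArrayDeque;
   import java.util.ArrayList;
   import java.util.Deque;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;
   
   /** Hypothetical stand-in for JmsConnection; it only tracks open/closed state. */
   final class FakeConnection {
     private boolean closed;
   
     void close() {
       closed = true;
     }
   
     boolean isClosed() {
       return closed;
     }
   }
   
   /** Pool whose methods are synchronized so a listener thread can call them safely. */
   final class SynchronizedPoolSketch {
     private final int maxPoolSize;
     private final Deque<FakeConnection> idle = new ArrayDeque<>();
     private final Set<FakeConnection> inUse = new HashSet<>();
   
     SynchronizedPoolSketch(int initialPoolSize, int maxPoolSize) {
       if (initialPoolSize > maxPoolSize) {
         throw new IllegalArgumentException("initial pool size exceeds maximum");
       }
       this.maxPoolSize = maxPoolSize;
       for (int i = 0; i < initialPoolSize; i++) {
         idle.push(new FakeConnection());
       }
     }
   
     synchronized FakeConnection acquire() {
       if (idle.isEmpty()) {
         if (inUse.size() >= maxPoolSize) {
           throw new IllegalStateException("Maximum pool connection size has been reached");
         }
         idle.push(new FakeConnection());
       }
       FakeConnection connection = idle.pop();
       inUse.add(connection);
       return connection;
     }
   
     synchronized void release(FakeConnection connection) {
       // Only connections that are still tracked and still open go back to the idle set.
       if (inUse.remove(connection) && !connection.isClosed()) {
         idle.push(connection);
       }
     }
   
     /** Intended to be callable from an exception listener thread. */
     synchronized void closeBroken(FakeConnection connection) {
       connection.close();
       idle.remove(connection);
       inUse.remove(connection);
     }
   
     synchronized void shutdown() {
       // Copy first so that no collection is modified while it is being iterated.
       List<FakeConnection> all = new ArrayList<>(idle);
       all.addAll(inUse);
       all.forEach(FakeConnection::close);
       idle.clear();
       inUse.clear();
     }
   
     synchronized int idleCount() {
       return idle.size();
     }
   
     synchronized int inUseCount() {
       return inUse.size();
     }
   }
   ```
   
   Coarse `synchronized` methods keep the sketch simple. Whether that lock adds noticeable latency under load would need to be measured.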




[GitHub] [beam] Amraneze commented on a diff in pull request #25945: [#25887] fix(JmsIO): issue with multiple connection open #25887

Posted by "Amraneze (via GitHub)" <gi...@apache.org>.
Amraneze commented on code in PR #25945:
URL: https://github.com/apache/beam/pull/25945#discussion_r1158650034


##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOIT.java:
##########
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jms;
+
+import static org.apache.beam.sdk.io.jms.CommonJms.PASSWORD;
+import static org.apache.beam.sdk.io.jms.CommonJms.QUEUE;
+import static org.apache.beam.sdk.io.jms.CommonJms.USERNAME;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.cloud.Timestamp;

Review Comment:
   I fixed it.





[GitHub] [beam] Abacn commented on a diff in pull request #25945: [#25887] fix(JmsIO): issue with multiple connection open #25887

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on code in PR #25945:
URL: https://github.com/apache/beam/pull/25945#discussion_r1158650892


##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOIT.java:
##########
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jms;
+
+import static org.apache.beam.sdk.io.jms.CommonJms.PASSWORD;
+import static org.apache.beam.sdk.io.jms.CommonJms.QUEUE;
+import static org.apache.beam.sdk.io.jms.CommonJms.USERNAME;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.cloud.Timestamp;

Review Comment:
   thanks!





[GitHub] [beam] Abacn commented on pull request #25945: [#25887] fix(JmsIO): issue with multiple connection open #25887

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on PR #25945:
URL: https://github.com/apache/beam/pull/25945#issuecomment-1496351832

   > > Thanks, just have a few comments about clean up. I think we can still get it in by the 2.47.0 cut which is tomorrow. Have you tested in your environment that this PR resolves the port occupying issue?
   > 
   > When will version 2.47.0 be published? That way I will know whether I have time to test in my environment. The issue with new connections has been fixed.
   
   The release cut is tomorrow, Apr 5th. The release date is targeted for early May.




[GitHub] [beam] Abacn commented on pull request #25945: [#25887] fix(JmsIO): issue with multiple connection open #25887

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on PR #25945:
URL: https://github.com/apache/beam/pull/25945#issuecomment-1497830886

   The tests passed, though this is not reflected in the GitHub UI. Merging for now.




[GitHub] [beam] Abacn commented on pull request #25945: [#25887] fix(JmsIO): issue with multiple connection open #25887

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on PR #25945:
URL: https://github.com/apache/beam/pull/25945#issuecomment-1481392783

   The forward fix is heavy. To avoid introducing new risk, is it possible to start with a lightweight fix? For example, would closing only the producer and session at finishBundle (but keeping session and connection open) mitigate the pressure on the server? Per the [documentation](https://docs.oracle.com/cd/E19798-01/821-1841/bncem/index.html), "A connection could represent an open TCP/IP socket between a client and a provider service daemon." So port usage should be on a per-connection basis.
   
   A connection pool is a feature request and can be considered later.
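   
   One possible reading of the lightweight fix can be sketched as follows: the connection lives from setup to teardown, while the session (and its producer) are opened and closed per bundle. This is only an illustration. `BundleLifecycleSketch` is a hypothetical class that mimics the DoFn lifecycle hooks with counters, without depending on Beam or JMS.
   
   ```java
   /** Hypothetical writer mimicking DoFn lifecycle hooks; counters stand in for broker resources. */
   final class BundleLifecycleSketch {
     private int connectionsOpened;
     private int sessionsOpened;
     private boolean connectionOpen;
     private boolean sessionOpen;
   
     // Runs once per writer instance: the only place a connection is opened.
     void setup() {
       connectionOpen = true;
       connectionsOpened++;
     }
   
     // Runs once per bundle: opens a session on the existing connection.
     void startBundle() {
       if (!connectionOpen) {
         throw new IllegalStateException("setup() must run before startBundle()");
       }
       sessionOpen = true;
       sessionsOpened++;
     }
   
     void processElement(String message) {
       if (!sessionOpen) {
         throw new IllegalStateException("no open session for " + message);
       }
       // A real writer would send the message through its producer here.
     }
   
     // Closes the per-bundle session but leaves the connection open.
     void finishBundle() {
       sessionOpen = false;
     }
   
     // Runs once per writer instance: releases the connection.
     void teardown() {
       sessionOpen = false;
       connectionOpen = false;
     }
   
     int connectionsOpened() {
       return connectionsOpened;
     }
   
     int sessionsOpened() {
       return sessionsOpened;
     }
   
     boolean isConnectionOpen() {
       return connectionOpen;
     }
   }
   ```
   
   In this sketch the number of connections, and therefore of sockets, is tied to the number of writer instances rather than to the number of bundles.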




[GitHub] [beam] github-actions[bot] commented on pull request #25945: [#25887] fix(JmsIO): issue with multiple connection open #25887

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #25945:
URL: https://github.com/apache/beam/pull/25945#issuecomment-1481465578

   Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @kennknowles for label java.
   R: @damccorm for label build.
   R: @ahmedabu98 for label io.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review comments).




[GitHub] [beam] Amraneze commented on pull request #25945: [#25887] fix(JmsIO): issue with multiple connection open #25887

Posted by "Amraneze (via GitHub)" <gi...@apache.org>.
Amraneze commented on PR #25945:
URL: https://github.com/apache/beam/pull/25945#issuecomment-1496341832

   > Thanks, just have a few comments about clean up. I think we can still get it in by the 2.47.0 cut which is tomorrow. Have you tested in your environment that this PR resolves the port occupying issue?
   
   When will version 2.47.0 be published? That way I will know whether I have time to test in my environment.




[GitHub] [beam] Amraneze commented on pull request #25945: [#25887] fix(JmsIO): issue with multiple connection open #25887

Posted by "Amraneze (via GitHub)" <gi...@apache.org>.
Amraneze commented on PR #25945:
URL: https://github.com/apache/beam/pull/25945#issuecomment-1497042746

   > For the comment [#25945 (comment)](https://github.com/apache/beam/pull/25945#issuecomment-1496417672)
   > 
   > > 1 - inside of the exception listener
   > 
   > Is there a potential race condition when operating on the connection inside connection.exceptionListener?
   > 
   > > 2 - inside of the loop
   > 
   > This looks good at first glance. Though I still suggest improving one thing at a time, to get at least a stable version first.
   > 
   > Also, if we call close and connect inside `@ProcessElement` (rather than only in `@Setup` and `@Teardown`), they need to be marked as synchronized methods.
   
   I will try to see which solution has better performance. Should we add it together with the connection pool?
   
   Just FYI, when there is an exception while publishing, the bundle is not republished because the message is sent to an output. That was the previous implementation, and I wanted to keep backward compatibility. If we remove that, we can just throw an exception and let Dataflow retry the bundle without the retry policy.
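   
   To make the two options concrete, here is a rough sketch under stated assumptions. `Publisher` and both method names are hypothetical, and neither method is the actual JmsIO code. The first method collects failed messages for a separate output. The second rethrows so the runner can retry the bundle.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   final class PublishFailureSketch {
     /** Hypothetical publisher; implementations throw to simulate a broker failure. */
     interface Publisher {
       void publish(String message) throws Exception;
     }
   
     /** Behaviour as described above: failed messages are collected for a separate output. */
     static List<String> publishCollectingFailures(List<String> bundle, Publisher publisher) {
       List<String> failed = new ArrayList<>();
       for (String message : bundle) {
         try {
           publisher.publish(message);
         } catch (Exception e) {
           // The bundle still succeeds; the failed message is routed elsewhere.
           failed.add(message);
         }
       }
       return failed;
     }
   
     /** Alternative: propagate the failure so the runner can retry the whole bundle. */
     static void publishOrThrow(List<String> bundle, Publisher publisher) {
       for (String message : bundle) {
         try {
           publisher.publish(message);
         } catch (Exception e) {
           throw new RuntimeException("publish failed for " + message, e);
         }
       }
     }
   }
   ```
   
   The first variant keeps backward compatibility. The second relies on the runner's retry, so messages published before the failure could be sent again on retry.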
   
   Here is a graph that explains the logic of JmsIO.
   ![Brainstorming](https://user-images.githubusercontent.com/28459763/230010489-f4943266-a022-4816-bca6-0544fafe23f0.png)
   

