You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by "Amraneze (via GitHub)" <gi...@apache.org> on 2023/03/23 14:44:32 UTC

[GitHub] [beam] Amraneze commented on pull request #25945: [#25887] fix(JmsIO): issue with multiple connection open #25887

Amraneze commented on PR #25945:
URL: https://github.com/apache/beam/pull/25945#issuecomment-1481324978

   @Abacn I pushed this PR to fix the issue and more integration test. We are working on having a pool connection to determine the connection that we need. Something like this.
   
   ```java
    private static class JmsConnectionPool<T> implements Serializable {
   
     private static final long serialVersionUID = 1L;
     private static final int DEFAULT_MAX_POOL_SIZE = 10;
     private static final int DEFAULT_INITIAL_POOL_SIZE = 20;
   
     private JmsIO.Write<T> spec;
     private final int maxPoolSize;
     private final int initialPoolSize;
     private List<JmsConnection<T>> jmsConnectionPool;
     private List<JmsConnection<T>> usedJmsConnections = new ArrayList<>();
   
     JmsConnectionPool(JmsIO.Write<T> spec, List<JmsConnection<T>> jmsConnectionPool) {
       this.spec = spec;
       this.jmsConnectionPool = jmsConnectionPool;
       this.maxPoolSize = Optional.ofNullable(spec.getMaxPoolSize()).orElse(DEFAULT_MAX_POOL_SIZE);
       this.initialPoolSize = Optional.ofNullable(spec.getInitialPoolSize()).orElse(DEFAULT_INITIAL_POOL_SIZE);
     }
   
     static <T> JmsConnectionPool<T> create(JmsIO.Write<T> spec) {
       int initialPoolSize = Optional.ofNullable(spec.getInitialPoolSize()).orElse(DEFAULT_INITIAL_POOL_SIZE);
       List<JmsConnection<T>> jmsConnectionPool = new ArrayList<>(initialPoolSize);
       for (int i = 0; i < initialPoolSize; i++) {
         jmsConnectionPool.add(new JmsConnection<>(spec));
       }
       return new JmsConnectionPool<>(spec, jmsConnectionPool);
     }
   
     JmsConnection<T> getConnection() throws JmsIOException {
       if (jmsConnectionPool.isEmpty()) {
         if (usedJmsConnections.size() < maxPoolSize) {
           jmsConnectionPool.add(new JmsConnection<>(spec));
         } else {
           throw new JmsIOException("Maximum pool connection size has been reached");
         }
       }
   
       JmsConnection<T> jmsConnection = jmsConnectionPool
               .remove(jmsConnectionPool.size() - 1);
   
       usedJmsConnections.add(jmsConnection);
       return jmsConnection;
     }
   
     public boolean releaseConnection(JmsConnection<T> jmsConnection) {
       jmsConnectionPool.add(jmsConnection);
       return usedJmsConnections.remove(jmsConnection);
     }
   
     public boolean closeConnection(JmsConnection<T> jmsConnection) {
       jmsConnection.close();
       jmsConnectionPool.remove(jmsConnection);
       return usedJmsConnections.remove(jmsConnection);
     }
   
     public void shutdown() throws JMSException {
       usedJmsConnections.forEach(this::releaseConnection);
       for (JmsConnection<T> jmsConnection : jmsConnectionPool) {
         jmsConnection.close();
       }
       jmsConnectionPool.clear();
     }
   }
   ```
   
   The function `closeConnection` will be called inside of `this.connection.setExceptionListener`. What do you think about it ? We want also to create latency or issues with other projects using JmsIO


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org