Posted to issues@arrow.apache.org by "engimatic (via GitHub)" <gi...@apache.org> on 2023/06/13 06:16:02 UTC

[GitHub] [arrow] engimatic opened a new issue, #36043: Force an outage of the client,server stuck in an endless loop

engimatic opened a new issue, #36043:
URL: https://github.com/apache/arrow/issues/36043

   ### Describe the usage question you have. Please include as many useful details as  possible.
   
   
   `try (BufferAllocator allocator = new RootAllocator()) {
               // Server
               try (final CookbookProducer producer = new CookbookProducer(allocator, location);
                    final FlightServer flightServer = FlightServer.builder(allocator, location, producer).build()) {
                   try {
                       flightServer.start();
                       System.out.println("S1: Server (Location): Listening on port " + flightServer.getPort());
                   } catch (IOException e) {
                       throw new RuntimeException(e);
                   }
               }
           }`
   
   I handle the MySQL result set like this:
   `ResultSet resultSet = statement.executeQuery();
   ArrowVectorIterator iterator = JdbcToArrow.sqlToArrowVectorIterator(
                            resultSet, config)`
   
   `public void handleArrowIterator(ArrowVectorIterator iterator, BufferAllocator allocator) {
           int index = 0;
   
           while (iterator.hasNext() && !listener.isCancelled()) {
               if (listener.isReady()) {
                   try (VectorSchemaRoot root = iterator.next()) {
                       index++;
                       doSomeThing();
                   }
                   
               }
              
           }
           listener.completed();
         
       }`
   
   
   But when I force an outage of the client, `listener.isCancelled()` and `listener.isReady()` always return false, and the server is stuck in an endless loop. How can I resolve it?
   
   ### Component(s)
   
   FlightRPC


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@arrow.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] davisusanibar commented on issue #36043: [Java] Force an outage of the client,server stuck in an endless loop

Posted by "davisusanibar (via GitHub)" <gi...@apache.org>.
davisusanibar commented on issue #36043:
URL: https://github.com/apache/arrow/issues/36043#issuecomment-1601100758

   Hi @engimatic, I can run both your client and server code.
   
   Could you share your steps for testing the endless loop with me? I cannot reproduce the endless loop.




[GitHub] [arrow] engimatic commented on issue #36043: [Java] Force an outage of the client,server stuck in an endless loop

Posted by "engimatic (via GitHub)" <gi...@apache.org>.
engimatic commented on issue #36043:
URL: https://github.com/apache/arrow/issues/36043#issuecomment-1606372285

   @davisusanibar 
   
   After your client has been running for a little while, force-kill it. You will then find that the server is stuck in an endless loop at this code:
   ```
   while (iterator.hasNext() && !listener.isCancelled()) {
                                   if (listener.isReady()) {
   ```
   The server's CPU usage stays consistently high, and the next request never returns.
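
   A minimal sketch of one way to keep such a loop from spinning, assuming nothing about Arrow Flight's internals: when neither flag changes, the loop in the snippet above does no work and never sleeps, which is consistent with the high CPU usage described. The `awaitReady` helper below is hypothetical; its two suppliers stand in for `listener.isReady()` and `listener.isCancelled()`, and the 1 ms park interval and the timeout value are arbitrary choices.

   ```java
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.locks.LockSupport;
   import java.util.function.BooleanSupplier;

   final class BoundedReadyWait {
       /** Outcome of waiting for the client to accept more data. */
       enum Result { READY, CANCELLED, TIMED_OUT }

       /**
        * Checks the two flags, parking briefly between checks instead of spinning,
        * and gives up once timeoutMillis has elapsed.
        * Both suppliers are hypothetical stand-ins for the listener's methods.
        */
       static Result awaitReady(BooleanSupplier isReady, BooleanSupplier isCancelled, long timeoutMillis) {
           long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
           while (true) {
               if (isCancelled.getAsBoolean()) {
                   return Result.CANCELLED;
               }
               if (isReady.getAsBoolean()) {
                   return Result.READY;
               }
               if (System.nanoTime() - deadline >= 0) {
                   return Result.TIMED_OUT;
               }
               // Yield the CPU for about 1 ms (arbitrary interval) before re-checking.
               LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1));
           }
       }

       public static void main(String[] args) {
           // Simulates the reported state: the client is gone, yet neither flag changes.
           System.out.println(awaitReady(() -> false, () -> false, 50)); // prints TIMED_OUT
       }
   }
   ```

   A caller could invoke `awaitReady` before each `iterator.next()` and abandon the stream on `CANCELLED` or `TIMED_OUT`, so that a killed client cannot hold a server thread indefinitely.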
   




[GitHub] [arrow] davisusanibar commented on issue #36043: [Java] Force an outage of the client,server stuck in an endless loop

Posted by "davisusanibar (via GitHub)" <gi...@apache.org>.
davisusanibar commented on issue #36043:
URL: https://github.com/apache/arrow/issues/36043#issuecomment-1592376872

   Hi @engimatic , I would appreciate it if you could clarify which parts are java code on the client side and which parts are part of the java server so that I could reproduce it in the best way possible. Thank you in advance.




[GitHub] [arrow] lidavidm commented on issue #36043: Force an outage of the client,server stuck in an endless loop

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on issue #36043:
URL: https://github.com/apache/arrow/issues/36043#issuecomment-1589151890

   @davisusanibar 




[GitHub] [arrow] engimatic commented on issue #36043: Force an outage of the client,server stuck in an endless loop

Posted by "engimatic (via GitHub)" <gi...@apache.org>.
engimatic commented on issue #36043:
URL: https://github.com/apache/arrow/issues/36043#issuecomment-1588620403

   I also registered a cancel handler with `setOnCancelHandler` like this, but it did not take effect.
   `
   listener.setOnCancelHandler(() -> {
                               listener.completed();
                               log.error("---getStream cancel");
                           });
   `
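
   The loop condition in the original report only checks `iterator.hasNext()` and `listener.isCancelled()`, so calling `listener.completed()` inside the handler does not by itself end the loop. A minimal sketch of an alternative, assuming the handler is invoked at all (which this report suggests may not happen for a force-killed client): the handler sets a flag that the producing loop checks. `CancellableStream` and `FakeStream` are hypothetical stand-ins, not Arrow Flight types.

   ```java
   import java.util.concurrent.atomic.AtomicBoolean;

   final class CancelFlagSketch {
       /** Hypothetical stand-in for the part of the stream listener used here. */
       interface CancellableStream {
           void setOnCancelHandler(Runnable handler);
       }

       /** Test double that lets the caller simulate a client cancellation. */
       static final class FakeStream implements CancellableStream {
           private Runnable handler = () -> { };

           @Override
           public void setOnCancelHandler(Runnable handler) {
               this.handler = handler;
           }

           void simulateCancel() {
               handler.run();
           }
       }

       /**
        * Sends up to maxBatches batches and stops early once the cancel flag is set.
        * The cancellation is simulated right after batch number cancelAfter.
        * Returns the number of batches sent.
        */
       static int produce(FakeStream stream, int maxBatches, int cancelAfter) {
           AtomicBoolean cancelled = new AtomicBoolean(false);
           // The handler only flips a flag; the loop itself decides when to stop.
           stream.setOnCancelHandler(() -> cancelled.set(true));
           int sent = 0;
           while (sent < maxBatches && !cancelled.get()) {
               sent++; // stands in for sending one batch
               if (sent == cancelAfter) {
                   stream.simulateCancel();
               }
           }
           return sent;
       }

       public static void main(String[] args) {
           System.out.println(produce(new FakeStream(), 10, 3)); // prints 3
       }
   }
   ```

   With this shape, `completed()` is called once, after the loop exits, rather than from inside the handler.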




[GitHub] [arrow] davisusanibar commented on issue #36043: [Java] Force an outage of the client,server stuck in an endless loop

Posted by "davisusanibar (via GitHub)" <gi...@apache.org>.
davisusanibar commented on issue #36043:
URL: https://github.com/apache/arrow/issues/36043#issuecomment-1631612300

   We need to review how the Java Flight server handles window size for:
   - ACK
   - ZeroWindow
   - WindowUpdate
   
   <img width="1792" alt="image" src="https://github.com/apache/arrow/assets/4554485/2719a1c0-eec9-4e79-9ade-027596f00755">
   




[GitHub] [arrow] engimatic commented on issue #36043: [Java] Force an outage of the client,server stuck in an endless loop

Posted by "engimatic (via GitHub)" <gi...@apache.org>.
engimatic commented on issue #36043:
URL: https://github.com/apache/arrow/issues/36043#issuecomment-1619348900

   @davisusanibar Hello, do you have any further questions about this? Or is this a bug in Arrow Flight?




[GitHub] [arrow] davisusanibar commented on issue #36043: [Java] Force an outage of the client,server stuck in an endless loop

Posted by "davisusanibar (via GitHub)" <gi...@apache.org>.
davisusanibar commented on issue #36043:
URL: https://github.com/apache/arrow/issues/36043#issuecomment-1631597671

   In my trace log of the request, I see that the server responds with a TCP Zero Window segment, which effectively means `Don't send me any more data, as I cannot handle it anyway`. The ready/cancel handling may need to be tuned for this type of scenario in order to avoid endless loops. I'll investigate this further.
   
   <img width="1779" alt="image" src="https://github.com/apache/arrow/assets/4554485/2195e1bc-eeb6-41ac-91d8-a9d2af631af8">
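
   One possible shape for the ready/cancel handling mentioned above, as a sketch rather than a fix: instead of polling, the producer thread blocks on a condition that is signalled from the listener's callbacks, with a timeout as a safety net for clients that vanish without cancelling. The class and method names are hypothetical. Wiring `signalStateChange` to `listener.setOnCancelHandler(...)` follows this thread; the existence of a matching ready callback is an assumption to verify against the Arrow Flight version in use.

   ```java
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicBoolean;
   import java.util.concurrent.locks.Condition;
   import java.util.concurrent.locks.ReentrantLock;
   import java.util.function.BooleanSupplier;

   final class CallbackWaitSketch {
       enum Result { READY, CANCELLED, TIMED_OUT, INTERRUPTED }

       private final ReentrantLock lock = new ReentrantLock();
       private final Condition stateChanged = lock.newCondition();

       /** Meant to be called from the ready and cancel callbacks so that waiters re-check the flags. */
       void signalStateChange() {
           lock.lock();
           try {
               stateChanged.signalAll();
           } finally {
               lock.unlock();
           }
       }

       /** Blocks until one of the flags is set or timeoutMillis elapses; the suppliers are stand-ins. */
       Result await(BooleanSupplier isReady, BooleanSupplier isCancelled, long timeoutMillis) {
           long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
           lock.lock();
           try {
               while (true) {
                   if (isCancelled.getAsBoolean()) {
                       return Result.CANCELLED;
                   }
                   if (isReady.getAsBoolean()) {
                       return Result.READY;
                   }
                   if (remainingNanos <= 0) {
                       return Result.TIMED_OUT;
                   }
                   // Releases the lock while waiting; returns the time still left.
                   remainingNanos = stateChanged.awaitNanos(remainingNanos);
               }
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt(); // preserve the interrupt for the caller
               return Result.INTERRUPTED;
           } finally {
               lock.unlock();
           }
       }

       public static void main(String[] args) throws InterruptedException {
           CallbackWaitSketch waiter = new CallbackWaitSketch();
           AtomicBoolean ready = new AtomicBoolean(false);
           // Simulates the transport becoming writable while the producer is waiting.
           Thread transport = new Thread(() -> {
               ready.set(true);
               waiter.signalStateChange();
           });
           transport.start();
           System.out.println(waiter.await(ready::get, () -> false, 5_000)); // prints READY
           transport.join();
       }
   }
   ```

   Because the flags are re-read under the lock after every wake-up, a signal that arrives before the wait starts is not lost, and a spurious wake-up simply causes another check.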
   




[GitHub] [arrow] engimatic commented on issue #36043: [Java] Force an outage of the client,server stuck in an endless loop

Posted by "engimatic (via GitHub)" <gi...@apache.org>.
engimatic commented on issue #36043:
URL: https://github.com/apache/arrow/issues/36043#issuecomment-1633407865

   @davisusanibar Maybe I should wait for the bug to be fixed. Is there a quick workaround in the meantime?




[GitHub] [arrow] davisusanibar commented on issue #36043: [Java] Force an outage of the client,server stuck in an endless loop

Posted by "davisusanibar (via GitHub)" <gi...@apache.org>.
davisusanibar commented on issue #36043:
URL: https://github.com/apache/arrow/issues/36043#issuecomment-1631501736

   Hi @engimatic, I managed to reproduce it on my latest attempt.
   
   1. Run Java Flight Server: `Flight Server`
   2. Run Python Client: `python clientuser.py`
   3. Run another Python Client: `python clientuser.py`
   4. Kill Python client: `pkill -9 -f clientuser.py`
   5. The endless loop appears
   
   Let me debug it to understand the error in more detail.
   
   




[GitHub] [arrow] engimatic commented on issue #36043: [Java] Force an outage of the client,server stuck in an endless loop

Posted by "engimatic (via GitHub)" <gi...@apache.org>.
engimatic commented on issue #36043:
URL: https://github.com/apache/arrow/issues/36043#issuecomment-1593981795

   > Hi @engimatic , I would appreciate it if you could clarify which parts are java code on the client side and which parts are part of the java server so that I could reproduce it in the best way possible. Thank you in advance.
   
   Server code:
   ```
   import cn.hutool.core.thread.ThreadFactoryBuilder;
   import io.netty.util.internal.PlatformDependent;
   import lombok.extern.slf4j.Slf4j;
   import org.apache.arrow.adapter.jdbc.ArrowVectorIterator;
   import org.apache.arrow.adapter.jdbc.JdbcToArrow;
   import org.apache.arrow.adapter.jdbc.JdbcToArrowConfig;
   import org.apache.arrow.adapter.jdbc.JdbcToArrowConfigBuilder;
   import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils;
   import org.apache.arrow.flight.FlightDescriptor;
   import org.apache.arrow.flight.FlightServer;
   import org.apache.arrow.flight.Location;
   import org.apache.arrow.flight.NoOpFlightProducer;
   import org.apache.arrow.flight.Ticket;
   import org.apache.arrow.memory.BufferAllocator;
   import org.apache.arrow.memory.RootAllocator;
   import org.apache.arrow.vector.VectorLoader;
   import org.apache.arrow.vector.VectorSchemaRoot;
   import org.apache.arrow.vector.VectorUnloader;
   import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
   
   import java.io.IOException;
   import java.nio.charset.StandardCharsets;
   import java.sql.Connection;
   import java.sql.Driver;
   import java.sql.PreparedStatement;
   import java.sql.ResultSet;
   import java.sql.SQLException;
   import java.util.Properties;
   import java.util.concurrent.ArrayBlockingQueue;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.ThreadPoolExecutor;
   import java.util.concurrent.TimeUnit;
   
   
   @Slf4j
   public class Server {
       /**
        * main.
        *
        * @param args
        * @throws InterruptedException
        */
       public static void main(String[] args) throws InterruptedException, IOException {
           int corePoolSize = Math.max(Runtime.getRuntime().availableProcessors(), 2) * 2;
           int maxPoolSize = corePoolSize * 2;
           ExecutorService executorService = new ThreadPoolExecutor(corePoolSize, maxPoolSize, 300,
                   TimeUnit.SECONDS, new ArrayBlockingQueue<>(100),
                   ThreadFactoryBuilder.create().setNamePrefix("globalExecutor_pool_").build(),
                   new ThreadPoolExecutor.CallerRunsPolicy());
           long maxSize = PlatformDependent.maxDirectMemory();
           long limit = maxSize - 1;
           Location location = Location.forGrpcInsecure("0.0.0.0", 8000);
           try (BufferAllocator allocator = new RootAllocator(limit)) {
               try (FlightServer flightServer = FlightServer.builder(allocator, location, new NoOpFlightProducer() {
                   @Override
                   public void getStream(CallContext context, Ticket ticket, ServerStreamListener listener) {
                       FlightDescriptor flightDescriptor = FlightDescriptor.path(
                               new String(ticket.getBytes(), StandardCharsets.UTF_8));
                       String sql = flightDescriptor.getPath().get(0);
   
                       Connection connection = null;
                       PreparedStatement statement = null;
   
                       JdbcToArrowConfig config = new JdbcToArrowConfigBuilder(allocator,
                               JdbcToArrowUtils.getUtcCalendar())
                               .build();
   
                       try {
                           Class<?> driverClass = Class.forName("com.mysql.jdbc.Driver");
                           Driver driver = (Driver) driverClass.newInstance();
                           String jdbcUrl = "jdbc:mysql://{ip}:{port}/test?"
                                   + "useUnicode=true&socketTimeout=1800000&characterEncoding=UTF-8&autoReconnect=true&"
                                   + "useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai";
                           Properties info = new Properties();
                           info.put("user", user);
                           info.put("password", pass);
                           connection = driver.connect(jdbcUrl, info);
                           statement = connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY,
                                   ResultSet.CONCUR_READ_ONLY);
                           statement.setFetchSize(Integer.MIN_VALUE);
                       } catch (ClassNotFoundException | SQLException | InstantiationException
                               | IllegalAccessException e) {
                           log.error(e.getMessage());
                       }
   
                       int index = 0;
                       VectorSchemaRoot vectorSchemaRoot = null;
                       try {
                           try (ResultSet resultSet = statement.executeQuery();
                                ArrowVectorIterator iterator = JdbcToArrow.sqlToArrowVectorIterator(
                                        resultSet, config)) {
   
                               while (iterator.hasNext() && !listener.isCancelled()) {
                                   if (listener.isReady()) {
                                       try (VectorSchemaRoot root = iterator.next()) {
                                          index++;
                                           if (vectorSchemaRoot == null) {
                                               vectorSchemaRoot = root;
                                               listener.start(vectorSchemaRoot);
                                           }
                                           VectorLoader loader = new VectorLoader(vectorSchemaRoot);
                                           VectorUnloader unloader = new VectorUnloader(root);
   
                                           ArrowRecordBatch arb = unloader.getRecordBatch();
   
                                           loader.load(arb);
                                           listener.putNext();
                                           arb.close();
                                       }
                                       log.info("currentThreadName: {}, index: {}, "
                                                       + "allocator used {}, max {}, direct buffer used {}",
                                               Thread.currentThread().getName(), index, allocator.getAllocatedMemory(),
                                               allocator.getLimit(), PlatformDependent.usedDirectMemory());
                                   }
                               }
                           }
                       } catch (SQLException | IOException e) {
                           log.error(e.getMessage());
                       } finally {
                           listener.completed();
                           if (vectorSchemaRoot != null) {
                               vectorSchemaRoot.close();
                           }
                       }
                   }
               }).executor(executorService).build()) {
                   flightServer.start();
                   log.info("ArrowFlightApp: Server (Location): Listening on port {}, max buffer size {}",
                           flightServer.getPort(), allocator.getLimit());
                   flightServer.awaitTermination();
               }
           }
       }
   }
   
   ```
   
   
   Client code:
   ```
   import numpy as np
   import pyarrow.flight as pf
   import pyarrow as pa
   import time
   import pandas as pd
   from concurrent.futures import ThreadPoolExecutor, as_completed
   
   client = pf.FlightClient("grpc://{ip}:8000")
   
   
   def query(sql):
       ticket = pf.Ticket(str(sql).encode('utf-8'))
       start_time = time.time()
       reader = client.do_get(ticket)
       result = pd.DataFrame()
       for chunk in reader:
           chunk_df = pd.DataFrame()
           for num in range(chunk.data.num_columns):
               if type(chunk.data.column(num - 1)) == pa.Decimal128Array or type(
                       chunk.data.column(num - 1)) == pa.Decimal256Array:
                   tmp_df = chunk.data.column(num - 1).to_pandas().astype(np.float64).to_frame()
               else:
                   tmp_df = chunk.data.column(num - 1).to_pandas().to_frame()
           chunk_df = pd.concat([chunk_df, tmp_df], axis=1)
           result = pd.concat([result, chunk_df], ignore_index=True)
   
       print('convert data use time is : {}'.format(time.time() - start_time))
       return len(result)
   
   
   with ThreadPoolExecutor(max_workers=20) as t:
       obj_list = []
       sql = '''SELECT * FROM test limit 100000'''
       for i in range(100):
           obj = t.submit(query, sql)
           obj_list.append(obj)
   
       j = 0
       for future in as_completed(obj_list):
           j = j + 1
           data = future.result()
           print(j, "Got rows total", data)
       client.close()
   
   ```

