You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "engimatic (via GitHub)" <gi...@apache.org> on 2023/06/16 02:08:18 UTC

[GitHub] [arrow] engimatic commented on issue #36043: [Java] Force an outage of the client,server stuck in an endless loop

engimatic commented on issue #36043:
URL: https://github.com/apache/arrow/issues/36043#issuecomment-1593981795

   > Hi @engimatic , I would appreciate it if you could clarify which parts are java code on the client side and which parts are part of the java server so that I could reproduce it in the best way possible. Thank you in advance.
   
   Server code:
   ```
   import cn.hutool.core.thread.ThreadFactoryBuilder;
   import io.netty.util.internal.PlatformDependent;
   import lombok.extern.slf4j.Slf4j;
   import org.apache.arrow.adapter.jdbc.ArrowVectorIterator;
   import org.apache.arrow.adapter.jdbc.JdbcToArrow;
   import org.apache.arrow.adapter.jdbc.JdbcToArrowConfig;
   import org.apache.arrow.adapter.jdbc.JdbcToArrowConfigBuilder;
   import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils;
   import org.apache.arrow.flight.BackpressureStrategy;
   import org.apache.arrow.flight.CallStatus;
   import org.apache.arrow.flight.FlightDescriptor;
   import org.apache.arrow.flight.FlightServer;
   import org.apache.arrow.flight.Location;
   import org.apache.arrow.flight.NoOpFlightProducer;
   import org.apache.arrow.flight.Ticket;
   import org.apache.arrow.memory.BufferAllocator;
   import org.apache.arrow.memory.RootAllocator;
   import org.apache.arrow.vector.VectorLoader;
   import org.apache.arrow.vector.VectorSchemaRoot;
   import org.apache.arrow.vector.VectorUnloader;
   import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;

   import java.io.IOException;
   import java.nio.charset.StandardCharsets;
   import java.sql.Connection;
   import java.sql.Driver;
   import java.sql.PreparedStatement;
   import java.sql.ResultSet;
   import java.sql.SQLException;
   import java.util.Properties;
   import java.util.concurrent.ArrayBlockingQueue;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.ThreadPoolExecutor;
   import java.util.concurrent.TimeUnit;
   
   
   @Slf4j
   public class Server {
       /**
        * main.
        *
        * @param args
        * @throws InterruptedException
        */
       public static void main(String[] args) throws InterruptedException, IOException {
           int corePoolSize = Math.max(Runtime.getRuntime().availableProcessors(), 2) * 2;
           int maxPoolSize = corePoolSize * 2;
           ExecutorService executorService = new ThreadPoolExecutor(corePoolSize, maxPoolSize, 300,
                   TimeUnit.SECONDS, new ArrayBlockingQueue<>(100),
                   ThreadFactoryBuilder.create().setNamePrefix("globalExecutor_pool_").build(),
                   new ThreadPoolExecutor.CallerRunsPolicy());
           long maxSize = PlatformDependent.maxDirectMemory();
           long limit = maxSize - 1;
           Location location = Location.forGrpcInsecure("0.0.0.0", 8000);
           try (BufferAllocator allocator = new RootAllocator(limit)) {
               try (FlightServer flightServer = FlightServer.builder(allocator, location, new NoOpFlightProducer() {
                   @Override
                   public void getStream(CallContext context, Ticket ticket, ServerStreamListener listener) {
                       FlightDescriptor flightDescriptor = FlightDescriptor.path(
                               new String(ticket.getBytes(), StandardCharsets.UTF_8));
                       String sql = flightDescriptor.getPath().get(0);
   
                       Connection connection = null;
                       PreparedStatement statement = null;
   
                       JdbcToArrowConfig config = new JdbcToArrowConfigBuilder(allocator,
                               JdbcToArrowUtils.getUtcCalendar())
                               .build();
   
                       try {
                           Class<?> driverClass = Class.forName("com.mysql.jdbc.Driver");
                           Driver driver = (Driver) driverClass.newInstance();
                           String jdbcUrl = "jdbc:mysql://{ip}:{port}/test?"
                                   + "useUnicode=true&socketTimeout=1800000&characterEncoding=UTF-8&autoReconnect=true&"
                                   + "useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai";
                           Properties info = new Properties();
                           info.put("user", user);
                           info.put("password", pass);
                           connection = driver.connect(jdbcUrl, info);
                           statement = connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY,
                                   ResultSet.CONCUR_READ_ONLY);
                           statement.setFetchSize(Integer.MIN_VALUE);
                       } catch (ClassNotFoundException | SQLException | InstantiationException
                               | IllegalAccessException e) {
                           log.error(e.getMessage());
                       }
   
                       int index = 0;
                       VectorSchemaRoot vectorSchemaRoot = null;
                       try {
                           try (ResultSet resultSet = statement.executeQuery();
                                ArrowVectorIterator iterator = JdbcToArrow.sqlToArrowVectorIterator(
                                        resultSet, config)) {
   
                               while (iterator.hasNext() && !listener.isCancelled()) {
                                   if (listener.isReady()) {
                                       try (VectorSchemaRoot root = iterator.next()) {
                                          index++;
                                           if (vectorSchemaRoot == null) {
                                               vectorSchemaRoot = root;
                                               listener.start(vectorSchemaRoot);
                                           }
                                           VectorLoader loader = new VectorLoader(vectorSchemaRoot);
                                           VectorUnloader unloader = new VectorUnloader(root);
   
                                           ArrowRecordBatch arb = unloader.getRecordBatch();
   
                                           loader.load(arb);
                                           listener.putNext();
                                           arb.close();
                                       }
                                       log.info("currentThreadName: {}, index: {}, "
                                                       + "allocator used {}, max {}, direct buffer userd {}",
                                               Thread.currentThread().getName(), index, allocator.getAllocatedMemory(),
                                               allocator.getLimit(), PlatformDependent.usedDirectMemory());
                                   }
                               }
                           }
                       } catch (SQLException | IOException e) {
                           log.error(e.getMessage());
                       } finally {
                           listener.completed();
                           if (vectorSchemaRoot != null) {
                               vectorSchemaRoot.close();
                           }
                       }
                   }
               }).executor(executorService).build()) {
                   flightServer.start();
                   log.info("ArrowFlightApp: Server (Location): Listening on port {}, max buffer size {}",
                           flightServer.getPort(), allocator.getLimit());
                   flightServer.awaitTermination();
               }
           }
       }
   }
   
   ```
   
   
   Client code:
   ```
   import numpy as np
   import pyarrow.flight as pf
   import pyarrow as pa
   import time
   import pandas as pd
   from concurrent.futures import ThreadPoolExecutor, as_completed
   
   # Flight endpoint; "{ip}" appears to be a redacted placeholder for the server host.
   FLIGHT_URI = "grpc://{ip}:8000"
   # One client shared by every query() call, i.e. by all worker threads below.
   client = pf.FlightClient(FLIGHT_URI)
   
   
   def query(sql, timeout=None):
       """Run ``sql`` through the Flight server and return the number of rows received.

       The SQL text is sent as the ticket payload. Each streamed record batch is converted to
       a pandas DataFrame, keeping every column in order with its name and casting decimal
       columns to float64. The batches are concatenated once at the end.

       :param sql: query text; converted with ``str()`` and UTF-8 encoded into the ticket.
       :param timeout: optional per-call deadline in seconds. ``None`` (the default) waits
           indefinitely, as before. Set it so a stalled server fails the call instead of
           hanging this worker.
       :return: total number of rows across all received batches.
       """
       ticket = pf.Ticket(str(sql).encode('utf-8'))
       options = pf.FlightCallOptions(timeout=timeout)
       start_time = time.time()
       reader = client.do_get(ticket, options)
       frames = []
       for chunk in reader:
           batch = chunk.data
           columns = []
           # Walk columns in schema order. The old column(num - 1) indexing started at the
           # last column, and its concat sat outside this loop, so only one column survived.
           for field, column in zip(batch.schema, batch.columns):
               series = column.to_pandas()
               if pa.types.is_decimal(field.type):
                   series = series.astype(np.float64)
               columns.append(series.rename(field.name))
           if columns:
               frames.append(pd.concat(columns, axis=1))
           else:
               # A batch with no columns still carries a row count.
               frames.append(pd.DataFrame(index=range(batch.num_rows)))
       # Concatenate once: concatenating inside the loop re-copies all accumulated rows each time.
       result = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()

       print('convert data use time is : {}'.format(time.time() - start_time))
       return len(result)
   
   
   sql = '''SELECT * FROM test limit 100000'''
   try:
       # Run 100 identical queries on 20 worker threads that share the module-level client.
       with ThreadPoolExecutor(max_workers=20) as t:
           obj_list = [t.submit(query, sql) for _ in range(100)]
           for j, future in enumerate(as_completed(obj_list), start=1):
               data = future.result()
               print(j, "Got rows total", data)
   finally:
       # Leaving the with-block waits for every submitted query. Closing here also runs when a
       # query raised, which the original close() inside the loop body skipped.
       client.close()
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org