Posted to github@arrow.apache.org by "ianmcook (via GitHub)" <gi...@apache.org> on 2023/12/05 19:04:40 UTC

[PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

ianmcook opened a new pull request, #39081:
URL: https://github.com/apache/arrow/pull/39081

   This PR contains some basic examples of HTTP client and server implementations in several languages, to facilitate the discussion at https://lists.apache.org/thread/vfz74gv1knnhjdkro47shzd1z5g5ggnf.
   
   In these examples, the client makes a GET request to the server, and the server responds with an IPC stream of record batches. Later, we will also have examples in which the client makes a PUT or POST request to send data to the server.
   
   Currently there is only one server implementation, in Python. To enable performance comparisons with Arrow Flight RPC, the server example generates the data in exactly the same way as [`flight_benchmark.cc`](https://github.com/apache/arrow/blob/7346bdffbdca36492089f6160534bfa2b81bad90/cpp/src/arrow/flight/flight_benchmark.cc#L194-L245), which is cited in the [original blog post introducing Flight RPC](https://arrow.apache.org/blog/2019/10/13/introducing-arrow-flight/). Note, however, that the Flight benchmark sends four concurrent streams.
   
   Currently there are five client examples; I have verified that they all work with the Python server example:
   - Python
   - Python async
   - R
   - JavaScript
   - C++
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1417509757


##########
http/examples/get/client_async.py:
##########
@@ -0,0 +1,39 @@
+import asyncio
+import aiohttp

Review Comment:
   I think this fetches the HTTP responses (potentially) in parallel (the part under `asyncio.run`), but then decodes them serially (the regular `for` loop that calls `open_stream` on each response in turn).
   
   We would need to wire `StreamDecoder` into Python and provide a higher-level async IPC read API around it. That is probably out of scope for this PR.
   
   For now, you could just make the blocking client `client.py` parallel by using [ThreadPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor).





Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "ianmcook (via GitHub)" <gi...@apache.org>.
ianmcook commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1416236766


##########
http/examples/get/client.js:
##########
@@ -0,0 +1,25 @@
+const Arrow = require('apache-arrow');
+
+const url = 'http://localhost:8000';
+
+async function getArrowData(url) {
+  try {
+    const table = await Arrow.tableFromIPC(fetch(url))
+    return table;

Review Comment:
   IIUC, what the Arrow JavaScript library calls a table is not the same as what Arrow C++ calls a Table. In JavaScript, it's made up of an array of record batches.





Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "domoritz (via GitHub)" <gi...@apache.org>.
domoritz commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1425882652


##########
http/examples/get/client.js:
##########
@@ -0,0 +1,25 @@
+const Arrow = require('apache-arrow');
+
+const url = 'http://localhost:8000';
+
+async function getArrowData(url) {
+  try {
+    const table = await Arrow.tableFromIPC(fetch(url))
+    return table;
+  } catch (error) {
+    console.error('Error:', error.message);
+  }
+}
+
+const startTime = new Date();
+
+getArrowData(url)
+  .then(table => {
+    const endTime = new Date();
+    const duration = (endTime - startTime) / 1000;
+    console.log(`${table.batches.length} record batches received`);
+    console.log(`${duration.toFixed(2)} seconds elapsed`);
+  })
+  .catch(error => {
+    console.error('Error:', error.message);
+  });

Review Comment:
   I'd suggest using more modern `async`/`await` syntax here as well, rather than `.then()`/`.catch()` chains.





Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "domoritz (via GitHub)" <gi...@apache.org>.
domoritz commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1425882214


##########
http/examples/get/client.js:
##########
@@ -0,0 +1,25 @@
+const Arrow = require('apache-arrow');
+
+const url = 'http://localhost:8000';
+
+async function getArrowData(url) {
+  try {
+    const table = await Arrow.tableFromIPC(fetch(url))
+    return table;

Review Comment:
   Record batches are just virtual: vectors are chunked, and the chunks imply the record batches.





Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1419876330


##########
http/examples/get/client.cpp:
##########


Review Comment:
   Oh, sorry. I'll take a look at this.





Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "ianmcook (via GitHub)" <gi...@apache.org>.
ianmcook commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1433371434


##########
http/examples/get/client.cpp:
##########


Review Comment:
   Oh ok; I'll try it again after that merges. Thanks!





Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "zeroshade (via GitHub)" <gi...@apache.org>.
zeroshade commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1448015584


##########
http/examples/get/client.go:
##########
@@ -0,0 +1,53 @@
+package main
+
+import (
+	"fmt"
+	"net/http"
+	"time"
+
+	"github.com/apache/arrow/go/v15/arrow"
+	"github.com/apache/arrow/go/v15/arrow/ipc"
+	"github.com/apache/arrow/go/v15/arrow/memory"
+)
+
+func main() {
+	start := time.Now()
+	resp, err := http.Get("http://localhost:8000")
+	if err != nil {
+		panic(err)
+	}
+
+	if resp.StatusCode != http.StatusOK {
+		panic(fmt.Errorf("got non-200 status: %d", resp.StatusCode))
+	}
+	defer resp.Body.Close()
+
+	rdr, err := ipc.NewReader(resp.Body, ipc.WithAllocator(memory.DefaultAllocator))
+	if err != nil {
+		panic(err)
+	}
+	defer rdr.Release()
+
+	batches := make([]arrow.Record, 0)
+	defer func() {
+		for _, b := range batches {
+			b.Release()
+		}
+	}()
+
+	for rdr.Next() {
+		rec := rdr.Record()
+		rec.Retain()
+		batches = append(batches, rec)
+	}
+
+	if rdr.Err() != nil {
+		panic(rdr.Err())
+	}
+
+	execTime := time.Since(start)
+
+	fmt.Println(resp.ContentLength, "bytes received")

Review Comment:
   We can probably wrap `resp.Body` in a simple pass-through counter, or in a `bufio` object that buffers the bytes, so that we can count them as we pass them to the `ipc` reader.
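
The same pass-through idea, sketched in Python rather than Go (`CountingReader` is a hypothetical name, not part of any library): a read-only stream that forwards reads to a wrapped file-like object and counts the bytes that pass through it.

```python
import io


class CountingReader(io.RawIOBase):
    """Read-only pass-through stream that counts the bytes read from `raw`."""

    def __init__(self, raw):
        super().__init__()
        self._raw = raw
        self.bytes_read = 0

    def readable(self):
        return True

    def readinto(self, b):
        # Read at most len(b) bytes from the wrapped stream into `b`.
        data = self._raw.read(len(b))
        n = len(data)
        b[:n] = data
        self.bytes_read += n
        return n

    def tell(self):
        # The stream is not seekable, so its position is just the byte count.
        return self.bytes_read
```

Assuming pyarrow only calls `read()` on the wrapper, `pa.ipc.open_stream(CountingReader(response))` should leave the body size in `bytes_read` once the stream is exhausted. Unlike a Content-Length header, this also works for chunked responses, where Go's `resp.ContentLength` is -1 (unknown).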





Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "ianmcook (via GitHub)" <gi...@apache.org>.
ianmcook commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1448015235


##########
http/examples/get/client_async.py:
##########
@@ -0,0 +1,39 @@
+import asyncio
+import aiohttp

Review Comment:
   Thanks — for now I'm just going to remove this async Python example because I don't think it adds value here.





Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1448147161


##########
http/examples/get/client.cpp:
##########


Review Comment:
   Really!? That seems strange to me, but I may be wrong since it works well in your environment...





Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "paleolimbot (via GitHub)" <gi...@apache.org>.
paleolimbot commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1416540078


##########
http/examples/get/client.R:
##########
@@ -0,0 +1,24 @@
+library(httr)
+library(arrow)
+library(tictoc)
+
+url <- 'http://localhost:8000'
+
+tic()
+
+response <- GET(url)
+buffer <- content(response, "raw")
+reader <- RecordBatchStreamReader$create(buffer)
+table <- reader$read_table()
+
+# or:
+#batches <- reader$batches()
+# but this is very slow

Review Comment:
   I reprexed this in https://github.com/apache/arrow/issues/39090 and will take a look before 15.0.0. I would personally recommend `result <- read_ipc_stream('http://localhost:8000', as_data_frame = FALSE)` as best practice, since that should take advantage of any streaming done by the server.





Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1416845722


##########
http/examples/get/client.cpp:
##########


Review Comment:
   We can use `arrow::ipc::StreamDecoder` for this use case. It provides a "push"-style API, whereas `arrow::ipc::RecordBatchStreamReader` provides a "pull"-style API.
   
   ```cpp
   #include <curl/curl.h>
   #include <arrow/api.h>
   #include <arrow/io/api.h>
   #include <arrow/ipc/api.h>
   #include <chrono>
   
   static size_t
   WriteFunction(void *contents, size_t size, size_t nmemb, void *userp)
   {
     size_t real_size = size * nmemb;
     auto decoder = static_cast<arrow::ipc::StreamDecoder*>(userp);
     if (decoder->Consume(static_cast<const uint8_t*>(contents), real_size).ok()) {
       return real_size;
     } else {
       return 0;
     }
   }
   
   int main(void)
   {
     std::string url = "http://localhost:8000";
   
     CURL *curl_handle;
     CURLcode res;
   
     // We use arrow::ipc::CollectListener here for ease of understanding,
     // but we can process decoded record batches as a stream by subclassing
     // arrow::ipc::Listener and overriding OnRecordBatchDecoded().
     auto collect_listener = std::make_shared<arrow::ipc::CollectListener>();
     arrow::ipc::StreamDecoder decoder(collect_listener);
   
     curl_global_init(CURL_GLOBAL_ALL);
     curl_handle = curl_easy_init();
   
     curl_easy_setopt(curl_handle, CURLOPT_URL, url.c_str());
     curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, WriteFunction);
     curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, &decoder);
    
     auto start_time = std::chrono::steady_clock::now();
   
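     res = curl_easy_perform(curl_handle);
     if (res != CURLE_OK) {
       fprintf(stderr, "curl_easy_perform() failed: %s\n", curl_easy_strerror(res));
     }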
   
     printf("%ld record batches received\n", collect_listener->num_record_batches());
   
     auto end_time = std::chrono::steady_clock::now();
   
     auto time_duration = std::chrono::duration_cast<std::chrono::duration<double>>(end_time - start_time);
     printf("%.2f seconds elapsed\n", time_duration.count());
   
     curl_easy_cleanup(curl_handle);
    
     curl_global_cleanup();
    
     return 0;
   }
   
   // to compile (for example):
   //clang++ client.cpp -std=c++17 -I/usr/local/include -L/usr/local/lib -lcurl -larrow -o client
   ```





Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "ianmcook (via GitHub)" <gi...@apache.org>.
ianmcook commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1419212556


##########
http/examples/get/client.cpp:
##########


Review Comment:
   @kou I can't get this `CollectListener()` client example to work. Running it causes the server to receive a `SIGPIPE` signal. Can you help debug it? Thank you.





Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "ianmcook (via GitHub)" <gi...@apache.org>.
ianmcook commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1432944741


##########
http/examples/get/server.go:
##########
@@ -0,0 +1,98 @@
+package main
+
+import (
+	"fmt"
+	"log"
+	"math/rand"
+	"net/http"
+
+	"github.com/apache/arrow/go/v15/arrow"
+	"github.com/apache/arrow/go/v15/arrow/array"
+	"github.com/apache/arrow/go/v15/arrow/ipc"
+	"github.com/apache/arrow/go/v15/arrow/memory"
+)
+
+var schema = arrow.NewSchema([]arrow.Field{
+	{Name: "a", Type: arrow.PrimitiveTypes.Int64},
+	{Name: "b", Type: arrow.PrimitiveTypes.Int64},
+	{Name: "c", Type: arrow.PrimitiveTypes.Int64},
+	{Name: "d", Type: arrow.PrimitiveTypes.Int64},
+}, nil)
+
+func GetPutData() []arrow.Record {
+	const (
+		totalRecords = 100000000
+		length       = 4096
+		ncolumns     = 4
+		seed         = 42
+	)
+
+	var (
+		r    = rand.New(rand.NewSource(seed))
+		mem  = memory.DefaultAllocator
+		arrs = make([]arrow.Array, 0, ncolumns)
+	)
+	for i := 0; i < ncolumns; i++ {
+		buf := memory.NewResizableBuffer(mem)
+		buf.Resize(length * 8)
+		_, err := r.Read(buf.Buf())
+		if err != nil {
+			panic(err)
+		}
+		defer buf.Release()
+
+		data := array.NewData(arrow.PrimitiveTypes.Int64, length, []*memory.Buffer{nil, buf}, nil, 0, 0)
+		defer data.Release()
+		a := array.NewInt64Data(data)
+		defer a.Release()
+		arrs = append(arrs, a)
+	}
+
+	batch := array.NewRecord(schema, arrs, length)
+	defer batch.Release()
+
+	batches := make([]arrow.Record, 0)
+	records := 0
+	for records < totalRecords {
+		if records+length > totalRecords {
+			lastLen := totalRecords - records
+			batches = append(batches, batch.NewSlice(0, lastLen))
+			records += lastLen
+		} else {
+			batch.Retain()
+			batches = append(batches, batch)

Review Comment:
   ```suggestion
   			batches = append(batches, batch)
   			records += length
   ```
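
The loop above splits `totalRecords` rows into `length`-row batches; a loop of this shape only terminates if both branches advance `records`, which is what the added `records += length` does. A Python sketch of the same arithmetic (`batch_lengths` is a hypothetical helper that only computes row counts and builds no Arrow data) shows that 100,000,000 rows in 4,096-row batches gives 24,414 full batches plus one 256-row tail, 24,415 batches in total:

```python
def batch_lengths(total_records, length):
    """Row counts of the batches produced by the GetPutData slicing loop."""
    lengths = []
    records = 0
    while records < total_records:
        if records + length > total_records:
            # Final, shorter batch (a slice of the full batch in the Go code).
            last_len = total_records - records
            lengths.append(last_len)
            records += last_len
        else:
            lengths.append(length)
            # The increment added by the suggestion; without it, `records`
            # never changes in this branch and the loop never ends.
            records += length
    return lengths


lengths = batch_lengths(100_000_000, 4096)
print(len(lengths), lengths[-1])  # 24415 256
```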





Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "ianmcook (via GitHub)" <gi...@apache.org>.
ianmcook commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1433236932


##########
http/examples/get/client.js:
##########
@@ -0,0 +1,29 @@
+const Arrow = require('apache-arrow');
+
+const url = 'http://localhost:8000';
+
+async function getArrowData(url) {
+  try {
+    const response = await fetch(url);
+    const table = await Arrow.tableFromIPC(response);
+    return table;
+  } catch (error) {
+    console.error('Error:', error.message);
+    throw error;
+  }
+}
+
+async function runExample(url) {
+  const startTime = new Date();
+  try {
+    const table = await getArrowData(url);
+    const endTime = new Date();
+    const duration = (endTime - startTime) / 1000;
+    console.log(`${table.batches.length} record batches received`);
+    console.log(`${duration.toFixed(2)} seconds elapsed`);
+  } catch (error) {
+    console.error('Error:', error.message);
+  }
+}

Review Comment:
   Thanks!





Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "ianmcook (via GitHub)" <gi...@apache.org>.
ianmcook commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1448145304


##########
http/examples/get/client.cpp:
##########


Review Comment:
   Hmm, I'm able to add this at the very bottom of `main` in your first example, and it works without error:
   ```cpp
     std::vector<std::shared_ptr<arrow::RecordBatch>> record_batches;
     record_batches = collect_listener->record_batches();
     std::cout << "Schema: " << std::endl << record_batches[0]->schema()->ToString() << std::endl;
     std::cout << "RecordBatch: " << std::endl << record_batches[0]->ToString() << std::endl;
   ```





Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "ianmcook (via GitHub)" <gi...@apache.org>.
ianmcook commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1449086865


##########
http/examples/get/client.go:
##########
@@ -0,0 +1,53 @@
+package main
+
+import (
+	"fmt"
+	"net/http"
+	"time"
+
+	"github.com/apache/arrow/go/v15/arrow"
+	"github.com/apache/arrow/go/v15/arrow/ipc"
+	"github.com/apache/arrow/go/v15/arrow/memory"
+)
+
+func main() {
+	start := time.Now()
+	resp, err := http.Get("http://localhost:8000")
+	if err != nil {
+		panic(err)
+	}
+
+	if resp.StatusCode != http.StatusOK {
+		panic(fmt.Errorf("got non-200 status: %d", resp.StatusCode))
+	}
+	defer resp.Body.Close()
+
+	rdr, err := ipc.NewReader(resp.Body, ipc.WithAllocator(memory.DefaultAllocator))
+	if err != nil {
+		panic(err)
+	}
+	defer rdr.Release()
+
+	batches := make([]arrow.Record, 0)
+	defer func() {
+		for _, b := range batches {
+			b.Release()
+		}
+	}()
+
+	for rdr.Next() {
+		rec := rdr.Record()
+		rec.Retain()
+		batches = append(batches, rec)
+	}
+
+	if rdr.Err() != nil {
+		panic(rdr.Err())
+	}
+
+	execTime := time.Since(start)
+
+	fmt.Println(resp.ContentLength, "bytes received")

Review Comment:
   OK, thanks; for now I'll just remove it for simplicity, since most of the other client examples do not print the number of bytes received (because of the extra complexity that would require).





Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1416847365


##########
http/examples/get/client.cpp:
##########
@@ -0,0 +1,86 @@
+#include <curl/curl.h>
+#include <arrow/api.h>
+#include <arrow/io/api.h>
+#include <arrow/ipc/api.h>
+#include <chrono>
+
+struct MemoryStruct {
+  char *memory;
+  size_t size;
+};
+ 
+static size_t
+WriteMemoryCallback(void *contents, size_t size, size_t nmemb, void *userp)
+{
+  size_t realsize = size * nmemb;
+  struct MemoryStruct *mem = (struct MemoryStruct *)userp;
+ 
+  char *ptr = static_cast<char*>(realloc(mem->memory, mem->size + realsize + 1));
+  if(!ptr) {
+    printf("out of memory\n");
+    return 0;
+  }
+ 
+  mem->memory = ptr;
+  memcpy(&(mem->memory[mem->size]), contents, realsize);
+  mem->size += realsize;
+  mem->memory[mem->size] = 0;
+ 
+  return realsize;
+}
+
+int main(void)
+{
+  std::string url = "http://localhost:8000";
+
+  CURL *curl_handle;
+  CURLcode res;
+ 
+  struct MemoryStruct chunk;
+ 
+  chunk.memory = static_cast<char*>(malloc(1));
+  chunk.size = 0;
+
+  curl_global_init(CURL_GLOBAL_ALL);
+  curl_handle = curl_easy_init();
+
+  curl_easy_setopt(curl_handle, CURLOPT_URL, url.c_str());
+  curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, WriteMemoryCallback);
+  curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, (void *)&chunk);
+ 
+  auto start_time = std::chrono::steady_clock::now();
+
+  res = curl_easy_perform(curl_handle);
+
+  printf("%lu bytes received\n", (unsigned long)chunk.size);
+
+  auto buffer = arrow::Buffer::Wrap(reinterpret_cast<const uint8_t *>(chunk.memory), chunk.size);
+  auto input_stream = std::make_shared<arrow::io::BufferReader>(buffer);
+  auto record_batch_reader = arrow::ipc::RecordBatchStreamReader::Open(input_stream).ValueOrDie();
+
+  std::vector<std::shared_ptr<arrow::RecordBatch>> record_batches;
+  std::shared_ptr<arrow::RecordBatch> record_batch;
+  while (record_batch_reader->ReadNext(&record_batch).ok() && record_batch)
+  {
+      record_batches.push_back(record_batch);
+  }
+
+  printf("%lu record batches received\n", (unsigned long)(record_batches.size()));
+
+  auto end_time = std::chrono::steady_clock::now();
+
+  auto time_duration = std::chrono::duration_cast<std::chrono::duration<double>>(end_time - start_time);
+  printf("%.2f seconds elapsed\n", time_duration.count());
+
+  curl_easy_cleanup(curl_handle);
+ 
+  free(chunk.memory);
+
+  curl_global_cleanup();
+ 
+  return 0;
+}
+
+// to compile (for example):
+//clang++ client.cpp -std=c++17 -I/usr/local/include -L/usr/local/lib -lcurl -larrow -o client

Review Comment:
   We can use `pkg-config`:
   
   ```suggestion
   //clang++ client.cpp -std=c++17 $(pkg-config --cflags --libs arrow libcurl) -o client
   ```





Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #39081:
URL: https://github.com/apache/arrow/pull/39081#issuecomment-1841457160

   Thanks for opening a pull request!
   
   If this is not a [minor PR](https://github.com/apache/arrow/blob/main/CONTRIBUTING.md#Minor-Fixes), could you open an issue for this pull request on GitHub? https://github.com/apache/arrow/issues/new/choose
   
   Opening GitHub issues ahead of time contributes to the [Openness](http://theapacheway.com/open/#:~:text=Openness%20allows%20new%20users%20the,must%20happen%20in%20the%20open.) of the Apache Arrow project.
   
   Then could you also rename the pull request title in the following format?
   
       GH-${GITHUB_ISSUE_ID}: [${COMPONENT}] ${SUMMARY}
   
   or
   
       MINOR: [${COMPONENT}] ${SUMMARY}
   
   In the case of PARQUET issues on JIRA the title also supports:
   
       PARQUET-${JIRA_ISSUE_ID}: [${COMPONENT}] ${SUMMARY}
   
   See also:
   
     * [Other pull requests](https://github.com/apache/arrow/pulls/)
     * [Contribution Guidelines - How to contribute patches](https://arrow.apache.org/docs/developers/contributing.html#how-to-contribute-patches)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #39081:
URL: https://github.com/apache/arrow/pull/39081#issuecomment-1841450018

   <!--
     Licensed to the Apache Software Foundation (ASF) under one
     or more contributor license agreements.  See the NOTICE file
     distributed with this work for additional information
     regarding copyright ownership.  The ASF licenses this file
     to you under the Apache License, Version 2.0 (the
     "License"); you may not use this file except in compliance
     with the License.  You may obtain a copy of the License at
   
       http://www.apache.org/licenses/LICENSE-2.0
   
     Unless required by applicable law or agreed to in writing,
     software distributed under the License is distributed on an
     "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
     KIND, either express or implied.  See the License for the
     specific language governing permissions and limitations
     under the License.
   -->
   
   Thanks for opening a pull request!
   
   If this is not a [minor PR](https://github.com/apache/arrow/blob/main/CONTRIBUTING.md#Minor-Fixes). Could you open an issue for this pull request on GitHub? https://github.com/apache/arrow/issues/new/choose
   
   Opening GitHub issues ahead of time contributes to the [Openness](http://theapacheway.com/open/#:~:text=Openness%20allows%20new%20users%20the,must%20happen%20in%20the%20open.) of the Apache Arrow project.
   
   Then could you also rename the pull request title in the following format?
   
       GH-${GITHUB_ISSUE_ID}: [${COMPONENT}] ${SUMMARY}
   
   or
   
       MINOR: [${COMPONENT}] ${SUMMARY}
   
   In the case of PARQUET issues on JIRA the title also supports:
   
       PARQUET-${JIRA_ISSUE_ID}: [${COMPONENT}] ${SUMMARY}
   
   See also:
   
     * [Other pull requests](https://github.com/apache/arrow/pulls/)
     * [Contribution Guidelines - How to contribute patches](https://arrow.apache.org/docs/developers/contributing.html#how-to-contribute-patches)
   




Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "ianmcook (via GitHub)" <gi...@apache.org>.
ianmcook commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1416458235


##########
http/examples/get/client_async.py:
##########
@@ -0,0 +1,39 @@
+import asyncio
+import aiohttp

Review Comment:
   This was an experiment to compare the throughput when doing multiple GET requests in parallel. But this example uses much more memory than I expected and runs very slowly. Anyone know what I'm doing wrong here?
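The rest of `client_async.py` isn't quoted here, so this thread can't confirm the cause. One common pattern that inflates memory with parallel GETs is holding every full response body at once (for example, calling aiohttp's `await resp.read()` in every in-flight request). The minimal sketch below shows the alternative: it caps concurrent requests with a semaphore and consumes each body piece by piece. `fake_body()` is a hypothetical stand-in for a streamed HTTP body so the sketch runs without a server. With aiohttp, that role would be played by something like `resp.content.iter_chunked(n)`.

```python
import asyncio
from typing import AsyncIterator


async def fake_body(total: int, chunk: int) -> AsyncIterator[bytes]:
    """Hypothetical stand-in for a streamed HTTP response body."""
    sent = 0
    while sent < total:
        n = min(chunk, total - sent)
        sent += n
        await asyncio.sleep(0)  # give other tasks a turn, as network I/O would
        yield bytes(n)          # n zero bytes


async def fetch_all(n_requests: int, limit: int, total: int, chunk: int):
    sem = asyncio.Semaphore(limit)  # at most `limit` requests in flight
    stats = {"in_flight": 0, "peak_in_flight": 0, "max_piece": 0}

    async def one() -> int:
        async with sem:
            stats["in_flight"] += 1
            stats["peak_in_flight"] = max(stats["peak_in_flight"], stats["in_flight"])
            try:
                received = 0
                # Handle each piece as it arrives instead of buffering the
                # whole body; a real client would need to decode these
                # pieces incrementally rather than collecting them.
                async for piece in fake_body(total, chunk):
                    received += len(piece)
                    stats["max_piece"] = max(stats["max_piece"], len(piece))
                return received
            finally:
                stats["in_flight"] -= 1

    sizes = await asyncio.gather(*(one() for _ in range(n_requests)))
    return sizes, stats


sizes, stats = asyncio.run(fetch_all(n_requests=8, limit=4, total=10_000, chunk=1024))
print(sum(sizes), stats["peak_in_flight"], stats["max_piece"])
```

Raising `limit` trades memory for throughput. If each request instead reads its whole body before processing it, peak memory grows roughly with `limit` times the body size.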





Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "ianmcook (via GitHub)" <gi...@apache.org>.
ianmcook commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1416584067


##########
http/examples/get/server.py:
##########
@@ -0,0 +1,75 @@
+import pyarrow as pa
+from random import randbytes
+from http.server import BaseHTTPRequestHandler, HTTPServer
+
+schema = pa.schema([
+    ('a', pa.int64()),
+    ('b', pa.int64()),
+    ('c', pa.int64()),
+    ('d', pa.int64())
+])
+
+def GetPutData():
+    total_records = 100000000
+    length = 4096
+    ncolumns = 4
+    
+    arrays = []
+    
+    for x in range(0, ncolumns):
+        buffer = pa.py_buffer(randbytes(length * 8))
+        arrays.append(pa.Int64Array.from_buffers(pa.int64(), length, [None, buffer], null_count=0))
+    
+    batch = pa.record_batch(arrays, schema)
+    batches = []
+    
+    records_sent = 0
+    while records_sent < total_records:
+      if records_sent + length > total_records:
+        last_length = total_records - records_sent
+        batches.append(batch.slice(0, last_length))
+        records_sent += last_length
+      else:
+        batches.append(batch)
+        records_sent += length
+    
+    return batches
+ 
+class MyServer(BaseHTTPRequestHandler):
+    def do_GET(self):
+        chunk_size = int(2e9)
+        chunk_splits = len(buffer) // chunk_size
+        
+        self.send_response(200)
+        self.send_header('Content-Type', 'application/vnd.apache.arrow.stream')
+        
+        #######################################################################
+        # include these to enable testing JavaScript client in local browser
+        self.send_header('Access-Control-Allow-Origin', 'http://localhost:8080')
+        self.send_header('Access-Control-Allow-Methods', 'GET')
+        self.send_header('Access-Control-Allow-Headers', 'Content-Type')
+        #######################################################################
+        
+        self.end_headers()
+        
+        for i in range(chunk_splits):
+            self.wfile.write(buffer.slice(i * chunk_size, chunk_size))
+            self.wfile.flush()
+        self.wfile.write(buffer.slice(chunk_splits * chunk_size))
+        self.wfile.flush()
+
+batches = GetPutData()
+
+sink = pa.BufferOutputStream()
+
+with pa.ipc.new_stream(sink, schema) as writer:
+   for i in range(len(batches)):
+      writer.write_batch(batches[i])
+
+buffer = sink.getvalue()

Review Comment:
   I added `server_low_mem.py` which is the same as `server.py` but uses this trick.





Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1421771731


##########
http/examples/get/client.cpp:
##########


Review Comment:
   This is a bug: #39163





Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "ianmcook (via GitHub)" <gi...@apache.org>.
ianmcook commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1447958992


##########
http/examples/get/client.cpp:
##########


Review Comment:
   Confirmed these examples work with the change in #39164 — thanks!
   
   I'm going to use the first example that you provided on 2023-12-06 because it is slightly simpler (it does not override the listener class) and because it collects all the RecordBatches (which is more representative of the real-world use cases we anticipate). 





Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1448147394


##########
http/examples/get/client.cpp:
##########
@@ -0,0 +1,86 @@
+#include <curl/curl.h>
+#include <arrow/api.h>
+#include <arrow/io/api.h>
+#include <arrow/ipc/api.h>
+#include <chrono>
+
+struct MemoryStruct {
+  char *memory;
+  size_t size;
+};
+ 
+static size_t
+WriteMemoryCallback(void *contents, size_t size, size_t nmemb, void *userp)
+{
+  size_t realsize = size * nmemb;
+  struct MemoryStruct *mem = (struct MemoryStruct *)userp;
+ 
+  char *ptr = static_cast<char*>(realloc(mem->memory, mem->size + realsize + 1));
+  if(!ptr) {
+    printf("out of memory\n");
+    return 0;
+  }
+ 
+  mem->memory = ptr;
+  memcpy(&(mem->memory[mem->size]), contents, realsize);
+  mem->size += realsize;
+  mem->memory[mem->size] = 0;
+ 
+  return realsize;
+}
+
+int main(void)
+{
+  std::string url = "http://localhost:8000";
+
+  CURL *curl_handle;
+  CURLcode res;
+ 
+  struct MemoryStruct chunk;
+ 
+  chunk.memory = static_cast<char*>(malloc(1));
+  chunk.size = 0;
+
+  curl_global_init(CURL_GLOBAL_ALL);
+  curl_handle = curl_easy_init();
+
+  curl_easy_setopt(curl_handle, CURLOPT_URL, url.c_str());
+  curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, WriteMemoryCallback);
+  curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, (void *)&chunk);
+ 
+  auto start_time = std::chrono::steady_clock::now();
+
+  res = curl_easy_perform(curl_handle);
+
+  printf("%lu bytes received\n", (unsigned long)chunk.size);
+
+  auto buffer = arrow::Buffer::Wrap(reinterpret_cast<const uint8_t *>(chunk.memory), chunk.size);
+  auto input_stream = std::make_shared<arrow::io::BufferReader>(buffer);
+  auto record_batch_reader = arrow::ipc::RecordBatchStreamReader::Open(input_stream).ValueOrDie();
+
+  std::vector<std::shared_ptr<arrow::RecordBatch>> record_batches;
+  std::shared_ptr<arrow::RecordBatch> record_batch;
+  while (record_batch_reader->ReadNext(&record_batch).ok() && record_batch)
+  {
+      record_batches.push_back(record_batch);
+  }
+
+  printf("%lu record batches received\n", (unsigned long)(record_batches.size()));
+
+  auto end_time = std::chrono::steady_clock::now();
+
+  auto time_duration = std::chrono::duration_cast<std::chrono::duration<double>>(end_time - start_time);
+  printf("%.2f seconds elapsed\n", time_duration.count());
+
+  curl_easy_cleanup(curl_handle);
+ 
+  free(chunk.memory);
+
+  curl_global_cleanup();
+ 
+  return 0;
+}
+
+// to compile (for example):
+//clang++ client.cpp -std=c++17 -I/usr/local/include -L/usr/local/lib -lcurl -larrow -o client

Review Comment:
   We may want to re-apply this change.





Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "ianmcook (via GitHub)" <gi...@apache.org>.
ianmcook commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1447965336


##########
http/examples/get/client.cpp:
##########


Review Comment:
   Comment for posterity about the example that overrides `arrow::ipc::Listener`:
   
   `arrow::RecordBatchWithMetadata` is just a struct, defined here: https://github.com/apache/arrow/blob/07a46555e74501f96973dc43ef54a4669d261876/cpp/src/arrow/record_batch.h#L233-L236





Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "ianmcook (via GitHub)" <gi...@apache.org>.
ianmcook commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1448213427


##########
http/examples/get/client.cpp:
##########


Review Comment:
   I see that `CollectListener` is defined at https://github.com/apache/arrow/blob/main/cpp/src/arrow/ipc/reader.h#L318 but I am not knowledgeable enough to know if this should work or not based on the implementation there.





Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "ianmcook (via GitHub)" <gi...@apache.org>.
ianmcook commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1433108979


##########
http/examples/get/client.js:
##########
@@ -0,0 +1,29 @@
+const Arrow = require('apache-arrow');
+
+const url = 'http://localhost:8000';
+
+async function getArrowData(url) {
+  try {
+    const response = await fetch(url);
+    const table = await Arrow.tableFromIPC(response);
+    return table;
+  } catch (error) {
+    console.error('Error:', error.message);
+    throw error;
+  }
+}
+
+async function runExample(url) {
+  const startTime = new Date();
+  try {
+    const table = await getArrowData(url);
+    const endTime = new Date();
+    const duration = (endTime - startTime) / 1000;
+    console.log(`${table.batches.length} record batches received`);
+    console.log(`${duration.toFixed(2)} seconds elapsed`);
+  } catch (error) {
+    console.error('Error:', error.message);
+  }
+}
+
+runExample(url);

Review Comment:
   @domoritz I cleaned this up to use all async. LMK if there's anything else that could be improved here. Thanks!





Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #39081:
URL: https://github.com/apache/arrow/pull/39081#issuecomment-1841461690



Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "ianmcook (via GitHub)" <gi...@apache.org>.
ianmcook commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1416533939


##########
http/examples/get/server.py:
##########
@@ -0,0 +1,75 @@
+import pyarrow as pa
+from random import randbytes
+from http.server import BaseHTTPRequestHandler, HTTPServer
+
+schema = pa.schema([
+    ('a', pa.int64()),
+    ('b', pa.int64()),
+    ('c', pa.int64()),
+    ('d', pa.int64())
+])
+
+def GetPutData():
+    total_records = 100000000
+    length = 4096
+    ncolumns = 4
+    
+    arrays = []
+    
+    for x in range(0, ncolumns):
+        buffer = pa.py_buffer(randbytes(length * 8))
+        arrays.append(pa.Int64Array.from_buffers(pa.int64(), length, [None, buffer], null_count=0))
+    
+    batch = pa.record_batch(arrays, schema)
+    batches = []
+    
+    records_sent = 0
+    while records_sent < total_records:
+      if records_sent + length > total_records:
+        last_length = total_records - records_sent
+        batches.append(batch.slice(0, last_length))
+        records_sent += last_length
+      else:
+        batches.append(batch)
+        records_sent += length
+    
+    return batches
+ 
+class MyServer(BaseHTTPRequestHandler):
+    def do_GET(self):
+        chunk_size = int(2e9)
+        chunk_splits = len(buffer) // chunk_size
+        
+        self.send_response(200)
+        self.send_header('Content-Type', 'application/vnd.apache.arrow.stream')
+        
+        #######################################################################
+        # include these to enable testing JavaScript client in local browser
+        self.send_header('Access-Control-Allow-Origin', 'http://localhost:8080')
+        self.send_header('Access-Control-Allow-Methods', 'GET')
+        self.send_header('Access-Control-Allow-Headers', 'Content-Type')
+        #######################################################################
+        
+        self.end_headers()
+        
+        for i in range(chunk_splits):
+            self.wfile.write(buffer.slice(i * chunk_size, chunk_size))
+            self.wfile.flush()
+        self.wfile.write(buffer.slice(chunk_splits * chunk_size))
+        self.wfile.flush()

Review Comment:
   Thanks. I think this `buffer.slice()` method does not create copies. If I set `total_records` to a smaller size (so that the buffer is under 2 GB), there is no difference in memory use when I use `self.wfile.write()` to send the whole buffer at once vs. slices of the buffer with various `chunk_size`.
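The sizes involved can be worked out without pyarrow. The sketch below re-implements the batching loop from `GetPutData()` in `server.py` with plain integers; it is not the server code itself. It counts only the raw int64 column data. The real IPC stream is somewhat larger, because each batch also carries metadata and padding.

```python
# Integer-only re-implementation of the batching loop in GetPutData().
TOTAL_RECORDS = 100_000_000
LENGTH = 4096          # rows per full batch
NCOLUMNS = 4
BYTES_PER_VALUE = 8    # int64
CHUNK_SIZE = int(2e9)  # chunk_size used in do_GET()


def batch_lengths(total: int, length: int) -> list[int]:
    lengths = []
    sent = 0
    while sent < total:
        n = min(length, total - sent)  # the last batch may be shorter
        lengths.append(n)
        sent += n
    return lengths


lengths = batch_lengths(TOTAL_RECORDS, LENGTH)
# Raw column bytes only (no IPC metadata or padding).
data_bytes = sum(lengths) * NCOLUMNS * BYTES_PER_VALUE

print(len(lengths), lengths[-1], data_bytes)  # 24415 256 3200000000
print(data_bytes // CHUNK_SIZE)               # 1
```

With the default `total_records`, the stream is therefore over 3.2 GB, well past the 2 GB point. For a buffer of that size, `chunk_splits` in `do_GET()` should come out to 1: one 2e9-byte slice followed by the remainder.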





Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "ianmcook (via GitHub)" <gi...@apache.org>.
ianmcook commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1416556643


##########
http/examples/get/server.py:
##########
@@ -0,0 +1,75 @@
+import pyarrow as pa
+from random import randbytes
+from http.server import BaseHTTPRequestHandler, HTTPServer
+
+schema = pa.schema([
+    ('a', pa.int64()),
+    ('b', pa.int64()),
+    ('c', pa.int64()),
+    ('d', pa.int64())
+])
+
+def GetPutData():
+    total_records = 100000000
+    length = 4096
+    ncolumns = 4
+    
+    arrays = []
+    
+    for x in range(0, ncolumns):
+        buffer = pa.py_buffer(randbytes(length * 8))
+        arrays.append(pa.Int64Array.from_buffers(pa.int64(), length, [None, buffer], null_count=0))
+    
+    batch = pa.record_batch(arrays, schema)
+    batches = []
+    
+    records_sent = 0
+    while records_sent < total_records:
+      if records_sent + length > total_records:
+        last_length = total_records - records_sent
+        batches.append(batch.slice(0, last_length))
+        records_sent += last_length
+      else:
+        batches.append(batch)
+        records_sent += length
+    
+    return batches
+ 
+class MyServer(BaseHTTPRequestHandler):
+    def do_GET(self):
+        chunk_size = int(2e9)
+        chunk_splits = len(buffer) // chunk_size
+        
+        self.send_response(200)
+        self.send_header('Content-Type', 'application/vnd.apache.arrow.stream')
+        
+        #######################################################################
+        # include these to enable testing JavaScript client in local browser
+        self.send_header('Access-Control-Allow-Origin', 'http://localhost:8080')
+        self.send_header('Access-Control-Allow-Methods', 'GET')
+        self.send_header('Access-Control-Allow-Headers', 'Content-Type')
+        #######################################################################
+        
+        self.end_headers()
+        
+        for i in range(chunk_splits):
+            self.wfile.write(buffer.slice(i * chunk_size, chunk_size))
+            self.wfile.flush()
+        self.wfile.write(buffer.slice(chunk_splits * chunk_size))
+        self.wfile.flush()
+
+batches = GetPutData()
+
+sink = pa.BufferOutputStream()
+
+with pa.ipc.new_stream(sink, schema) as writer:
+   for i in range(len(batches)):
+      writer.write_batch(batches[i])
+
+buffer = sink.getvalue()

Review Comment:
   I think I finally wrapped my head around what your code there does. I'll create an alternative version that uses that trick and see what the memory usage looks like.





Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "ianmcook (via GitHub)" <gi...@apache.org>.
ianmcook commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1432959936


##########
http/examples/get/server.go:
##########
@@ -0,0 +1,98 @@
+package main
+
+import (
+	"fmt"
+	"log"
+	"math/rand"
+	"net/http"
+
+	"github.com/apache/arrow/go/v15/arrow"
+	"github.com/apache/arrow/go/v15/arrow/array"
+	"github.com/apache/arrow/go/v15/arrow/ipc"
+	"github.com/apache/arrow/go/v15/arrow/memory"
+)
+
+var schema = arrow.NewSchema([]arrow.Field{
+	{Name: "a", Type: arrow.PrimitiveTypes.Int64},
+	{Name: "b", Type: arrow.PrimitiveTypes.Int64},
+	{Name: "c", Type: arrow.PrimitiveTypes.Int64},
+	{Name: "d", Type: arrow.PrimitiveTypes.Int64},
+}, nil)
+
+func GetPutData() []arrow.Record {
+	const (
+		totalRecords = 100000000
+		length       = 4096
+		ncolumns     = 4
+		seed         = 42
+	)
+
+	var (
+		r    = rand.New(rand.NewSource(seed))
+		mem  = memory.DefaultAllocator
+		arrs = make([]arrow.Array, 0, ncolumns)
+	)
+	for i := 0; i < ncolumns; i++ {
+		buf := memory.NewResizableBuffer(mem)
+		buf.Resize(length * 8)
+		_, err := r.Read(buf.Buf())
+		if err != nil {
+			panic(err)
+		}
+		defer buf.Release()
+
+		data := array.NewData(arrow.PrimitiveTypes.Int64, length, []*memory.Buffer{nil, buf}, nil, 0, 0)
+		defer data.Release()
+		a := array.NewInt64Data(data)
+		defer a.Release()
+		arrs = append(arrs, a)
+	}
+
+	batch := array.NewRecord(schema, arrs, length)
+	defer batch.Release()
+
+	batches := make([]arrow.Record, 0)
+	records := 0
+	for records < totalRecords {
+		if records+length > totalRecords {
+			lastLen := totalRecords - records
+			batches = append(batches, batch.NewSlice(0, int64(lastLen)))
+			records += lastLen
+		} else {
+			batch.Retain()
+			batches = append(batches, batch)
+			records += length
+		}
+	}
+
+	return batches
+}
+
+func main() {
+	batches := GetPutData()
+
+	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
+		hdrs := w.Header()
+		hdrs.Add("access-control-allow-origin", "http://localhost:8080")

Review Comment:
   ```suggestion
   		hdrs.Add("access-control-allow-origin", "http://localhost:8000")
   ```





Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "ianmcook (via GitHub)" <gi...@apache.org>.
ianmcook commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1433126564


##########
http/examples/get/client.cpp:
##########


Review Comment:
   @kou when I run your last example, I get `IOError: Invalid flatbuffers message` in `Consume`.





Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "ianmcook (via GitHub)" <gi...@apache.org>.
ianmcook commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1433029805


##########
http/examples/get/server.go:
##########
@@ -0,0 +1,98 @@
+package main
+
+import (
+	"fmt"
+	"log"
+	"math/rand"
+	"net/http"
+
+	"github.com/apache/arrow/go/v15/arrow"
+	"github.com/apache/arrow/go/v15/arrow/array"
+	"github.com/apache/arrow/go/v15/arrow/ipc"
+	"github.com/apache/arrow/go/v15/arrow/memory"
+)
+
+var schema = arrow.NewSchema([]arrow.Field{
+	{Name: "a", Type: arrow.PrimitiveTypes.Int64},
+	{Name: "b", Type: arrow.PrimitiveTypes.Int64},
+	{Name: "c", Type: arrow.PrimitiveTypes.Int64},
+	{Name: "d", Type: arrow.PrimitiveTypes.Int64},
+}, nil)
+
+func GetPutData() []arrow.Record {
+	const (
+		totalRecords = 100000000
+		length       = 4096
+		ncolumns     = 4
+		seed         = 42
+	)
+
+	var (
+		r    = rand.New(rand.NewSource(seed))
+		mem  = memory.DefaultAllocator
+		arrs = make([]arrow.Array, 0, ncolumns)
+	)
+	for i := 0; i < ncolumns; i++ {
+		buf := memory.NewResizableBuffer(mem)
+		buf.Resize(length * 8)
+		_, err := r.Read(buf.Buf())
+		if err != nil {
+			panic(err)
+		}
+		defer buf.Release()
+
+		data := array.NewData(arrow.PrimitiveTypes.Int64, length, []*memory.Buffer{nil, buf}, nil, 0, 0)
+		defer data.Release()
+		a := array.NewInt64Data(data)
+		defer a.Release()
+		arrs = append(arrs, a)
+	}
+
+	batch := array.NewRecord(schema, arrs, length)
+	defer batch.Release()
+
+	batches := make([]arrow.Record, 0)
+	records := 0

Review Comment:
   Fix type mismatch in Go server





Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "ianmcook (via GitHub)" <gi...@apache.org>.
ianmcook commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1449091092


##########
http/examples/get/server.py:
##########
@@ -0,0 +1,92 @@
+import pyarrow as pa
+from random import randbytes
+from http.server import BaseHTTPRequestHandler, HTTPServer
+import io
+
+schema = pa.schema([
+    ('a', pa.int64()),
+    ('b', pa.int64()),
+    ('c', pa.int64()),
+    ('d', pa.int64())
+])
+
+def GetPutData():
+    total_records = 100000000
+    length = 4096
+    ncolumns = 4
+    
+    arrays = []
+    
+    for x in range(0, ncolumns):
+        buffer = pa.py_buffer(randbytes(length * 8))
+        arrays.append(pa.Int64Array.from_buffers(pa.int64(), length, [None, buffer], null_count=0))
+    
+    batch = pa.record_batch(arrays, schema)
+    batches = []
+    
+    records_sent = 0
+    while records_sent < total_records:
+      if records_sent + length > total_records:
+        last_length = total_records - records_sent
+        batches.append(batch.slice(0, last_length))
+        records_sent += last_length
+      else:
+        batches.append(batch)
+        records_sent += length
+    
+    return batches
+
+def make_reader(schema, batches):
+    return pa.RecordBatchReader.from_batches(schema, batches)
+
+def generate_batches(schema, reader):
+    with io.BytesIO() as sink, pa.ipc.new_stream(sink, schema) as writer:
+        yield sink.getvalue()
+        
+        for batch in reader:
+            sink.seek(0)
+            sink.truncate(0)
+            writer.write_batch(batch)
+            yield sink.getvalue()
+        
+        sink.seek(0)
+        sink.truncate(0)
+        writer.close()
+        yield sink.getvalue()
+ 
+class MyServer(BaseHTTPRequestHandler):
+    def do_GET(self):
+        self.send_response(200)
+        self.send_header('Content-Type', 'application/vnd.apache.arrow.stream')
+        
+        # set these headers if testing with a local browser-based client:
+        
+        #self.send_header('Access-Control-Allow-Origin', 'http://localhost:8000')
+        #self.send_header('Access-Control-Allow-Methods', 'GET')
+        #self.send_header('Access-Control-Allow-Headers', 'Content-Type')
+        
+        self.end_headers()
+        
+        for buffer in generate_batches(schema, make_reader(schema, batches)):
+            self.wfile.write(buffer)
+            self.wfile.flush()
+            
+            # if any record batch could be larger than 2 GB, split it
+            # into chunks before passing to self.wfile.write() by 
+            # replacing the two lines above with this:

Review Comment:
   Comment for posterity: The Go server example here does _not_ error if a batch is larger than 2 GB. This issue is specific to the Python http.server implementation.
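The replacement lines mentioned in the `server.py` comment above are cut off in this quote. The sketch below shows one way to do the split, assuming the same `int(2e9)` chunk size used in the earlier version of `server.py`. `write_in_chunks()` is a hypothetical helper, not part of `http.server`. It slices through a `memoryview`, which does not copy the underlying bytes.

```python
import io

CHUNK_SIZE = int(2e9)  # same limit as chunk_size in the earlier server.py


def write_in_chunks(wfile, data, chunk_size: int = CHUNK_SIZE) -> int:
    """Write a bytes-like object to wfile in slices of at most chunk_size bytes.

    Hypothetical helper: memoryview slices share memory with `data`,
    so splitting a large buffer does not copy it.
    """
    view = memoryview(data).cast("B")  # flat view of the raw bytes
    written = 0
    for start in range(0, len(view), chunk_size):
        piece = view[start:start + chunk_size]
        wfile.write(piece)
        wfile.flush()
        written += len(piece)
    return written


# Small-scale check with an in-memory file standing in for self.wfile.
sink = io.BytesIO()
payload = bytes(range(256)) * 40  # 10,240 bytes
n = write_in_chunks(sink, payload, chunk_size=4096)
print(n, sink.getvalue() == payload)
```

Inside `do_GET()`, the two lines in the loop would then become a single call, `write_in_chunks(self.wfile, buffer)`, for each buffer yielded by `generate_batches()`.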





Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1448450404


##########
http/examples/get/client.cpp:
##########


Review Comment:
   It should crash, but it doesn't crash in my environment. (I tried this just now.)
   
   It seems that this usage is safe.
   
   FYI: https://github.com/apache/arrow/blob/30c4e157a920981a853352ea2c24473496c7e595/cpp/src/arrow/ipc/message.cc#L647 doesn't copy the given data (`contents` in `WriteFunction()`), so a decoded record batch that refers to the given data becomes invalid once that data is invalidated. I thought that curl would invalidate the given data immediately after each `WriteFunction()` call, but it seems that the data is still valid after `curl_easy_perform()` returns.
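The hazard described above is aliasing: the decoder keeps pointers into memory that the caller owns. In C++, reading through such a pointer after curl frees or reuses its buffer is undefined behavior, which is why it may or may not crash. Python can't reproduce the crash, but a rough analogy with `memoryview` shows the difference between keeping a zero-copy view and keeping a copy. The `ViewKeeper` and `CopyKeeper` classes are hypothetical and unrelated to Arrow's API.

```python
class ViewKeeper:
    """Keeps zero-copy views, like a decoder that doesn't copy its input."""

    def __init__(self):
        self.views = []

    def consume(self, data: bytearray) -> None:
        self.views.append(memoryview(data))  # aliases the caller's memory


class CopyKeeper:
    """Copies its input, so it no longer depends on the caller's buffer."""

    def __init__(self):
        self.chunks = []

    def consume(self, data: bytearray) -> None:
        self.chunks.append(bytes(data))  # independent copy


scratch = bytearray(b"batch-1")  # stands in for a reused receive buffer
views, copies = ViewKeeper(), CopyKeeper()
views.consume(scratch)
copies.consume(scratch)

scratch[:] = b"batch-2"  # the caller reuses its buffer (same length)

print(bytes(views.views[0]))  # b'batch-2': the kept view changed underneath
print(copies.chunks[0])       # b'batch-1': the copy is unaffected
```

The copying variant corresponds to accumulating the received data in memory the client owns before decoding it. `WriteMemoryCallback` in `client.cpp` does this with `realloc` and `memcpy`.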





Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "ianmcook (via GitHub)" <gi...@apache.org>.
ianmcook commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1416270794


##########
http/examples/get/server.py:
##########
@@ -0,0 +1,75 @@
+import pyarrow as pa
+from random import randbytes
+from http.server import BaseHTTPRequestHandler, HTTPServer
+
+schema = pa.schema([
+    ('a', pa.int64()),
+    ('b', pa.int64()),
+    ('c', pa.int64()),
+    ('d', pa.int64())
+])
+
+def GetPutData():
+    total_records = 100000000
+    length = 4096
+    ncolumns = 4
+    
+    arrays = []
+    
+    for x in range(0, ncolumns):
+        buffer = pa.py_buffer(randbytes(length * 8))
+        arrays.append(pa.Int64Array.from_buffers(pa.int64(), length, [None, buffer], null_count=0))
+    
+    batch = pa.record_batch(arrays, schema)
+    batches = []
+    
+    records_sent = 0
+    while records_sent < total_records:
+      if records_sent + length > total_records:
+        last_length = total_records - records_sent
+        batches.append(batch.slice(0, last_length))
+        records_sent += last_length
+      else:
+        batches.append(batch)
+        records_sent += length
+    
+    return batches
+ 
+class MyServer(BaseHTTPRequestHandler):
+    def do_GET(self):
+        chunk_size = int(2e9)
+        chunk_splits = len(buffer) // chunk_size
+        
+        self.send_response(200)
+        self.send_header('Content-Type', 'application/vnd.apache.arrow.stream')
+        
+        #######################################################################
+        # include these to enable testing JavaScript client in local browser
+        self.send_header('Access-Control-Allow-Origin', 'http://localhost:8080')
+        self.send_header('Access-Control-Allow-Methods', 'GET')
+        self.send_header('Access-Control-Allow-Headers', 'Content-Type')
+        #######################################################################
+        
+        self.end_headers()
+        
+        for i in range(chunk_splits):
+            self.wfile.write(buffer.slice(i * chunk_size, chunk_size))
+            self.wfile.flush()
+        self.wfile.write(buffer.slice(chunk_splits * chunk_size))
+        self.wfile.flush()
+
+batches = GetPutData()
+
+sink = pa.BufferOutputStream()
+
+with pa.ipc.new_stream(sink, schema) as writer:
+   for i in range(len(batches)):
+      writer.write_batch(batches[i])
+
+buffer = sink.getvalue()

Review Comment:
   This example writes _all_ the record batches to an IPC stream, then finalizes the stream and returns _all_ the bytes as a PyArrow buffer. Then the HTTP server writes the bytes from the buffer to the output stream. This requires that the whole data fit in memory.
   
   Is it possible to do this process incrementally (e.g. one record batch at a time)?
   
   



Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "ianmcook (via GitHub)" <gi...@apache.org>.
ianmcook commented on PR #39081:
URL: https://github.com/apache/arrow/pull/39081#issuecomment-1977300419

   Migrated to https://github.com/apache/arrow-experiments/pull/1


Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "ianmcook (via GitHub)" <gi...@apache.org>.
ianmcook commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1448145304


##########
http/examples/get/client.cpp:
##########


Review Comment:
   Hmm, I'm able to add this at the very bottom of `main` in your first example, and it works without error when running a build of Arrow after #39164 merged:
   ```cpp
     std::vector<std::shared_ptr<arrow::RecordBatch>> record_batches;
     record_batches = collect_listener->record_batches();
     std::cout << "Schema: " << std::endl << record_batches[0]->schema()->ToString() << std::endl;
     std::cout << "RecordBatch: " << std::endl << record_batches[0]->ToString() << std::endl;
   ```



Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "ianmcook (via GitHub)" <gi...@apache.org>.
ianmcook commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1448145304


##########
http/examples/get/client.cpp:
##########


Review Comment:
   Hmm, I'm able to add this at the very bottom of `main`, and it works without error:
   ```cpp
     std::vector<std::shared_ptr<arrow::RecordBatch>> record_batches;
     record_batches = collect_listener->record_batches();
     std::cout << "Schema: " << std::endl << record_batches[0]->schema()->ToString() << std::endl;
     std::cout << "RecordBatch: " << std::endl << record_batches[0]->ToString() << std::endl;
   ```



Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "paleolimbot (via GitHub)" <gi...@apache.org>.
paleolimbot commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1416479490


##########
http/examples/get/server.py:
##########
@@ -0,0 +1,75 @@
+import pyarrow as pa
+from random import randbytes
+from http.server import BaseHTTPRequestHandler, HTTPServer
+
+schema = pa.schema([
+    ('a', pa.int64()),
+    ('b', pa.int64()),
+    ('c', pa.int64()),
+    ('d', pa.int64())
+])
+
+def GetPutData():
+    total_records = 100000000
+    length = 4096
+    ncolumns = 4
+    
+    arrays = []
+    
+    for x in range(0, ncolumns):
+        buffer = pa.py_buffer(randbytes(length * 8))
+        arrays.append(pa.Int64Array.from_buffers(pa.int64(), length, [None, buffer], null_count=0))
+    
+    batch = pa.record_batch(arrays, schema)
+    batches = []
+    
+    records_sent = 0
+    while records_sent < total_records:
+      if records_sent + length > total_records:
+        last_length = total_records - records_sent
+        batches.append(batch.slice(0, last_length))
+        records_sent += last_length
+      else:
+        batches.append(batch)
+        records_sent += length
+    
+    return batches
+ 
+class MyServer(BaseHTTPRequestHandler):
+    def do_GET(self):
+        chunk_size = int(2e9)
+        chunk_splits = len(buffer) // chunk_size
+        
+        self.send_response(200)
+        self.send_header('Content-Type', 'application/vnd.apache.arrow.stream')
+        
+        #######################################################################
+        # include these to enable testing JavaScript client in local browser
+        self.send_header('Access-Control-Allow-Origin', 'http://localhost:8080')
+        self.send_header('Access-Control-Allow-Methods', 'GET')
+        self.send_header('Access-Control-Allow-Headers', 'Content-Type')
+        #######################################################################
+        
+        self.end_headers()
+        
+        for i in range(chunk_splits):
+            self.wfile.write(buffer.slice(i * chunk_size, chunk_size))
+            self.wfile.flush()
+        self.wfile.write(buffer.slice(chunk_splits * chunk_size))
+        self.wfile.flush()
+
+batches = GetPutData()
+
+sink = pa.BufferOutputStream()
+
+with pa.ipc.new_stream(sink, schema) as writer:
+   for i in range(len(batches)):
+      writer.write_batch(batches[i])
+
+buffer = sink.getvalue()

Review Comment:
   Does my `BytesIO` trick work here? https://github.com/paleolimbot/2023-11-21_arrow-over-http-scratchpad/blob/main/app.py#L17-L30



Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "ianmcook (via GitHub)" <gi...@apache.org>.
ianmcook commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1417458135


##########
http/examples/get/server.py:
##########
@@ -0,0 +1,75 @@
+import pyarrow as pa
+from random import randbytes
+from http.server import BaseHTTPRequestHandler, HTTPServer
+
+schema = pa.schema([
+    ('a', pa.int64()),
+    ('b', pa.int64()),
+    ('c', pa.int64()),
+    ('d', pa.int64())
+])
+
+def GetPutData():
+    total_records = 100000000
+    length = 4096
+    ncolumns = 4
+    
+    arrays = []
+    
+    for x in range(0, ncolumns):
+        buffer = pa.py_buffer(randbytes(length * 8))
+        arrays.append(pa.Int64Array.from_buffers(pa.int64(), length, [None, buffer], null_count=0))
+    
+    batch = pa.record_batch(arrays, schema)
+    batches = []
+    
+    records_sent = 0
+    while records_sent < total_records:
+      if records_sent + length > total_records:
+        last_length = total_records - records_sent
+        batches.append(batch.slice(0, last_length))
+        records_sent += last_length
+      else:
+        batches.append(batch)
+        records_sent += length
+    
+    return batches
+ 
+class MyServer(BaseHTTPRequestHandler):
+    def do_GET(self):
+        chunk_size = int(2e9)
+        chunk_splits = len(buffer) // chunk_size
+        
+        self.send_response(200)
+        self.send_header('Content-Type', 'application/vnd.apache.arrow.stream')
+        
+        #######################################################################
+        # include these to enable testing JavaScript client in local browser
+        self.send_header('Access-Control-Allow-Origin', 'http://localhost:8080')
+        self.send_header('Access-Control-Allow-Methods', 'GET')
+        self.send_header('Access-Control-Allow-Headers', 'Content-Type')
+        #######################################################################
+        
+        self.end_headers()
+        
+        for i in range(chunk_splits):
+            self.wfile.write(buffer.slice(i * chunk_size, chunk_size))
+            self.wfile.flush()
+        self.wfile.write(buffer.slice(chunk_splits * chunk_size))
+        self.wfile.flush()
+
+batches = GetPutData()
+
+sink = pa.BufferOutputStream()
+
+with pa.ipc.new_stream(sink, schema) as writer:
+   for i in range(len(batches)):
+      writer.write_batch(batches[i])
+
+buffer = sink.getvalue()

Review Comment:
   P.S. Those memory numbers do not include memory allocated by the Arrow C++ layer for the record batches, which in this example is about 3.2 GB because they are stored in memory.
   ```py
   sum([_.get_total_buffer_size() for _ in batches])
   # 3200122880
   ```
   I'll try another example where I stream them in from an IPC file on disk to keep the overall memory footprint very small.



Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "paleolimbot (via GitHub)" <gi...@apache.org>.
paleolimbot commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1416483605


##########
http/examples/get/server.py:
##########
@@ -0,0 +1,75 @@
+import pyarrow as pa
+from random import randbytes
+from http.server import BaseHTTPRequestHandler, HTTPServer
+
+schema = pa.schema([
+    ('a', pa.int64()),
+    ('b', pa.int64()),
+    ('c', pa.int64()),
+    ('d', pa.int64())
+])
+
+def GetPutData():
+    total_records = 100000000
+    length = 4096
+    ncolumns = 4
+    
+    arrays = []
+    
+    for x in range(0, ncolumns):
+        buffer = pa.py_buffer(randbytes(length * 8))
+        arrays.append(pa.Int64Array.from_buffers(pa.int64(), length, [None, buffer], null_count=0))
+    
+    batch = pa.record_batch(arrays, schema)
+    batches = []
+    
+    records_sent = 0
+    while records_sent < total_records:
+      if records_sent + length > total_records:
+        last_length = total_records - records_sent
+        batches.append(batch.slice(0, last_length))
+        records_sent += last_length
+      else:
+        batches.append(batch)
+        records_sent += length
+    
+    return batches
+ 
+class MyServer(BaseHTTPRequestHandler):
+    def do_GET(self):
+        chunk_size = int(2e9)
+        chunk_splits = len(buffer) // chunk_size
+        
+        self.send_response(200)
+        self.send_header('Content-Type', 'application/vnd.apache.arrow.stream')
+        
+        #######################################################################
+        # include these to enable testing JavaScript client in local browser
+        self.send_header('Access-Control-Allow-Origin', 'http://localhost:8080')
+        self.send_header('Access-Control-Allow-Methods', 'GET')
+        self.send_header('Access-Control-Allow-Headers', 'Content-Type')
+        #######################################################################
+        
+        self.end_headers()
+        
+        for i in range(chunk_splits):
+            self.wfile.write(buffer.slice(i * chunk_size, chunk_size))
+            self.wfile.flush()
+        self.wfile.write(buffer.slice(chunk_splits * chunk_size))
+        self.wfile.flush()

Review Comment:
   Not sure it's *quite* the same thing, but I have a rechunker in geoarrow-python that may also exist in pyarrow somewhere (https://github.com/geoarrow/geoarrow-python/blob/main/geoarrow-pyarrow/src/geoarrow/pyarrow/_compute.py#L452-L461). You probably still need the slicing here, though, since even a one-row batch could be larger than 2 GB.



Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "ianmcook (via GitHub)" <gi...@apache.org>.
ianmcook commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1416239595


##########
http/examples/get/client.R:
##########
@@ -0,0 +1,24 @@
+library(httr)
+library(arrow)
+library(tictoc)
+
+url <- 'http://localhost:8000'
+
+tic()
+
+response <- GET(url)
+buffer <- content(response, "raw")
+reader <- RecordBatchStreamReader$create(buffer)
+table <- reader$read_table()
+
+# or:
+#batches <- reader$batches()
+# but this is very slow

Review Comment:
   @paleolimbot I couldn't find a fast way to convert the IPC stream into a list of record batches in R. Any ideas?



Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "ianmcook (via GitHub)" <gi...@apache.org>.
ianmcook commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1416576326


##########
http/examples/get/server.py:
##########
@@ -0,0 +1,75 @@
+import pyarrow as pa
+from random import randbytes
+from http.server import BaseHTTPRequestHandler, HTTPServer
+
+schema = pa.schema([
+    ('a', pa.int64()),
+    ('b', pa.int64()),
+    ('c', pa.int64()),
+    ('d', pa.int64())
+])
+
+def GetPutData():
+    total_records = 100000000
+    length = 4096
+    ncolumns = 4
+    
+    arrays = []
+    
+    for x in range(0, ncolumns):
+        buffer = pa.py_buffer(randbytes(length * 8))
+        arrays.append(pa.Int64Array.from_buffers(pa.int64(), length, [None, buffer], null_count=0))
+    
+    batch = pa.record_batch(arrays, schema)
+    batches = []
+    
+    records_sent = 0
+    while records_sent < total_records:
+      if records_sent + length > total_records:
+        last_length = total_records - records_sent
+        batches.append(batch.slice(0, last_length))
+        records_sent += last_length
+      else:
+        batches.append(batch)
+        records_sent += length
+    
+    return batches
+ 
+class MyServer(BaseHTTPRequestHandler):
+    def do_GET(self):
+        chunk_size = int(2e9)
+        chunk_splits = len(buffer) // chunk_size
+        
+        self.send_response(200)
+        self.send_header('Content-Type', 'application/vnd.apache.arrow.stream')
+        
+        #######################################################################
+        # include these to enable testing JavaScript client in local browser
+        self.send_header('Access-Control-Allow-Origin', 'http://localhost:8080')
+        self.send_header('Access-Control-Allow-Methods', 'GET')
+        self.send_header('Access-Control-Allow-Headers', 'Content-Type')
+        #######################################################################
+        
+        self.end_headers()
+        
+        for i in range(chunk_splits):
+            self.wfile.write(buffer.slice(i * chunk_size, chunk_size))
+            self.wfile.flush()
+        self.wfile.write(buffer.slice(chunk_splits * chunk_size))
+        self.wfile.flush()
+
+batches = GetPutData()
+
+sink = pa.BufferOutputStream()
+
+with pa.ipc.new_stream(sink, schema) as writer:
+   for i in range(len(batches)):
+      writer.write_batch(batches[i])
+
+buffer = sink.getvalue()

Review Comment:
   wow, yes, that trick solves it:
   ```
   this example as-is:        4367028224  peak memory footprint
   with the `BytesIO` trick:    32202752  peak memory footprint
   ```



Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "ianmcook (via GitHub)" <gi...@apache.org>.
ianmcook commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1432960164


##########
http/examples/get/server.go:
##########
@@ -0,0 +1,98 @@
+package main
+
+import (
+	"fmt"
+	"log"
+	"math/rand"
+	"net/http"
+
+	"github.com/apache/arrow/go/v15/arrow"
+	"github.com/apache/arrow/go/v15/arrow/array"
+	"github.com/apache/arrow/go/v15/arrow/ipc"
+	"github.com/apache/arrow/go/v15/arrow/memory"
+)
+
+var schema = arrow.NewSchema([]arrow.Field{
+	{Name: "a", Type: arrow.PrimitiveTypes.Int64},
+	{Name: "b", Type: arrow.PrimitiveTypes.Int64},
+	{Name: "c", Type: arrow.PrimitiveTypes.Int64},
+	{Name: "d", Type: arrow.PrimitiveTypes.Int64},
+}, nil)
+
+func GetPutData() []arrow.Record {
+	const (
+		totalRecords = 100000000
+		length       = 4096
+		ncolumns     = 4
+		seed         = 42
+	)
+
+	var (
+		r    = rand.New(rand.NewSource(seed))
+		mem  = memory.DefaultAllocator
+		arrs = make([]arrow.Array, 0, ncolumns)
+	)
+	for i := 0; i < ncolumns; i++ {
+		buf := memory.NewResizableBuffer(mem)
+		buf.Resize(length * 8)
+		_, err := r.Read(buf.Buf())
+		if err != nil {
+			panic(err)
+		}
+		defer buf.Release()
+
+		data := array.NewData(arrow.PrimitiveTypes.Int64, length, []*memory.Buffer{nil, buf}, nil, 0, 0)
+		defer data.Release()
+		a := array.NewInt64Data(data)
+		defer a.Release()
+		arrs = append(arrs, a)
+	}
+
+	batch := array.NewRecord(schema, arrs, length)
+	defer batch.Release()
+
+	batches := make([]arrow.Record, 0)
+	records := 0
+	for records < totalRecords {
+		if records+length > totalRecords {
+			lastLen := totalRecords - records
+			batches = append(batches, batch.NewSlice(0, lastLen))
+			records += lastLen
+		} else {
+			batch.Retain()
+			batches = append(batches, batch)
+		}
+	}
+
+	return batches
+}
+
+func main() {
+	batches := GetPutData()
+
+	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
+		hdrs := w.Header()
+		hdrs.Add("access-control-allow-origin", "http://localhost:8080")
+		hdrs.Add("access-control-allow-methods", "GET")
+		hdrs.Add("access-control-allow-headers", "content-type")
+		if r.Method != http.MethodGet {
+			w.WriteHeader(http.StatusBadRequest)
+			return
+		}
+
+		hdrs.Add("content-type", "application/vnd.apache.arrow.stream")
+		w.WriteHeader(http.StatusOK)
+
+		wr := ipc.NewWriter(w, ipc.WithSchema(batches[0].Schema()))
+		defer wr.Close()
+
+		for _, b := range batches {
+			if err := wr.Write(b); err != nil {
+				panic(err)
+			}
+		}
+	})
+
+	fmt.Println("Serving on localhost:8080...")
+	log.Fatal(http.ListenAndServe(":8080", nil))

Review Comment:
   ```suggestion
   	fmt.Println("Serving on localhost:8000...")
   	log.Fatal(http.ListenAndServe(":8000", nil))
   ```



Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "ianmcook (via GitHub)" <gi...@apache.org>.
ianmcook commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1432915275


##########
http/examples/get/client.go:
##########
@@ -0,0 +1,53 @@
+package main
+
+import (
+	"fmt"
+	"net/http"
+	"time"
+
+	"github.com/apache/arrow/go/v15/arrow"
+	"github.com/apache/arrow/go/v15/arrow/ipc"
+	"github.com/apache/arrow/go/v15/arrow/memory"
+)
+
+func main() {
+	start := time.Now()
+	resp, err := http.Get("http://localhost:8000")
+	if err != nil {
+		panic(err)
+	}
+
+	if resp.StatusCode != http.StatusOK {
+		panic(fmt.Errorf("got non-200 status: %d", resp.StatusCode))
+	}
+	defer resp.Body.Close()
+
+	rdr, err := ipc.NewReader(resp.Body, ipc.WithAllocator(memory.DefaultAllocator))
+	if err != nil {
+		panic(err)
+	}
+	defer rdr.Release()
+
+	batches := make([]arrow.Record, 0)
+	defer func() {
+		for _, b := range batches {
+			b.Release()
+		}
+	}()
+
+	for rdr.Next() {
+		rec := rdr.Record()
+		rec.Retain()
+		batches = append(batches, rec)
+	}
+
+	if rdr.Err() != nil {
+		panic(rdr.Err())
+	}
+
+	execTime := time.Since(start)
+
+	fmt.Println(resp.ContentLength, " bytes recieved")

Review Comment:
   When I use this with the Python example server, it prints `-1 bytes received`, which means the length is unknown. I believe this is because `resp.ContentLength` only reflects the `Content-Length` header, which that server does not set. Is there a way to get the number of bytes in the response body directly (without making extra copies)?



Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "ianmcook (via GitHub)" <gi...@apache.org>.
ianmcook commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1432945078


##########
http/examples/get/server.go:
##########
@@ -0,0 +1,98 @@
+package main
+
+import (
+	"fmt"
+	"log"
+	"math/rand"
+	"net/http"
+
+	"github.com/apache/arrow/go/v15/arrow"
+	"github.com/apache/arrow/go/v15/arrow/array"
+	"github.com/apache/arrow/go/v15/arrow/ipc"
+	"github.com/apache/arrow/go/v15/arrow/memory"
+)
+
+var schema = arrow.NewSchema([]arrow.Field{
+	{Name: "a", Type: arrow.PrimitiveTypes.Int64},
+	{Name: "b", Type: arrow.PrimitiveTypes.Int64},
+	{Name: "c", Type: arrow.PrimitiveTypes.Int64},
+	{Name: "d", Type: arrow.PrimitiveTypes.Int64},
+}, nil)
+
+func GetPutData() []arrow.Record {
+	const (
+		totalRecords = 100000000
+		length       = 4096
+		ncolumns     = 4
+		seed         = 42
+	)
+
+	var (
+		r    = rand.New(rand.NewSource(seed))
+		mem  = memory.DefaultAllocator
+		arrs = make([]arrow.Array, 0, ncolumns)
+	)
+	for i := 0; i < ncolumns; i++ {
+		buf := memory.NewResizableBuffer(mem)
+		buf.Resize(length * 8)
+		_, err := r.Read(buf.Buf())
+		if err != nil {
+			panic(err)
+		}
+		defer buf.Release()
+
+		data := array.NewData(arrow.PrimitiveTypes.Int64, length, []*memory.Buffer{nil, buf}, nil, 0, 0)
+		defer data.Release()
+		a := array.NewInt64Data(data)
+		defer a.Release()
+		arrs = append(arrs, a)
+	}
+
+	batch := array.NewRecord(schema, arrs, length)
+	defer batch.Release()
+
+	batches := make([]arrow.Record, 0)
+	records := 0

Review Comment:
   ```suggestion
   	records := int64(0)
   ```



Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "ianmcook (via GitHub)" <gi...@apache.org>.
ianmcook commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1416243588


##########
http/examples/get/server.py:
##########
@@ -0,0 +1,75 @@
+import pyarrow as pa
+from random import randbytes
+from http.server import BaseHTTPRequestHandler, HTTPServer
+
+schema = pa.schema([
+    ('a', pa.int64()),
+    ('b', pa.int64()),
+    ('c', pa.int64()),
+    ('d', pa.int64())
+])
+
+def GetPutData():
+    total_records = 100000000
+    length = 4096
+    ncolumns = 4
+    
+    arrays = []
+    
+    for x in range(0, ncolumns):
+        buffer = pa.py_buffer(randbytes(length * 8))
+        arrays.append(pa.Int64Array.from_buffers(pa.int64(), length, [None, buffer], null_count=0))
+    
+    batch = pa.record_batch(arrays, schema)
+    batches = []
+    
+    records_sent = 0
+    while records_sent < total_records:
+      if records_sent + length > total_records:
+        last_length = total_records - records_sent
+        batches.append(batch.slice(0, last_length))
+        records_sent += last_length
+      else:
+        batches.append(batch)
+        records_sent += length
+    
+    return batches
+ 
+class MyServer(BaseHTTPRequestHandler):
+    def do_GET(self):
+        chunk_size = int(2e9)
+        chunk_splits = len(buffer) // chunk_size
+        
+        self.send_response(200)
+        self.send_header('Content-Type', 'application/vnd.apache.arrow.stream')
+        
+        #######################################################################
+        # include these to enable testing JavaScript client in local browser
+        self.send_header('Access-Control-Allow-Origin', 'http://localhost:8080')
+        self.send_header('Access-Control-Allow-Methods', 'GET')
+        self.send_header('Access-Control-Allow-Headers', 'Content-Type')
+        #######################################################################
+        
+        self.end_headers()
+        
+        for i in range(chunk_splits):
+            self.wfile.write(buffer.slice(i * chunk_size, chunk_size))
+            self.wfile.flush()
+        self.wfile.write(buffer.slice(chunk_splits * chunk_size))
+        self.wfile.flush()

Review Comment:
   The slicing is needed here because when you send a buffer larger than about 2 GB to `self.wfile.write`, it errors.
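A small standard-library sketch of the same chunking, factored into a hypothetical helper. The 1 GiB default `chunk_size` is an assumption chosen to stay well under the roughly 2 GB limit described above. Slicing a `memoryview` does not copy the data.

```python
import io


def write_in_chunks(out, data, chunk_size=1 << 30):
    """Write a large bytes-like object (e.g. a pyarrow Buffer) in pieces.

    Assumes `out.write` writes each whole slice, as buffered writers and
    the sendall-backed `wfile` of http.server do.
    """
    # cast("B") makes len() and slicing operate on bytes, whatever the format.
    view = memoryview(data).cast("B")
    for start in range(0, len(view), chunk_size):
        out.write(view[start:start + chunk_size])
    out.flush()
```

In the server, the loop over `chunk_splits` could then become `write_in_chunks(self.wfile, buffer)`.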



Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1421773362


##########
http/examples/get/client.cpp:
##########


Review Comment:
   We can just count the number of received record batches:
   
   ```cpp
   #include <curl/curl.h>
   #include <arrow/api.h>
   #include <arrow/io/api.h>
   #include <arrow/ipc/api.h>
   #include <chrono>
   #include <iostream>
   #include <fstream>
   
   class CountListener : public arrow::ipc::Listener {
   public:
     CountListener() : num_record_batches_(0) {
     }
   
     int64_t num_record_batches() const {
       return num_record_batches_;
     }
   
     arrow::Status OnRecordBatchWithMetadataDecoded(
       arrow::RecordBatchWithMetadata record_batch_with_metadata) override {
       ++num_record_batches_;
       return arrow::Status::OK();
     }
   
   private:
     int64_t num_record_batches_;
   };
   
   static size_t
   WriteFunction(void *contents, size_t size, size_t nmemb, void *userp)
   {
     size_t real_size = size * nmemb;
     auto decoder = static_cast<arrow::ipc::StreamDecoder *>(userp);
     auto status = decoder->Consume(static_cast<const uint8_t*>(contents), real_size);
     if (status.ok()) {
       return real_size;
     } else {
       std::cerr << status.ToString() << std::endl;
       return 0;
     }
   }
   
   int main(void)
   {
     std::string url = "http://localhost:8000";
   
     CURL *curl_handle;
     CURLcode res;
   
     auto count_listener = std::make_shared<CountListener>();
     arrow::ipc::StreamDecoder decoder(count_listener);
   
     curl_global_init(CURL_GLOBAL_ALL);
     curl_handle = curl_easy_init();
   
     curl_easy_setopt(curl_handle, CURLOPT_URL, url.c_str());
     curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, WriteFunction);
     curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, &decoder);
    
     auto start_time = std::chrono::steady_clock::now();
   
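     res = curl_easy_perform(curl_handle);
     if (res != CURLE_OK) {
       std::cerr << "curl_easy_perform() failed: " << curl_easy_strerror(res) << std::endl;
     }
   
     // int64_t is not `long` on every platform, so stream it instead of printf("%ld")
     std::cout << count_listener->num_record_batches() << " record batches received" << std::endl;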
   
     auto end_time = std::chrono::steady_clock::now();
   
     auto time_duration = std::chrono::duration_cast<std::chrono::duration<double>>(end_time - start_time);
     printf("%.2f seconds elapsed\n", time_duration.count());
   
     curl_easy_cleanup(curl_handle);
    
     curl_global_cleanup();
    
     return 0;
   }
   
   // to compile (for example):
   //clang++ client.cpp -std=c++17 $(pkg-config --cflags --libs arrow libcurl) -o client
   ```



Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "paleolimbot (via GitHub)" <gi...@apache.org>.
paleolimbot commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1416484582


##########
http/examples/get/client_async.py:
##########
@@ -0,0 +1,39 @@
+import asyncio
+import aiohttp

Review Comment:
   Maybe use `open_stream()` on the `response` directly? https://github.com/paleolimbot/2023-11-21_arrow-over-http-scratchpad/blob/main/client.py#L10-L14 . (Not sure exactly how that applies in the async context...)





Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "ianmcook (via GitHub)" <gi...@apache.org>.
ianmcook commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1416576326


##########
http/examples/get/server.py:
##########
@@ -0,0 +1,75 @@
+import pyarrow as pa
+from random import randbytes
+from http.server import BaseHTTPRequestHandler, HTTPServer
+
+schema = pa.schema([
+    ('a', pa.int64()),
+    ('b', pa.int64()),
+    ('c', pa.int64()),
+    ('d', pa.int64())
+])
+
+def GetPutData():
+    total_records = 100000000
+    length = 4096
+    ncolumns = 4
+    
+    arrays = []
+    
+    for x in range(0, ncolumns):
+        buffer = pa.py_buffer(randbytes(length * 8))
+        arrays.append(pa.Int64Array.from_buffers(pa.int64(), length, [None, buffer], null_count=0))
+    
+    batch = pa.record_batch(arrays, schema)
+    batches = []
+    
+    records_sent = 0
+    while records_sent < total_records:
+      if records_sent + length > total_records:
+        last_length = total_records - records_sent
+        batches.append(batch.slice(0, last_length))
+        records_sent += last_length
+      else:
+        batches.append(batch)
+        records_sent += length
+    
+    return batches
+ 
+class MyServer(BaseHTTPRequestHandler):
+    def do_GET(self):
+        chunk_size = int(2e9)
+        chunk_splits = len(buffer) // chunk_size
+        
+        self.send_response(200)
+        self.send_header('Content-Type', 'application/vnd.apache.arrow.stream')
+        
+        #######################################################################
+        # include these to enable testing JavaScript client in local browser
+        self.send_header('Access-Control-Allow-Origin', 'http://localhost:8080')
+        self.send_header('Access-Control-Allow-Methods', 'GET')
+        self.send_header('Access-Control-Allow-Headers', 'Content-Type')
+        #######################################################################
+        
+        self.end_headers()
+        
+        for i in range(chunk_splits):
+            self.wfile.write(buffer.slice(i * chunk_size, chunk_size))
+            self.wfile.flush()
+        self.wfile.write(buffer.slice(chunk_splits * chunk_size))
+        self.wfile.flush()
+
+batches = GetPutData()
+
+sink = pa.BufferOutputStream()
+
+with pa.ipc.new_stream(sink, schema) as writer:
+   for i in range(len(batches)):
+      writer.write_batch(batches[i])
+
+buffer = sink.getvalue()

Review Comment:
   wow, yes, that trick solves it:
   ```
   this example as-is:        4367028224  peak memory footprint
   with the `BytesIO` trick:    32202752  peak memory footprint
   ```
   Tested on macOS using `/usr/bin/time -l` (values are in bytes).





Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "ianmcook (via GitHub)" <gi...@apache.org>.
ianmcook commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1416466937


##########
http/examples/get/server.py:
##########
@@ -0,0 +1,75 @@
+import pyarrow as pa
+from random import randbytes
+from http.server import BaseHTTPRequestHandler, HTTPServer

Review Comment:
   @paleolimbot has a Flask-based example here for comparison:
   https://github.com/paleolimbot/2023-11-21_arrow-over-http-scratchpad/blob/main/app.py





Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1433321118


##########
http/examples/get/client.cpp:
##########


Review Comment:
   @ianmcook The last example needs https://github.com/apache/arrow/pull/39164.





Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "domoritz (via GitHub)" <gi...@apache.org>.
domoritz commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1433132585


##########
http/examples/get/client.js:
##########
@@ -0,0 +1,29 @@
+const Arrow = require('apache-arrow');
+
+const url = 'http://localhost:8000';
+
+async function getArrowData(url) {
+  try {
+    const response = await fetch(url);
+    const table = await Arrow.tableFromIPC(response);
+    return table;
+  } catch (error) {
+    console.error('Error:', error.message);
+    throw error;
+  }
+}
+
+async function runExample(url) {
+  const startTime = new Date();
+  try {
+    const table = await getArrowData(url);
+    const endTime = new Date();
+    const duration = (endTime - startTime) / 1000;
+    console.log(`${table.batches.length} record batches received`);
+    console.log(`${duration.toFixed(2)} seconds elapsed`);
+  } catch (error) {
+    console.error('Error:', error.message);
+  }
+}

Review Comment:
   I think we can use this simpler version without extra functions and variables. 



   ```suggestion
   async function runExample(url) {
     const startTime = new Date();
     
     const table = await Arrow.tableFromIPC(fetch(url));
     
     const duration = (new Date() - startTime) / 1000;
     console.log(`${table.batches.length} record batches received`);
     console.log(`${duration.toFixed(2)} seconds elapsed`);
   }
   ```





Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "ianmcook (via GitHub)" <gi...@apache.org>.
ianmcook commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1433352897


##########
http/examples/get/client.java:
##########
@@ -0,0 +1,53 @@
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.VectorUnloader;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.List;
+import java.util.ArrayList;
+
+public class ArrowHttpClient {
+
+    public static void main(String[] args) {
+        String serverUrl = "http://localhost:8000";
+
+        try {
+            URL url = new URL(serverUrl);
+            HttpURLConnection connection = (HttpURLConnection) url.openConnection();
+            connection.setRequestMethod("GET");
+
+            if (connection.getResponseCode() == HttpURLConnection.HTTP_OK) {
+                InputStream inputStream = connection.getInputStream();
+
+                BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+                ArrowStreamReader reader = new ArrowStreamReader(inputStream, allocator);
+                List<ArrowRecordBatch> batches = new ArrayList<>();
+
+                int num_rows = 0;
+                while (reader.loadNextBatch()) { 
+                    VectorSchemaRoot root = reader.getVectorSchemaRoot();
+                    num_rows += root.getRowCount();
+                    VectorUnloader unloader = new VectorUnloader(root);
+                    ArrowRecordBatch arb = unloader.getRecordBatch();
+                    batches.add(arb);
+                }
+                
+                System.out.println(reader.bytesRead() + " bytes received");
+                System.out.println(num_rows + " records received");
+                System.out.println(batches.size() + " record batches received");
+
+                reader.close();
+            } else {
+                System.err.println("Failed with response code: " + connection.getResponseCode());
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+}

Review Comment:
   To run:
   
   Create `pom.xml` with these contents:
   
   ```xml
   <?xml version="1.0" encoding="UTF-8"?>
   <project xmlns="http://maven.apache.org/POM/4.0.0"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
       <modelVersion>4.0.0</modelVersion>
   
       <groupId>com.example</groupId>
       <artifactId>ArrowHttpClient</artifactId>
       <version>1.0-SNAPSHOT</version>
   
       <properties>
           <arrow.version>14.0.1</arrow.version>
           <maven.compiler.source>21</maven.compiler.source>
           <maven.compiler.target>21</maven.compiler.target>
       </properties>
   
       <dependencies>
   
           <dependency>
               <groupId>org.apache.arrow</groupId>
               <artifactId>arrow-memory-core</artifactId>
               <version>${arrow.version}</version>
           </dependency>
   
           <dependency>
               <groupId>org.apache.arrow</groupId>
               <artifactId>arrow-memory-netty</artifactId>
               <version>${arrow.version}</version>
           </dependency>
   
           <dependency>
               <groupId>org.apache.arrow</groupId>
               <artifactId>arrow-vector</artifactId>
               <version>${arrow.version}</version>
           </dependency>
   
       </dependencies>
   </project>
   ```
   
   Move `client.java` to `src/main/java/com/example/ArrowHttpClient.java`
   
   Then:
   ```sh
   mvn install
   mvn compile
   _JAVA_OPTIONS="--add-opens=java.base/java.nio=ALL-UNNAMED" mvn exec:java -Dexec.mainClass="ArrowHttpClient"
   ```
   





Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "ianmcook (via GitHub)" <gi...@apache.org>.
ianmcook commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1448202727


##########
http/examples/get/client.cpp:
##########


Review Comment:
   Yes, I also did various other things, like writing one of the received batches to an IPC file at the bottom of `main`, and it all worked without any problems. I'm compiling with Clang on an Intel Mac.
   
   What happens in your environment?





Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "ianmcook (via GitHub)" <gi...@apache.org>.
ianmcook commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1448177869


##########
http/examples/get/client.cpp:
##########
@@ -0,0 +1,86 @@
+#include <curl/curl.h>
+#include <arrow/api.h>
+#include <arrow/io/api.h>
+#include <arrow/ipc/api.h>
+#include <chrono>
+
+struct MemoryStruct {
+  char *memory;
+  size_t size;
+};
+ 
+static size_t
+WriteMemoryCallback(void *contents, size_t size, size_t nmemb, void *userp)
+{
+  size_t realsize = size * nmemb;
+  struct MemoryStruct *mem = (struct MemoryStruct *)userp;
+ 
+  char *ptr = static_cast<char*>(realloc(mem->memory, mem->size + realsize + 1));
+  if(!ptr) {
+    printf("out of memory\n");
+    return 0;
+  }
+ 
+  mem->memory = ptr;
+  memcpy(&(mem->memory[mem->size]), contents, realsize);
+  mem->size += realsize;
+  mem->memory[mem->size] = 0;
+ 
+  return realsize;
+}
+
+int main(void)
+{
+  std::string url = "http://localhost:8000";
+
+  CURL *curl_handle;
+  CURLcode res;
+ 
+  struct MemoryStruct chunk;
+ 
+  chunk.memory = static_cast<char*>(malloc(1));
+  chunk.size = 0;
+
+  curl_global_init(CURL_GLOBAL_ALL);
+  curl_handle = curl_easy_init();
+
+  curl_easy_setopt(curl_handle, CURLOPT_URL, url.c_str());
+  curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, WriteMemoryCallback);
+  curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, (void *)&chunk);
+ 
+  auto start_time = std::chrono::steady_clock::now();
+
+  res = curl_easy_perform(curl_handle);
+
+  printf("%lu bytes received\n", (unsigned long)chunk.size);
+
+  auto buffer = arrow::Buffer::Wrap(reinterpret_cast<const uint8_t *>(chunk.memory), chunk.size);
+  auto input_stream = std::make_shared<arrow::io::BufferReader>(buffer);
+  auto record_batch_reader = arrow::ipc::RecordBatchStreamReader::Open(input_stream).ValueOrDie();
+
+  std::vector<std::shared_ptr<arrow::RecordBatch>> record_batches;
+  std::shared_ptr<arrow::RecordBatch> record_batch;
+  while (record_batch_reader->ReadNext(&record_batch).ok() && record_batch)
+  {
+      record_batches.push_back(record_batch);
+  }
+
+  printf("%lu record batches received\n", (unsigned long)(record_batches.size()));
+
+  auto end_time = std::chrono::steady_clock::now();
+
+  auto time_duration = std::chrono::duration_cast<std::chrono::duration<double>>(end_time - start_time);
+  printf("%.2f seconds elapsed\n", time_duration.count());
+
+  curl_easy_cleanup(curl_handle);
+ 
+  free(chunk.memory);
+
+  curl_global_cleanup();
+ 
+  return 0;
+}
+
+// to compile (for example):
+//clang++ client.cpp -std=c++17 -I/usr/local/include -L/usr/local/lib -lcurl -larrow -o client

Review Comment:
   Oops, thanks for catching this!





Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #39081:
URL: https://github.com/apache/arrow/pull/39081#discussion_r1448024426


##########
http/examples/get/client.cpp:
##########


Review Comment:
   Note that the first example will crash if we access the collected record batches, because the decoded record batches are only valid inside the `Listener::OnRecordBatchWithMetadataDecoded()` callback.
   If we want to use the decoded record batches after `curl_easy_perform()`, we need to copy them in `Listener::OnRecordBatchWithMetadataDecoded()`. We have an implementation of this at https://github.com/apache/arrow/pull/39164/files#diff-900c46995b5706697d6e4b010f610f1a1cf27d4d865afe48de0a800830ac676bR1333, but it's only included in our test code because I couldn't find a good API for it in #39164.
   
   FYI: I recommend processing each decoded record batch in the `Listener::OnRecordBatchWithMetadataDecoded()` callback instead of copying it.
   





Re: [PR] WIP: [DO NOT MERGE] Arrow over HTTP examples [arrow]

Posted by "ianmcook (via GitHub)" <gi...@apache.org>.
ianmcook closed pull request #39081: WIP: [DO NOT MERGE] Arrow over HTTP examples
URL: https://github.com/apache/arrow/pull/39081

