You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2023/05/08 19:06:40 UTC
[arrow-datafusion-python] branch main updated: Improve db-benchmark (#372)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-python.git
The following commit(s) were added to refs/heads/main by this push:
new 6a2df77 Improve db-benchmark (#372)
6a2df77 is described below
commit 6a2df777508ff335f5bad4a97e8af9da6014eaa4
Author: Andy Grove <an...@gmail.com>
AuthorDate: Mon May 8 13:06:34 2023 -0600
Improve db-benchmark (#372)
---
.github/workflows/test.yaml | 2 +-
benchmarks/db-benchmark/README.md | 4 +-
benchmarks/db-benchmark/db-benchmark.dockerfile | 1 +
benchmarks/db-benchmark/groupby-datafusion.py | 330 ++++++++++++++++++++----
benchmarks/db-benchmark/join-datafusion.py | 138 +++++++---
benchmarks/db-benchmark/run-bench.sh | 8 +-
dev/python_lint.sh | 2 +-
7 files changed, 404 insertions(+), 81 deletions(-)
diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml
index c69205b..f672c81 100644
--- a/.github/workflows/test.yaml
+++ b/.github/workflows/test.yaml
@@ -101,7 +101,7 @@ jobs:
if: ${{ matrix.python-version == '3.10' && matrix.toolchain == 'stable' }}
run: |
source venv/bin/activate
- flake8 --exclude venv --ignore=E501,W503
+ flake8 --exclude venv,benchmarks/db-benchmark --ignore=E501,W503
black --line-length 79 --diff --check .
- name: Run tests
diff --git a/benchmarks/db-benchmark/README.md b/benchmarks/db-benchmark/README.md
index 93293b0..8ce4534 100644
--- a/benchmarks/db-benchmark/README.md
+++ b/benchmarks/db-benchmark/README.md
@@ -27,6 +27,6 @@ DataFusion's Python bindings.
Run the following from root of this project.
```bash
-$ docker build -t db-benchmark -f benchmarks/db-benchmark/db-benchmark.dockerfile .
-$ docker run --privileged -it db-benchmark
+docker build -t db-benchmark -f benchmarks/db-benchmark/db-benchmark.dockerfile .
+docker run --privileged -it db-benchmark
```
diff --git a/benchmarks/db-benchmark/db-benchmark.dockerfile b/benchmarks/db-benchmark/db-benchmark.dockerfile
index 2876b5b..d8842b2 100644
--- a/benchmarks/db-benchmark/db-benchmark.dockerfile
+++ b/benchmarks/db-benchmark/db-benchmark.dockerfile
@@ -108,6 +108,7 @@ RUN #./duckdb/setup-duckdb.sh
# END OF SETUP
RUN python3 -m pip install --upgrade pandas
+RUN python3 -m pip install --upgrade polars psutil
RUN python3 -m pip install --upgrade datafusion
# Now add our solution
diff --git a/benchmarks/db-benchmark/groupby-datafusion.py b/benchmarks/db-benchmark/groupby-datafusion.py
index 76dd38f..2c35259 100644
--- a/benchmarks/db-benchmark/groupby-datafusion.py
+++ b/benchmarks/db-benchmark/groupby-datafusion.py
@@ -19,14 +19,20 @@ import os
import gc
import timeit
import datafusion as df
-from datafusion import functions as f
-from datafusion import col
+from datafusion import (
+ col,
+ functions as f,
+ RuntimeConfig,
+ SessionConfig,
+ SessionContext,
+)
+import pyarrow
from pyarrow import csv as pacsv
print("# groupby-datafusion.py", flush=True)
-# exec(open("./_helpers/helpers.py").read())
+exec(open("./_helpers/helpers.py").read())
def ans_shape(batches):
@@ -40,8 +46,12 @@ def ans_shape(batches):
return rows, cols
-# ver = df.__version__
-ver = "7.0.0"
+def execute(df):
+ print(df.execution_plan().display_indent())
+ return df.collect()
+
+
+ver = df.__version__
git = ""
task = "groupby"
solution = "datafusion"
@@ -49,16 +59,47 @@ fun = ".groupby"
cache = "TRUE"
on_disk = "FALSE"
+# experimental - support running with both DataFrame and SQL APIs
+sql = True
+
data_name = os.environ["SRC_DATANAME"]
src_grp = os.path.join("data", data_name + ".csv")
print("loading dataset %s" % src_grp, flush=True)
+schema = pyarrow.schema(
+ [
+ ("id4", pyarrow.int32()),
+ ("id5", pyarrow.int32()),
+ ("id6", pyarrow.int32()),
+ ("v1", pyarrow.int32()),
+ ("v2", pyarrow.int32()),
+ ("v3", pyarrow.float64()),
+ ]
+)
+
data = pacsv.read_csv(
- src_grp, convert_options=pacsv.ConvertOptions(auto_dict_encode=True)
+ src_grp,
+ convert_options=pacsv.ConvertOptions(
+ auto_dict_encode=True, column_types=schema
+ ),
)
print("dataset loaded")
-ctx = df.SessionContext()
+# create a session context with explicit runtime and config settings
+runtime = (
+ RuntimeConfig()
+ .with_disk_manager_os()
+ .with_fair_spill_pool(64 * 1024 * 1024 * 1024)
+)
+config = (
+ SessionConfig()
+ .with_repartition_joins(False)
+ .with_repartition_aggregations(False)
+ .set("datafusion.execution.coalesce_batches", "false")
+)
+ctx = SessionContext(config, runtime)
+print(ctx)
+
ctx.register_record_batches("x", [data.to_batches()])
print("registered record batches")
# cols = ctx.sql("SHOW columns from x")
@@ -72,50 +113,107 @@ task_init = timeit.default_timer()
question = "sum v1 by id1" # q1
gc.collect()
t_start = timeit.default_timer()
-ans = ctx.sql("SELECT id1, SUM(v1) AS v1 FROM x GROUP BY id1").collect()
+if sql:
+ df = ctx.sql("SELECT id1, SUM(v1) AS v1 FROM x GROUP BY id1")
+else:
+ df = ctx.table("x").aggregate(
+ [f.col("id1")], [f.sum(f.col("v1")).alias("v1")]
+ )
+ans = execute(df)
+
shape = ans_shape(ans)
-# print(shape, flush=True)
+print(shape, flush=True)
t = timeit.default_timer() - t_start
print(f"q1: {t}")
-# m = memory_usage()
+m = memory_usage()
t_start = timeit.default_timer()
df = ctx.create_dataframe([ans])
chk = df.aggregate([], [f.sum(col("v1"))]).collect()[0].column(0)[0]
chkt = timeit.default_timer() - t_start
-# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+write_log(
+ task=task,
+ data=data_name,
+ in_rows=in_rows,
+ question=question,
+ out_rows=shape[0],
+ out_cols=shape[1],
+ solution=solution,
+ version=ver,
+ git=git,
+ fun=fun,
+ run=1,
+ time_sec=t,
+ mem_gb=m,
+ cache=cache,
+ chk=make_chk([chk]),
+ chk_time_sec=chkt,
+ on_disk=on_disk,
+)
del ans
gc.collect()
question = "sum v1 by id1:id2" # q2
gc.collect()
t_start = timeit.default_timer()
-ans = ctx.sql(
- "SELECT id1, id2, SUM(v1) AS v1 FROM x GROUP BY id1, id2"
-).collect()
+if sql:
+ df = ctx.sql("SELECT id1, id2, SUM(v1) AS v1 FROM x GROUP BY id1, id2")
+else:
+ df = ctx.table("x").aggregate(
+ [f.col("id1"), f.col("id2")], [f.sum(f.col("v1")).alias("v1")]
+ )
+ans = execute(df)
shape = ans_shape(ans)
-# print(shape, flush=True)
+print(shape, flush=True)
t = timeit.default_timer() - t_start
print(f"q2: {t}")
-# m = memory_usage()
+m = memory_usage()
t_start = timeit.default_timer()
df = ctx.create_dataframe([ans])
chk = df.aggregate([], [f.sum(col("v1"))]).collect()[0].column(0)[0]
chkt = timeit.default_timer() - t_start
-# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+write_log(
+ task=task,
+ data=data_name,
+ in_rows=in_rows,
+ question=question,
+ out_rows=shape[0],
+ out_cols=shape[1],
+ solution=solution,
+ version=ver,
+ git=git,
+ fun=fun,
+ run=1,
+ time_sec=t,
+ mem_gb=m,
+ cache=cache,
+ chk=make_chk([chk]),
+ chk_time_sec=chkt,
+ on_disk=on_disk,
+)
del ans
gc.collect()
question = "sum v1 mean v3 by id3" # q3
gc.collect()
t_start = timeit.default_timer()
-ans = ctx.sql(
- "SELECT id3, SUM(v1) AS v1, AVG(v3) AS v3 FROM x GROUP BY id3"
-).collect()
+if sql:
+ df = ctx.sql(
+ "SELECT id3, SUM(v1) AS v1, AVG(v3) AS v3 FROM x GROUP BY id3"
+ )
+else:
+ df = ctx.table("x").aggregate(
+ [f.col("id3")],
+ [
+ f.sum(f.col("v1")).alias("v1"),
+ f.avg(f.col("v3")).alias("v3"),
+ ],
+ )
+ans = execute(df)
shape = ans_shape(ans)
-# print(shape, flush=True)
+print(shape, flush=True)
t = timeit.default_timer() - t_start
print(f"q3: {t}")
-# m = memory_usage()
+m = memory_usage()
t_start = timeit.default_timer()
df = ctx.create_dataframe([ans])
chk = (
@@ -125,7 +223,25 @@ chk = (
.to_numpy()[0]
)
chkt = timeit.default_timer() - t_start
-# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+write_log(
+ task=task,
+ data=data_name,
+ in_rows=in_rows,
+ question=question,
+ out_rows=shape[0],
+ out_cols=shape[1],
+ solution=solution,
+ version=ver,
+ git=git,
+ fun=fun,
+ run=1,
+ time_sec=t,
+ mem_gb=m,
+ cache=cache,
+ chk=make_chk([chk]),
+ chk_time_sec=chkt,
+ on_disk=on_disk,
+)
del ans
gc.collect()
@@ -136,10 +252,10 @@ ans = ctx.sql(
"SELECT id4, AVG(v1) AS v1, AVG(v2) AS v2, AVG(v3) AS v3 FROM x GROUP BY id4"
).collect()
shape = ans_shape(ans)
-# print(shape, flush=True)
+print(shape, flush=True)
t = timeit.default_timer() - t_start
print(f"q4: {t}")
-# m = memory_usage()
+m = memory_usage()
t_start = timeit.default_timer()
df = ctx.create_dataframe([ans])
chk = (
@@ -149,7 +265,25 @@ chk = (
.to_numpy()[0]
)
chkt = timeit.default_timer() - t_start
-# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+write_log(
+ task=task,
+ data=data_name,
+ in_rows=in_rows,
+ question=question,
+ out_rows=shape[0],
+ out_cols=shape[1],
+ solution=solution,
+ version=ver,
+ git=git,
+ fun=fun,
+ run=1,
+ time_sec=t,
+ mem_gb=m,
+ cache=cache,
+ chk=make_chk([chk]),
+ chk_time_sec=chkt,
+ on_disk=on_disk,
+)
del ans
gc.collect()
@@ -160,10 +294,10 @@ ans = ctx.sql(
"SELECT id6, SUM(v1) AS v1, SUM(v2) AS v2, SUM(v3) AS v3 FROM x GROUP BY id6"
).collect()
shape = ans_shape(ans)
-# print(shape, flush=True)
+print(shape, flush=True)
t = timeit.default_timer() - t_start
print(f"q5: {t}")
-# m = memory_usage()
+m = memory_usage()
t_start = timeit.default_timer()
df = ctx.create_dataframe([ans])
chk = (
@@ -173,7 +307,25 @@ chk = (
.to_numpy()[0]
)
chkt = timeit.default_timer() - t_start
-# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+write_log(
+ task=task,
+ data=data_name,
+ in_rows=in_rows,
+ question=question,
+ out_rows=shape[0],
+ out_cols=shape[1],
+ solution=solution,
+ version=ver,
+ git=git,
+ fun=fun,
+ run=1,
+ time_sec=t,
+ mem_gb=m,
+ cache=cache,
+ chk=make_chk([chk]),
+ chk_time_sec=chkt,
+ on_disk=on_disk,
+)
del ans
gc.collect()
@@ -184,10 +336,10 @@ ans = ctx.sql(
"SELECT id4, id5, approx_percentile_cont(v3, .5) AS median_v3, stddev(v3) AS stddev_v3 FROM x GROUP BY id4, id5"
).collect()
shape = ans_shape(ans)
-# print(shape, flush=True)
+print(shape, flush=True)
t = timeit.default_timer() - t_start
print(f"q6: {t}")
-# m = memory_usage()
+m = memory_usage()
t_start = timeit.default_timer()
df = ctx.create_dataframe([ans])
chk = (
@@ -197,7 +349,25 @@ chk = (
.to_numpy()[0]
)
chkt = timeit.default_timer() - t_start
-# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+write_log(
+ task=task,
+ data=data_name,
+ in_rows=in_rows,
+ question=question,
+ out_rows=shape[0],
+ out_cols=shape[1],
+ solution=solution,
+ version=ver,
+ git=git,
+ fun=fun,
+ run=1,
+ time_sec=t,
+ mem_gb=m,
+ cache=cache,
+ chk=make_chk([chk]),
+ chk_time_sec=chkt,
+ on_disk=on_disk,
+)
del ans
gc.collect()
@@ -208,15 +378,33 @@ ans = ctx.sql(
"SELECT id3, MAX(v1) - MIN(v2) AS range_v1_v2 FROM x GROUP BY id3"
).collect()
shape = ans_shape(ans)
-# print(shape, flush=True)
+print(shape, flush=True)
t = timeit.default_timer() - t_start
print(f"q7: {t}")
-# m = memory_usage()
+m = memory_usage()
t_start = timeit.default_timer()
df = ctx.create_dataframe([ans])
chk = df.aggregate([], [f.sum(col("range_v1_v2"))]).collect()[0].column(0)[0]
chkt = timeit.default_timer() - t_start
-# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+write_log(
+ task=task,
+ data=data_name,
+ in_rows=in_rows,
+ question=question,
+ out_rows=shape[0],
+ out_cols=shape[1],
+ solution=solution,
+ version=ver,
+ git=git,
+ fun=fun,
+ run=1,
+ time_sec=t,
+ mem_gb=m,
+ cache=cache,
+ chk=make_chk([chk]),
+ chk_time_sec=chkt,
+ on_disk=on_disk,
+)
del ans
gc.collect()
@@ -227,15 +415,33 @@ ans = ctx.sql(
"SELECT id6, v3 from (SELECT id6, v3, row_number() OVER (PARTITION BY id6 ORDER BY v3 DESC) AS row FROM x) t WHERE row <= 2"
).collect()
shape = ans_shape(ans)
-# print(shape, flush=True)
+print(shape, flush=True)
t = timeit.default_timer() - t_start
print(f"q8: {t}")
-# m = memory_usage()
+m = memory_usage()
t_start = timeit.default_timer()
df = ctx.create_dataframe([ans])
chk = df.aggregate([], [f.sum(col("v3"))]).collect()[0].column(0)[0]
chkt = timeit.default_timer() - t_start
-# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+write_log(
+ task=task,
+ data=data_name,
+ in_rows=in_rows,
+ question=question,
+ out_rows=shape[0],
+ out_cols=shape[1],
+ solution=solution,
+ version=ver,
+ git=git,
+ fun=fun,
+ run=1,
+ time_sec=t,
+ mem_gb=m,
+ cache=cache,
+ chk=make_chk([chk]),
+ chk_time_sec=chkt,
+ on_disk=on_disk,
+)
del ans
gc.collect()
@@ -244,15 +450,33 @@ gc.collect()
t_start = timeit.default_timer()
ans = ctx.sql("SELECT corr(v1, v2) as corr FROM x GROUP BY id2, id4").collect()
shape = ans_shape(ans)
-# print(shape, flush=True)
+print(shape, flush=True)
t = timeit.default_timer() - t_start
print(f"q9: {t}")
-# m = memory_usage()
+m = memory_usage()
t_start = timeit.default_timer()
df = ctx.create_dataframe([ans])
chk = df.aggregate([], [f.sum(col("corr"))]).collect()[0].column(0)[0]
chkt = timeit.default_timer() - t_start
-# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+write_log(
+ task=task,
+ data=data_name,
+ in_rows=in_rows,
+ question=question,
+ out_rows=shape[0],
+ out_cols=shape[1],
+ solution=solution,
+ version=ver,
+ git=git,
+ fun=fun,
+ run=1,
+ time_sec=t,
+ mem_gb=m,
+ cache=cache,
+ chk=make_chk([chk]),
+ chk_time_sec=chkt,
+ on_disk=on_disk,
+)
del ans
gc.collect()
@@ -263,10 +487,10 @@ ans = ctx.sql(
"SELECT id1, id2, id3, id4, id5, id6, SUM(v3) as v3, COUNT(*) AS cnt FROM x GROUP BY id1, id2, id3, id4, id5, id6"
).collect()
shape = ans_shape(ans)
-# print(shape, flush=True)
+print(shape, flush=True)
t = timeit.default_timer() - t_start
print(f"q10: {t}")
-# m = memory_usage()
+m = memory_usage()
t_start = timeit.default_timer()
df = ctx.create_dataframe([ans])
chk = (
@@ -276,7 +500,25 @@ chk = (
.to_numpy()[0]
)
chkt = timeit.default_timer() - t_start
-# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+write_log(
+ task=task,
+ data=data_name,
+ in_rows=in_rows,
+ question=question,
+ out_rows=shape[0],
+ out_cols=shape[1],
+ solution=solution,
+ version=ver,
+ git=git,
+ fun=fun,
+ run=1,
+ time_sec=t,
+ mem_gb=m,
+ cache=cache,
+ chk=make_chk([chk]),
+ chk_time_sec=chkt,
+ on_disk=on_disk,
+)
del ans
gc.collect()
diff --git a/benchmarks/db-benchmark/join-datafusion.py b/benchmarks/db-benchmark/join-datafusion.py
index 8843b55..602cee6 100755
--- a/benchmarks/db-benchmark/join-datafusion.py
+++ b/benchmarks/db-benchmark/join-datafusion.py
@@ -26,26 +26,7 @@ from pyarrow import csv as pacsv
print("# join-datafusion.py", flush=True)
-# exec(open("./_helpers/helpers.py").read())
-
-
-def join_to_tbls(data_name):
- x_n = int(float(data_name.split("_")[1]))
- y_n = [
- "{:.0e}".format(x_n / 1e6),
- "{:.0e}".format(x_n / 1e3),
- "{:.0e}".format(x_n),
- ]
- y_n = [
- y_n[0].replace("+0", ""),
- y_n[1].replace("+0", ""),
- y_n[2].replace("+0", ""),
- ]
- return [
- data_name.replace("NA", y_n[0]),
- data_name.replace("NA", y_n[1]),
- data_name.replace("NA", y_n[2]),
- ]
+exec(open("./_helpers/helpers.py").read())
def ans_shape(batches):
@@ -59,7 +40,7 @@ def ans_shape(batches):
return rows, cols
-ver = "6.0.0"
+ver = df.__version__
task = "join"
git = ""
solution = "datafusion"
@@ -84,13 +65,16 @@ print(
+ ", "
+ y_data_name[0]
+ ", "
- + y_data_name[2]
+ + y_data_name[1]
+ ", "
+ y_data_name[2],
flush=True,
)
ctx = df.SessionContext()
+print(ctx)
+
+# TODO we should be applying projections to these table reads to create relations of different sizes
x_data = pacsv.read_csv(
src_jn_x, convert_options=pacsv.ConvertOptions(auto_dict_encode=True)
@@ -133,8 +117,26 @@ t_start = timeit.default_timer()
df = ctx.create_dataframe([ans])
chk = df.aggregate([], [f.sum(col("v1"))]).collect()[0].column(0)[0]
chkt = timeit.default_timer() - t_start
-# m = memory_usage()
-# write_log(task=task, data=data_name, in_rows=x_data.num_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+m = memory_usage()
+write_log(
+ task=task,
+ data=data_name,
+ in_rows=x_data.num_rows,
+ question=question,
+ out_rows=shape[0],
+ out_cols=shape[1],
+ solution=solution,
+ version=ver,
+ git=git,
+ fun=fun,
+ run=1,
+ time_sec=t,
+ mem_gb=m,
+ cache=cache,
+ chk=make_chk([chk]),
+ chk_time_sec=chkt,
+ on_disk=on_disk,
+)
del ans
gc.collect()
@@ -156,8 +158,26 @@ chk = (
.column(0)[0]
)
chkt = timeit.default_timer() - t_start
-# m = memory_usage()
-# write_log(task=task, data=data_name, in_rows=x_data.num_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+m = memory_usage()
+write_log(
+ task=task,
+ data=data_name,
+ in_rows=x_data.num_rows,
+ question=question,
+ out_rows=shape[0],
+ out_cols=shape[1],
+ solution=solution,
+ version=ver,
+ git=git,
+ fun=fun,
+ run=1,
+ time_sec=t,
+ mem_gb=m,
+ cache=cache,
+ chk=make_chk([chk]),
+ chk_time_sec=chkt,
+ on_disk=on_disk,
+)
del ans
gc.collect()
@@ -179,8 +199,26 @@ chk = (
.column(0)[0]
)
chkt = timeit.default_timer() - t_start
-# m = memory_usage()
-# write_log(task=task, data=data_name, in_rows=x_data.num_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+m = memory_usage()
+write_log(
+ task=task,
+ data=data_name,
+ in_rows=x_data.num_rows,
+ question=question,
+ out_rows=shape[0],
+ out_cols=shape[1],
+ solution=solution,
+ version=ver,
+ git=git,
+ fun=fun,
+ run=1,
+ time_sec=t,
+ mem_gb=m,
+ cache=cache,
+ chk=make_chk([chk]),
+ chk_time_sec=chkt,
+ on_disk=on_disk,
+)
del ans
gc.collect()
@@ -202,8 +240,26 @@ chk = (
.column(0)[0]
)
chkt = timeit.default_timer() - t_start
-# m = memory_usage()
-# write_log(task=task, data=data_name, in_rows=x_data.num_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+m = memory_usage()
+write_log(
+ task=task,
+ data=data_name,
+ in_rows=x_data.num_rows,
+ question=question,
+ out_rows=shape[0],
+ out_cols=shape[1],
+ solution=solution,
+ version=ver,
+ git=git,
+ fun=fun,
+ run=1,
+ time_sec=t,
+ mem_gb=m,
+ cache=cache,
+ chk=make_chk([chk]),
+ chk_time_sec=chkt,
+ on_disk=on_disk,
+)
del ans
gc.collect()
@@ -225,8 +281,26 @@ chk = (
.column(0)[0]
)
chkt = timeit.default_timer() - t_start
-# m = memory_usage()
-# write_log(task=task, data=data_name, in_rows=x_data.num_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+m = memory_usage()
+write_log(
+ task=task,
+ data=data_name,
+ in_rows=x_data.num_rows,
+ question=question,
+ out_rows=shape[0],
+ out_cols=shape[1],
+ solution=solution,
+ version=ver,
+ git=git,
+ fun=fun,
+ run=1,
+ time_sec=t,
+ mem_gb=m,
+ cache=cache,
+ chk=make_chk([chk]),
+ chk_time_sec=chkt,
+ on_disk=on_disk,
+)
del ans
gc.collect()
diff --git a/benchmarks/db-benchmark/run-bench.sh b/benchmarks/db-benchmark/run-bench.sh
index 2c30809..36a6087 100755
--- a/benchmarks/db-benchmark/run-bench.sh
+++ b/benchmarks/db-benchmark/run-bench.sh
@@ -17,5 +17,11 @@
# under the License.
set -e
+#SRC_DATANAME=G1_1e7_1e2_0_0 python3 /db-benchmark/polars/groupby-polars.py
SRC_DATANAME=G1_1e7_1e2_0_0 python3 /db-benchmark/datafusion-python/groupby-datafusion.py
-#SRC_DATANAME=J1_1e7_NA_0_0 python3 /db-benchmark/datafusion-python/join-datafusion.py
+
+# joins need more work still
+#SRC_DATANAME=G1_1e7_1e2_0_0 python3 /db-benchmark/datafusion-python/join-datafusion.py
+#SRC_DATANAME=G1_1e7_1e2_0_0 python3 /db-benchmark/polars/join-polars.py
+
+cat time.csv
diff --git a/dev/python_lint.sh b/dev/python_lint.sh
index 9493462..3bc67fb 100755
--- a/dev/python_lint.sh
+++ b/dev/python_lint.sh
@@ -22,5 +22,5 @@
set -e
source venv/bin/activate
-flake8 --exclude venv --ignore=E501,W503
+flake8 --exclude venv,benchmarks/db-benchmark --ignore=E501,W503
black --line-length 79 .