Posted to commits@arrow.apache.org by ag...@apache.org on 2023/05/08 19:06:40 UTC

[arrow-datafusion-python] branch main updated: Improve db-benchmark (#372)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 6a2df77  Improve db-benchmark (#372)
6a2df77 is described below

commit 6a2df777508ff335f5bad4a97e8af9da6014eaa4
Author: Andy Grove <an...@gmail.com>
AuthorDate: Mon May 8 13:06:34 2023 -0600

    Improve db-benchmark (#372)
---
 .github/workflows/test.yaml                     |   2 +-
 benchmarks/db-benchmark/README.md               |   4 +-
 benchmarks/db-benchmark/db-benchmark.dockerfile |   1 +
 benchmarks/db-benchmark/groupby-datafusion.py   | 330 ++++++++++++++++++++----
 benchmarks/db-benchmark/join-datafusion.py      | 138 +++++++---
 benchmarks/db-benchmark/run-bench.sh            |   8 +-
 dev/python_lint.sh                              |   2 +-
 7 files changed, 404 insertions(+), 81 deletions(-)

diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml
index c69205b..f672c81 100644
--- a/.github/workflows/test.yaml
+++ b/.github/workflows/test.yaml
@@ -101,7 +101,7 @@ jobs:
         if: ${{ matrix.python-version == '3.10' && matrix.toolchain == 'stable' }}
         run: |
           source venv/bin/activate
-          flake8 --exclude venv --ignore=E501,W503
+          flake8 --exclude venv,benchmarks/db-benchmark --ignore=E501,W503
           black --line-length 79 --diff --check .
 
       - name: Run tests
diff --git a/benchmarks/db-benchmark/README.md b/benchmarks/db-benchmark/README.md
index 93293b0..8ce4534 100644
--- a/benchmarks/db-benchmark/README.md
+++ b/benchmarks/db-benchmark/README.md
@@ -27,6 +27,6 @@ DataFusion's Python bindings.
 Run the following from root of this project.
 
 ```bash
-$ docker build -t db-benchmark -f benchmarks/db-benchmark/db-benchmark.dockerfile .
-$ docker run --privileged -it db-benchmark
+docker build -t db-benchmark -f benchmarks/db-benchmark/db-benchmark.dockerfile .
+docker run --privileged -it db-benchmark
 ```
diff --git a/benchmarks/db-benchmark/db-benchmark.dockerfile b/benchmarks/db-benchmark/db-benchmark.dockerfile
index 2876b5b..d8842b2 100644
--- a/benchmarks/db-benchmark/db-benchmark.dockerfile
+++ b/benchmarks/db-benchmark/db-benchmark.dockerfile
@@ -108,6 +108,7 @@ RUN #./duckdb/setup-duckdb.sh
 # END OF SETUP
 
 RUN python3 -m pip install --upgrade pandas
+RUN python3 -m pip install --upgrade polars psutil
 RUN python3 -m pip install --upgrade datafusion
 
 # Now add our solution
diff --git a/benchmarks/db-benchmark/groupby-datafusion.py b/benchmarks/db-benchmark/groupby-datafusion.py
index 76dd38f..2c35259 100644
--- a/benchmarks/db-benchmark/groupby-datafusion.py
+++ b/benchmarks/db-benchmark/groupby-datafusion.py
@@ -19,14 +19,20 @@ import os
 import gc
 import timeit
 import datafusion as df
-from datafusion import functions as f
-from datafusion import col
+from datafusion import (
+    col,
+    functions as f,
+    RuntimeConfig,
+    SessionConfig,
+    SessionContext,
+)
+import pyarrow
 from pyarrow import csv as pacsv
 
 
 print("# groupby-datafusion.py", flush=True)
 
-# exec(open("./_helpers/helpers.py").read())
+exec(open("./_helpers/helpers.py").read())
 
 
 def ans_shape(batches):
@@ -40,8 +46,12 @@ def ans_shape(batches):
     return rows, cols
 
 
-# ver = df.__version__
-ver = "7.0.0"
+def execute(df):
+    print(df.execution_plan().display_indent())
+    return df.collect()
+
+
+ver = df.__version__
 git = ""
 task = "groupby"
 solution = "datafusion"
@@ -49,16 +59,47 @@ fun = ".groupby"
 cache = "TRUE"
 on_disk = "FALSE"
 
+# experimental - support running with both DataFrame and SQL APIs
+sql = True
+
 data_name = os.environ["SRC_DATANAME"]
 src_grp = os.path.join("data", data_name + ".csv")
 print("loading dataset %s" % src_grp, flush=True)
 
+schema = pyarrow.schema(
+    [
+        ("id4", pyarrow.int32()),
+        ("id5", pyarrow.int32()),
+        ("id6", pyarrow.int32()),
+        ("v1", pyarrow.int32()),
+        ("v2", pyarrow.int32()),
+        ("v3", pyarrow.float64()),
+    ]
+)
+
 data = pacsv.read_csv(
-    src_grp, convert_options=pacsv.ConvertOptions(auto_dict_encode=True)
+    src_grp,
+    convert_options=pacsv.ConvertOptions(
+        auto_dict_encode=True, column_types=schema
+    ),
 )
 print("dataset loaded")
 
-ctx = df.SessionContext()
+# create a session context with explicit runtime and config settings
+runtime = (
+    RuntimeConfig()
+    .with_disk_manager_os()
+    .with_fair_spill_pool(64 * 1024 * 1024 * 1024)
+)
+config = (
+    SessionConfig()
+    .with_repartition_joins(False)
+    .with_repartition_aggregations(False)
+    .set("datafusion.execution.coalesce_batches", "false")
+)
+ctx = SessionContext(config, runtime)
+print(ctx)
+
 ctx.register_record_batches("x", [data.to_batches()])
 print("registered record batches")
 # cols = ctx.sql("SHOW columns from x")
@@ -72,50 +113,107 @@ task_init = timeit.default_timer()
 question = "sum v1 by id1"  # q1
 gc.collect()
 t_start = timeit.default_timer()
-ans = ctx.sql("SELECT id1, SUM(v1) AS v1 FROM x GROUP BY id1").collect()
+if sql:
+    df = ctx.sql("SELECT id1, SUM(v1) AS v1 FROM x GROUP BY id1")
+else:
+    df = ctx.table("x").aggregate(
+        [f.col("id1")], [f.sum(f.col("v1")).alias("v1")]
+    )
+ans = execute(df)
+
 shape = ans_shape(ans)
-# print(shape, flush=True)
+print(shape, flush=True)
 t = timeit.default_timer() - t_start
 print(f"q1: {t}")
-# m = memory_usage()
+m = memory_usage()
 t_start = timeit.default_timer()
 df = ctx.create_dataframe([ans])
 chk = df.aggregate([], [f.sum(col("v1"))]).collect()[0].column(0)[0]
 chkt = timeit.default_timer() - t_start
-# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+write_log(
+    task=task,
+    data=data_name,
+    in_rows=in_rows,
+    question=question,
+    out_rows=shape[0],
+    out_cols=shape[1],
+    solution=solution,
+    version=ver,
+    git=git,
+    fun=fun,
+    run=1,
+    time_sec=t,
+    mem_gb=m,
+    cache=cache,
+    chk=make_chk([chk]),
+    chk_time_sec=chkt,
+    on_disk=on_disk,
+)
 del ans
 gc.collect()
 
 question = "sum v1 by id1:id2"  # q2
 gc.collect()
 t_start = timeit.default_timer()
-ans = ctx.sql(
-    "SELECT id1, id2, SUM(v1) AS v1 FROM x GROUP BY id1, id2"
-).collect()
+if sql:
+    df = ctx.sql("SELECT id1, id2, SUM(v1) AS v1 FROM x GROUP BY id1, id2")
+else:
+    df = ctx.table("x").aggregate(
+        [f.col("id1"), f.col("id2")], [f.sum(f.col("v1")).alias("v1")]
+    )
+ans = execute(df)
 shape = ans_shape(ans)
-# print(shape, flush=True)
+print(shape, flush=True)
 t = timeit.default_timer() - t_start
 print(f"q2: {t}")
-# m = memory_usage()
+m = memory_usage()
 t_start = timeit.default_timer()
 df = ctx.create_dataframe([ans])
 chk = df.aggregate([], [f.sum(col("v1"))]).collect()[0].column(0)[0]
 chkt = timeit.default_timer() - t_start
-# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+write_log(
+    task=task,
+    data=data_name,
+    in_rows=in_rows,
+    question=question,
+    out_rows=shape[0],
+    out_cols=shape[1],
+    solution=solution,
+    version=ver,
+    git=git,
+    fun=fun,
+    run=1,
+    time_sec=t,
+    mem_gb=m,
+    cache=cache,
+    chk=make_chk([chk]),
+    chk_time_sec=chkt,
+    on_disk=on_disk,
+)
 del ans
 gc.collect()
 
 question = "sum v1 mean v3 by id3"  # q3
 gc.collect()
 t_start = timeit.default_timer()
-ans = ctx.sql(
-    "SELECT id3, SUM(v1) AS v1, AVG(v3) AS v3 FROM x GROUP BY id3"
-).collect()
+if sql:
+    df = ctx.sql(
+        "SELECT id3, SUM(v1) AS v1, AVG(v3) AS v3 FROM x GROUP BY id3"
+    )
+else:
+    df = ctx.table("x").aggregate(
+        [f.col("id3")],
+        [
+            f.sum(f.col("v1")).alias("v1"),
+            f.avg(f.col("v3")).alias("v3"),
+        ],
+    )
+ans = execute(df)
 shape = ans_shape(ans)
-# print(shape, flush=True)
+print(shape, flush=True)
 t = timeit.default_timer() - t_start
 print(f"q3: {t}")
-# m = memory_usage()
+m = memory_usage()
 t_start = timeit.default_timer()
 df = ctx.create_dataframe([ans])
 chk = (
@@ -125,7 +223,25 @@ chk = (
     .to_numpy()[0]
 )
 chkt = timeit.default_timer() - t_start
-# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+write_log(
+    task=task,
+    data=data_name,
+    in_rows=in_rows,
+    question=question,
+    out_rows=shape[0],
+    out_cols=shape[1],
+    solution=solution,
+    version=ver,
+    git=git,
+    fun=fun,
+    run=1,
+    time_sec=t,
+    mem_gb=m,
+    cache=cache,
+    chk=make_chk([chk]),
+    chk_time_sec=chkt,
+    on_disk=on_disk,
+)
 del ans
 gc.collect()
 
@@ -136,10 +252,10 @@ ans = ctx.sql(
     "SELECT id4, AVG(v1) AS v1, AVG(v2) AS v2, AVG(v3) AS v3 FROM x GROUP BY id4"
 ).collect()
 shape = ans_shape(ans)
-# print(shape, flush=True)
+print(shape, flush=True)
 t = timeit.default_timer() - t_start
 print(f"q4: {t}")
-# m = memory_usage()
+m = memory_usage()
 t_start = timeit.default_timer()
 df = ctx.create_dataframe([ans])
 chk = (
@@ -149,7 +265,25 @@ chk = (
     .to_numpy()[0]
 )
 chkt = timeit.default_timer() - t_start
-# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+write_log(
+    task=task,
+    data=data_name,
+    in_rows=in_rows,
+    question=question,
+    out_rows=shape[0],
+    out_cols=shape[1],
+    solution=solution,
+    version=ver,
+    git=git,
+    fun=fun,
+    run=1,
+    time_sec=t,
+    mem_gb=m,
+    cache=cache,
+    chk=make_chk([chk]),
+    chk_time_sec=chkt,
+    on_disk=on_disk,
+)
 del ans
 gc.collect()
 
@@ -160,10 +294,10 @@ ans = ctx.sql(
     "SELECT id6, SUM(v1) AS v1, SUM(v2) AS v2, SUM(v3) AS v3 FROM x GROUP BY id6"
 ).collect()
 shape = ans_shape(ans)
-# print(shape, flush=True)
+print(shape, flush=True)
 t = timeit.default_timer() - t_start
 print(f"q5: {t}")
-# m = memory_usage()
+m = memory_usage()
 t_start = timeit.default_timer()
 df = ctx.create_dataframe([ans])
 chk = (
@@ -173,7 +307,25 @@ chk = (
     .to_numpy()[0]
 )
 chkt = timeit.default_timer() - t_start
-# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+write_log(
+    task=task,
+    data=data_name,
+    in_rows=in_rows,
+    question=question,
+    out_rows=shape[0],
+    out_cols=shape[1],
+    solution=solution,
+    version=ver,
+    git=git,
+    fun=fun,
+    run=1,
+    time_sec=t,
+    mem_gb=m,
+    cache=cache,
+    chk=make_chk([chk]),
+    chk_time_sec=chkt,
+    on_disk=on_disk,
+)
 del ans
 gc.collect()
 
@@ -184,10 +336,10 @@ ans = ctx.sql(
     "SELECT id4, id5, approx_percentile_cont(v3, .5) AS median_v3, stddev(v3) AS stddev_v3 FROM x GROUP BY id4, id5"
 ).collect()
 shape = ans_shape(ans)
-# print(shape, flush=True)
+print(shape, flush=True)
 t = timeit.default_timer() - t_start
 print(f"q6: {t}")
-# m = memory_usage()
+m = memory_usage()
 t_start = timeit.default_timer()
 df = ctx.create_dataframe([ans])
 chk = (
@@ -197,7 +349,25 @@ chk = (
     .to_numpy()[0]
 )
 chkt = timeit.default_timer() - t_start
-# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+write_log(
+    task=task,
+    data=data_name,
+    in_rows=in_rows,
+    question=question,
+    out_rows=shape[0],
+    out_cols=shape[1],
+    solution=solution,
+    version=ver,
+    git=git,
+    fun=fun,
+    run=1,
+    time_sec=t,
+    mem_gb=m,
+    cache=cache,
+    chk=make_chk([chk]),
+    chk_time_sec=chkt,
+    on_disk=on_disk,
+)
 del ans
 gc.collect()
 
@@ -208,15 +378,33 @@ ans = ctx.sql(
     "SELECT id3, MAX(v1) - MIN(v2) AS range_v1_v2 FROM x GROUP BY id3"
 ).collect()
 shape = ans_shape(ans)
-# print(shape, flush=True)
+print(shape, flush=True)
 t = timeit.default_timer() - t_start
 print(f"q7: {t}")
-# m = memory_usage()
+m = memory_usage()
 t_start = timeit.default_timer()
 df = ctx.create_dataframe([ans])
 chk = df.aggregate([], [f.sum(col("range_v1_v2"))]).collect()[0].column(0)[0]
 chkt = timeit.default_timer() - t_start
-# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+write_log(
+    task=task,
+    data=data_name,
+    in_rows=in_rows,
+    question=question,
+    out_rows=shape[0],
+    out_cols=shape[1],
+    solution=solution,
+    version=ver,
+    git=git,
+    fun=fun,
+    run=1,
+    time_sec=t,
+    mem_gb=m,
+    cache=cache,
+    chk=make_chk([chk]),
+    chk_time_sec=chkt,
+    on_disk=on_disk,
+)
 del ans
 gc.collect()
 
@@ -227,15 +415,33 @@ ans = ctx.sql(
     "SELECT id6, v3 from (SELECT id6, v3, row_number() OVER (PARTITION BY id6 ORDER BY v3 DESC) AS row FROM x) t WHERE row <= 2"
 ).collect()
 shape = ans_shape(ans)
-# print(shape, flush=True)
+print(shape, flush=True)
 t = timeit.default_timer() - t_start
 print(f"q8: {t}")
-# m = memory_usage()
+m = memory_usage()
 t_start = timeit.default_timer()
 df = ctx.create_dataframe([ans])
 chk = df.aggregate([], [f.sum(col("v3"))]).collect()[0].column(0)[0]
 chkt = timeit.default_timer() - t_start
-# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+write_log(
+    task=task,
+    data=data_name,
+    in_rows=in_rows,
+    question=question,
+    out_rows=shape[0],
+    out_cols=shape[1],
+    solution=solution,
+    version=ver,
+    git=git,
+    fun=fun,
+    run=1,
+    time_sec=t,
+    mem_gb=m,
+    cache=cache,
+    chk=make_chk([chk]),
+    chk_time_sec=chkt,
+    on_disk=on_disk,
+)
 del ans
 gc.collect()
 
@@ -244,15 +450,33 @@ gc.collect()
 t_start = timeit.default_timer()
 ans = ctx.sql("SELECT corr(v1, v2) as corr FROM x GROUP BY id2, id4").collect()
 shape = ans_shape(ans)
-# print(shape, flush=True)
+print(shape, flush=True)
 t = timeit.default_timer() - t_start
 print(f"q9: {t}")
-# m = memory_usage()
+m = memory_usage()
 t_start = timeit.default_timer()
 df = ctx.create_dataframe([ans])
 chk = df.aggregate([], [f.sum(col("corr"))]).collect()[0].column(0)[0]
 chkt = timeit.default_timer() - t_start
-# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+write_log(
+    task=task,
+    data=data_name,
+    in_rows=in_rows,
+    question=question,
+    out_rows=shape[0],
+    out_cols=shape[1],
+    solution=solution,
+    version=ver,
+    git=git,
+    fun=fun,
+    run=1,
+    time_sec=t,
+    mem_gb=m,
+    cache=cache,
+    chk=make_chk([chk]),
+    chk_time_sec=chkt,
+    on_disk=on_disk,
+)
 del ans
 gc.collect()
 
@@ -263,10 +487,10 @@ ans = ctx.sql(
     "SELECT id1, id2, id3, id4, id5, id6, SUM(v3) as v3, COUNT(*) AS cnt FROM x GROUP BY id1, id2, id3, id4, id5, id6"
 ).collect()
 shape = ans_shape(ans)
-# print(shape, flush=True)
+print(shape, flush=True)
 t = timeit.default_timer() - t_start
 print(f"q10: {t}")
-# m = memory_usage()
+m = memory_usage()
 t_start = timeit.default_timer()
 df = ctx.create_dataframe([ans])
 chk = (
@@ -276,7 +500,25 @@ chk = (
     .to_numpy()[0]
 )
 chkt = timeit.default_timer() - t_start
-# write_log(task=task, data=data_name, in_rows=in_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+write_log(
+    task=task,
+    data=data_name,
+    in_rows=in_rows,
+    question=question,
+    out_rows=shape[0],
+    out_cols=shape[1],
+    solution=solution,
+    version=ver,
+    git=git,
+    fun=fun,
+    run=1,
+    time_sec=t,
+    mem_gb=m,
+    cache=cache,
+    chk=make_chk([chk]),
+    chk_time_sec=chkt,
+    on_disk=on_disk,
+)
 del ans
 gc.collect()
 
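For readers skimming the diff above: the following is a minimal standalone sketch (not part of the patch) of the session setup and the dual SQL/DataFrame pattern that groupby-datafusion.py now uses. Every API call mirrors the patch itself; only the tiny in-memory table is illustrative, since the benchmark registers CSV-derived batches instead.

    import pyarrow
    from datafusion import (
        functions as f,
        RuntimeConfig,
        SessionConfig,
        SessionContext,
    )

    # Mirror the patch's runtime settings: OS disk manager for spilling,
    # a 64 GiB fair spill pool, and repartitioning disabled for
    # joins and aggregations.
    runtime = (
        RuntimeConfig()
        .with_disk_manager_os()
        .with_fair_spill_pool(64 * 1024 * 1024 * 1024)
    )
    config = (
        SessionConfig()
        .with_repartition_joins(False)
        .with_repartition_aggregations(False)
        .set("datafusion.execution.coalesce_batches", "false")
    )
    ctx = SessionContext(config, runtime)

    # Tiny illustrative table; the benchmark registers batches read from CSV.
    batch = pyarrow.record_batch({"id1": ["a", "a", "b"], "v1": [1, 2, 3]})
    ctx.register_record_batches("x", [[batch]])

    # The same aggregation through both APIs, as toggled by the `sql` flag:
    df_sql = ctx.sql("SELECT id1, SUM(v1) AS v1 FROM x GROUP BY id1")
    df_api = ctx.table("x").aggregate(
        [f.col("id1")], [f.sum(f.col("v1")).alias("v1")]
    )
    print(df_sql.collect())
    print(df_api.collect())
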
diff --git a/benchmarks/db-benchmark/join-datafusion.py b/benchmarks/db-benchmark/join-datafusion.py
index 8843b55..602cee6 100755
--- a/benchmarks/db-benchmark/join-datafusion.py
+++ b/benchmarks/db-benchmark/join-datafusion.py
@@ -26,26 +26,7 @@ from pyarrow import csv as pacsv
 
 print("# join-datafusion.py", flush=True)
 
-# exec(open("./_helpers/helpers.py").read())
-
-
-def join_to_tbls(data_name):
-    x_n = int(float(data_name.split("_")[1]))
-    y_n = [
-        "{:.0e}".format(x_n / 1e6),
-        "{:.0e}".format(x_n / 1e3),
-        "{:.0e}".format(x_n),
-    ]
-    y_n = [
-        y_n[0].replace("+0", ""),
-        y_n[1].replace("+0", ""),
-        y_n[2].replace("+0", ""),
-    ]
-    return [
-        data_name.replace("NA", y_n[0]),
-        data_name.replace("NA", y_n[1]),
-        data_name.replace("NA", y_n[2]),
-    ]
+exec(open("./_helpers/helpers.py").read())
 
 
 def ans_shape(batches):
@@ -59,7 +40,7 @@ def ans_shape(batches):
     return rows, cols
 
 
-ver = "6.0.0"
+ver = df.__version__
 task = "join"
 git = ""
 solution = "datafusion"
@@ -84,13 +65,16 @@ print(
     + ", "
     + y_data_name[0]
     + ", "
-    + y_data_name[2]
+    + y_data_name[1]
     + ", "
     + y_data_name[2],
     flush=True,
 )
 
 ctx = df.SessionContext()
+print(ctx)
+
+# TODO we should be applying projections to these table reads to create relations of different sizes
 
 x_data = pacsv.read_csv(
     src_jn_x, convert_options=pacsv.ConvertOptions(auto_dict_encode=True)
@@ -133,8 +117,26 @@ t_start = timeit.default_timer()
 df = ctx.create_dataframe([ans])
 chk = df.aggregate([], [f.sum(col("v1"))]).collect()[0].column(0)[0]
 chkt = timeit.default_timer() - t_start
-# m = memory_usage()
-# write_log(task=task, data=data_name, in_rows=x_data.num_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+m = memory_usage()
+write_log(
+    task=task,
+    data=data_name,
+    in_rows=x_data.num_rows,
+    question=question,
+    out_rows=shape[0],
+    out_cols=shape[1],
+    solution=solution,
+    version=ver,
+    git=git,
+    fun=fun,
+    run=1,
+    time_sec=t,
+    mem_gb=m,
+    cache=cache,
+    chk=make_chk([chk]),
+    chk_time_sec=chkt,
+    on_disk=on_disk,
+)
 del ans
 gc.collect()
 
@@ -156,8 +158,26 @@ chk = (
     .column(0)[0]
 )
 chkt = timeit.default_timer() - t_start
-# m = memory_usage()
-# write_log(task=task, data=data_name, in_rows=x_data.num_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+m = memory_usage()
+write_log(
+    task=task,
+    data=data_name,
+    in_rows=x_data.num_rows,
+    question=question,
+    out_rows=shape[0],
+    out_cols=shape[1],
+    solution=solution,
+    version=ver,
+    git=git,
+    fun=fun,
+    run=1,
+    time_sec=t,
+    mem_gb=m,
+    cache=cache,
+    chk=make_chk([chk]),
+    chk_time_sec=chkt,
+    on_disk=on_disk,
+)
 del ans
 gc.collect()
 
@@ -179,8 +199,26 @@ chk = (
     .column(0)[0]
 )
 chkt = timeit.default_timer() - t_start
-# m = memory_usage()
-# write_log(task=task, data=data_name, in_rows=x_data.num_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+m = memory_usage()
+write_log(
+    task=task,
+    data=data_name,
+    in_rows=x_data.num_rows,
+    question=question,
+    out_rows=shape[0],
+    out_cols=shape[1],
+    solution=solution,
+    version=ver,
+    git=git,
+    fun=fun,
+    run=1,
+    time_sec=t,
+    mem_gb=m,
+    cache=cache,
+    chk=make_chk([chk]),
+    chk_time_sec=chkt,
+    on_disk=on_disk,
+)
 del ans
 gc.collect()
 
@@ -202,8 +240,26 @@ chk = (
     .column(0)[0]
 )
 chkt = timeit.default_timer() - t_start
-# m = memory_usage()
-# write_log(task=task, data=data_name, in_rows=x_data.num_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+m = memory_usage()
+write_log(
+    task=task,
+    data=data_name,
+    in_rows=x_data.num_rows,
+    question=question,
+    out_rows=shape[0],
+    out_cols=shape[1],
+    solution=solution,
+    version=ver,
+    git=git,
+    fun=fun,
+    run=1,
+    time_sec=t,
+    mem_gb=m,
+    cache=cache,
+    chk=make_chk([chk]),
+    chk_time_sec=chkt,
+    on_disk=on_disk,
+)
 del ans
 gc.collect()
 
@@ -225,8 +281,26 @@ chk = (
     .column(0)[0]
 )
 chkt = timeit.default_timer() - t_start
-# m = memory_usage()
-# write_log(task=task, data=data_name, in_rows=x_data.num_rows, question=question, out_rows=shape[0], out_cols=shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk([chk]), chk_time_sec=chkt, on_disk=on_disk)
+m = memory_usage()
+write_log(
+    task=task,
+    data=data_name,
+    in_rows=x_data.num_rows,
+    question=question,
+    out_rows=shape[0],
+    out_cols=shape[1],
+    solution=solution,
+    version=ver,
+    git=git,
+    fun=fun,
+    run=1,
+    time_sec=t,
+    mem_gb=m,
+    cache=cache,
+    chk=make_chk([chk]),
+    chk_time_sec=chkt,
+    on_disk=on_disk,
+)
 del ans
 gc.collect()
 
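For context on the join diff above: the removed join_to_tbls helper (now supplied by the shared _helpers/helpers.py) derives the three right-hand-side table names from the left table's row count x by substituting NA with x/1e6, x/1e3, and x. Worked through the removed code:

    >>> join_to_tbls("J1_1e7_NA_0_0")
    ['J1_1e7_1e1_0_0', 'J1_1e7_1e4_0_0', 'J1_1e7_1e7_0_0']
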
diff --git a/benchmarks/db-benchmark/run-bench.sh b/benchmarks/db-benchmark/run-bench.sh
index 2c30809..36a6087 100755
--- a/benchmarks/db-benchmark/run-bench.sh
+++ b/benchmarks/db-benchmark/run-bench.sh
@@ -17,5 +17,11 @@
 # under the License.
 set -e
 
+#SRC_DATANAME=G1_1e7_1e2_0_0 python3 /db-benchmark/polars/groupby-polars.py
 SRC_DATANAME=G1_1e7_1e2_0_0 python3 /db-benchmark/datafusion-python/groupby-datafusion.py
-#SRC_DATANAME=J1_1e7_NA_0_0 python3 /db-benchmark/datafusion-python/join-datafusion.py
+
+# joins need more work still
+#SRC_DATANAME=G1_1e7_1e2_0_0 python3 /db-benchmark/datafusion-python/join-datafusion.py
+#SRC_DATANAME=G1_1e7_1e2_0_0 python3 /db-benchmark/polars/join-polars.py
+
+cat time.csv
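The memory_usage and write_log calls added throughout both scripts come from db-benchmark's shared _helpers/helpers.py, loaded via exec(...) and not shown in this patch. As a purely hypothetical sketch of a compatible logger feeding the `cat time.csv` step above (the field set and CSV layout are assumptions, not the real helper):

    import csv
    import os

    def write_log(**kwargs):
        # Hypothetical stand-in for the real helper: append one result row
        # per benchmark question to time.csv, writing a header on first use.
        path = "time.csv"
        new_file = not os.path.exists(path)
        with open(path, "a", newline="") as fh:
            writer = csv.DictWriter(fh, fieldnames=list(kwargs))
            if new_file:
                writer.writeheader()
            writer.writerow(kwargs)
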
diff --git a/dev/python_lint.sh b/dev/python_lint.sh
index 9493462..3bc67fb 100755
--- a/dev/python_lint.sh
+++ b/dev/python_lint.sh
@@ -22,5 +22,5 @@
 
 set -e
 source venv/bin/activate
-flake8 --exclude venv --ignore=E501,W503
+flake8 --exclude venv,benchmarks/db-benchmark --ignore=E501,W503
 black --line-length 79 .