You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by dw...@apache.org on 2019/06/06 20:39:10 UTC

[incubator-iceberg] branch master updated: Adding Bin Packing util for Combining scan tasks (#208)

This is an automated email from the ASF dual-hosted git repository.

dweeks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 0daa242  Adding Bin Packing util for Combining scan tasks (#208)
0daa242 is described below

commit 0daa242746d6d166905362104a44cd426afcda0d
Author: TGooch44 <te...@gmail.com>
AuthorDate: Thu Jun 6 13:39:05 2019 -0700

    Adding Bin Packing util for Combining scan tasks (#208)
---
 python/iceberg/core/util/bin_packing.py     | 77 +++++++++++++++++++++++++++++
 python/tests/core/utils/__init__.py         |  0
 python/tests/core/utils/test_bin_packing.py | 36 ++++++++++++++
 3 files changed, 113 insertions(+)

diff --git a/python/iceberg/core/util/bin_packing.py b/python/iceberg/core/util/bin_packing.py
new file mode 100644
index 0000000..d7a3105
--- /dev/null
+++ b/python/iceberg/core/util/bin_packing.py
@@ -0,0 +1,77 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from collections import deque
+from copy import deepcopy
+
+class PackingIterator(object):
+    """Iterator that greedily packs items into weight-bounded bins.
+
+    Items are consumed in their original order. Each item goes into the
+    first open bin that still has room for its weight; when no open bin
+    fits, a new bin is opened. Once more than ``lookback`` bins are open,
+    the oldest one is closed and returned as a list of its items. After the
+    input is exhausted the remaining open bins are returned oldest first.
+
+    An item is never rejected: if its weight does not fit in any open bin it
+    starts a new bin, even when that weight alone exceeds ``target_weight``.
+
+    :param items: deep-copyable iterable of items to pack (list, tuple, ...);
+        a deep copy is taken, so the caller's collection is never mutated
+    :param target_weight: maximum total weight a bin may reach through
+        ``Bin.can_add``
+    :param lookback: maximum number of bins kept open at the same time
+    :param weight_func: callable mapping one item to its weight
+    """
+
+    def __init__(self, items, target_weight, lookback, weight_func):
+        # deepcopy() preserves the input type, so a tuple would stay a tuple
+        # and could not be consumed. Normalising into a deque accepts any
+        # sequence and makes removing the head O(1).
+        self.items = deque(deepcopy(items))
+        self.target_weight = target_weight
+        self.lookback = lookback
+        self.weight_func = weight_func
+        self.bins = list()
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        """Return the next completed bin as a list of items.
+
+        :raises StopIteration: when no items are pending and no bins are open
+        """
+        while self.items:
+            item = self.items[0]
+            # Compute the weight before dequeuing so that an exception raised
+            # by weight_func leaves the item pending.
+            weight = self.weight_func(item)
+            self.items.popleft()
+
+            curr_bin = PackingIterator.find(self.bins, weight)
+            if curr_bin is not None:
+                curr_bin.add(item, weight)
+                continue
+
+            # No open bin has room: open a new one for this item.
+            curr_bin = Bin(self.target_weight)
+            curr_bin.add(item, weight)
+            self.bins.append(curr_bin)
+
+            # Too many open bins: close and hand out the oldest one.
+            if len(self.bins) > self.lookback:
+                return list(self.bins.pop(0).items)
+
+        if not self.bins:
+            raise StopIteration()
+
+        # Input exhausted: drain the remaining open bins, oldest first.
+        return list(self.bins.pop(0).items)
+
+    @staticmethod
+    def find(bins, weight):
+        """Return the first bin in ``bins`` that can take ``weight``, else None."""
+        for curr_bin in bins:
+            if curr_bin.can_add(weight):
+                return curr_bin
+        return None
+
+
+class Bin(object):
+    """One bin: the items placed in it plus their accumulated weight."""
+
+    def __init__(self, target_weight):
+        # Capacity first, then the initially empty contents.
+        self.target_weight = target_weight
+        self.items = []
+        self.bin_weight = 0
+
+    def can_add(self, weight):
+        """Tell whether ``weight`` fits without exceeding ``target_weight``."""
+        projected = self.bin_weight + weight
+        return projected <= self.target_weight
+
+    def add(self, item, weight):
+        """Count ``weight`` toward the bin total and store ``item``.
+
+        No capacity check is performed here.
+        """
+        self.bin_weight += weight
+        self.items.append(item)
diff --git a/python/tests/core/utils/__init__.py b/python/tests/core/utils/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/python/tests/core/utils/test_bin_packing.py b/python/tests/core/utils/test_bin_packing.py
new file mode 100644
index 0000000..a35386e
--- /dev/null
+++ b/python/tests/core/utils/test_bin_packing.py
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import random
+
+from iceberg.core.util import PackingIterator
+import pytest
+
+
+def _random_splits(count, seed):
+    """Return ``count`` pseudo-random split sizes in [0, 64], fixed by ``seed``."""
+    # A dedicated, seeded generator keeps the parametrized inputs identical
+    # on every run, so a failure can be reproduced.
+    rng = random.Random(seed)
+    return [rng.randint(0, 64) for _ in range(count)]
+
+
+@pytest.mark.parametrize("splits,lookback,split_size, open_cost", [
+    (_random_splits(200, 0), 20, 128, 4),  # random splits
+    ([], 20, 128, 4),  # no splits
+    ([0]*100 + _random_splits(10, 1) + [0]*100, 20, 128, 4)  # sparse
+])
+def test_bin_packing(splits, lookback, split_size, open_cost):
+    """Packed bins must contain every split once and respect the weight limit."""
+
+    def weight_func(x):
+        return max(x, open_cost)
+
+    bins = list(PackingIterator(splits, split_size, lookback, weight_func))
+
+    # Nothing may be lost or duplicated; only the grouping may change.
+    packed = [split for packed_bin in bins for split in packed_bin]
+    assert sorted(packed) == sorted(splits)
+
+    for packed_bin in bins:
+        # A bin is only ever created to hold an item, so none may be empty.
+        assert packed_bin
+        # The packer bounds the summed *weights*, not the raw split sizes.
+        assert sum(weight_func(split) for split in packed_bin) <= split_size