You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/01/26 08:58:39 UTC

[GitHub] [iceberg] jackye1995 commented on a change in pull request #3983: Spark: Spark3 ZOrder Rewrite Strategy

jackye1995 commented on a change in pull request #3983:
URL: https://github.com/apache/iceberg/pull/3983#discussion_r792400525



##########
File path: core/src/test/java/org/apache/iceberg/util/TestZOrderByteUtil.java
##########
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.iceberg.util;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Random;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.iceberg.relocated.com.google.common.primitives.UnsignedBytes;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestZOrderByteUtil {
+  // Constants are named after their bit pattern, most significant bit first: 'I' is a 1 bit, 'O' is a 0 bit.
+  private static final byte IIIIIIII = (byte) 0b11111111;
+  private static final byte IOIOIOIO = (byte) 0b10101010;
+  private static final byte OIOIOIOI = (byte) 0b01010101;
+  private static final byte OOOOIIII = (byte) 0b00001111;
+  private static final byte OOOOOOOI = (byte) 0b00000001;
+  private static final byte OOOOOOOO = (byte) 0b00000000;
+
+  // Iteration count for the loops in the randomized tests.
+  private static final int NUM_TESTS = 100_000;
+
+  // Seeded so that the generated inputs are the same on every run.
+  private final Random random = new Random(42);
+
+  /**
+   * Renders every byte as exactly eight '0'/'1' characters (most significant bit first)
+   * and concatenates the results in array order.
+   */
+  private String bytesToString(byte[] bytes) {
+    StringBuilder bits = new StringBuilder(bytes.length * Byte.SIZE);
+    for (byte value : bytes) {
+      // Mask to treat the byte as unsigned; toBinaryString drops leading zeros
+      String binary = Integer.toBinaryString(value & 0xFF);
+      for (int pad = binary.length(); pad < Byte.SIZE; pad++) {
+        bits.append('0');
+      }
+      bits.append(binary);
+    }
+    return bits.toString();
+  }
+
+  /**
+   * Returns a byte array with a random length between 1 and 100 (inclusive), filled with random bytes
+   */
+  private byte[] generateRandomBytes() {
+    // nextInt(100) is already in [0, 100), so the length is positive without any Math.abs
+    int length = random.nextInt(100) + 1;
+    byte[] result = new byte[length];
+    random.nextBytes(result);
+    return result;
+  }
+
+  /**
+   * String based reference implementation of interleaving: takes one character from each input in turn,
+   * skipping inputs that have already been exhausted, until every character has been consumed.
+   */
+  private String interleaveStrings(String[] strings) {
+    int longest = Arrays.stream(strings).mapToInt(String::length).max().orElse(0);
+    StringBuilder interleaved = new StringBuilder();
+    for (int position = 0; position < longest; position++) {
+      for (String source : strings) {
+        if (position < source.length()) {
+          interleaved.append(source.charAt(position));
+        }
+      }
+    }
+    return interleaved.toString();
+  }
+
+  /**
+   * Compares the result of the string based interleaving algorithm implemented in this class
+   * with the binary bit-shifting algorithm used in ZOrderByteUtils. Either both
+   * algorithms are identically wrong or both are identically correct.
+   */
+  @Test
+  public void testInterleaveRandomExamples() {
+    for (int test = 0; test < NUM_TESTS; test++) {
+      // nextInt(6) is never negative, so this picks between 1 and 6 input arrays
+      int numByteArrays = random.nextInt(6) + 1;
+      byte[][] testBytes = new byte[numByteArrays][];
+      String[] testStrings = new String[numByteArrays];
+      for (int byteIndex = 0; byteIndex < numByteArrays; byteIndex++) {
+        testBytes[byteIndex] = generateRandomBytes();
+        testStrings[byteIndex] = bytesToString(testBytes[byteIndex]);
+      }
+
+      // Interleave the same inputs once as raw bytes and once as bit strings
+      byte[] byteResult = ZOrderByteUtils.interleaveBits(testBytes);
+      String byteResultAsString = bytesToString(byteResult);
+
+      String stringResult = interleaveStrings(testStrings);
+
+      Assert.assertEquals("String interleave didn't match byte interleave", stringResult, byteResultAsString);
+    }
+  }
+
+  @Test
+  public void testInterleaveEmptyBits() {
+    // Java zero-initializes arrays: four all-zero inputs of 10 bytes each should yield 40 zero bytes
+    byte[][] zeroedInputs = new byte[4][10];
+    byte[] allZeros = new byte[40];
+    byte[] interleaved = ZOrderByteUtils.interleaveBits(zeroedInputs);
+
+    Assert.assertArrayEquals("Should combine empty arrays", allZeros, interleaved);
+  }
+
+  @Test
+  public void testInterleaveFullBits() {
+    byte[][] test = new byte[4][];
+    test[0] = new byte[]{IIIIIIII, IIIIIIII};
+    test[1] = new byte[]{IIIIIIII};
+    test[2] = new byte[0];
+    test[3] = new byte[]{IIIIIIII, IIIIIIII, IIIIIIII};
+    byte[] expected = new byte[]{IIIIIIII, IIIIIIII, IIIIIIII, IIIIIIII, IIIIIIII, IIIIIIII};
+
+    Assert.assertArrayEquals("Should combine full arrays",
+        expected, ZOrderByteUtils.interleaveBits(test));
+  }
+
+  @Test
+  public void testInterleaveMixedBits() {
+    byte[][] test = new byte[4][];
+    test[0] = new byte[]{OOOOOOOI, IIIIIIII, OOOOOOOO, OOOOIIII};
+    test[1] = new byte[]{OOOOOOOI, OOOOOOOO, IIIIIIII};
+    test[2] = new byte[]{OOOOOOOI};
+    test[3] = new byte[]{OOOOOOOI};
+    byte[] expected = new byte[]{
+        OOOOOOOO, OOOOOOOO, OOOOOOOO, OOOOIIII,
+        IOIOIOIO, IOIOIOIO,
+        OIOIOIOI, OIOIOIOI,
+        OOOOIIII};
+    Assert.assertArrayEquals("Should combine mixed byte arrays",
+        expected, ZOrderByteUtils.interleaveBits(test));
+  }
+
+  /**
+   * Checks that comparing the bytes produced by ZOrderByteUtils.orderIntLikeBytes as unsigned,
+   * lexicographically ordered values gives the same ordering as comparing the original ints.
+   */
+  @Test
+  public void testIntOrdering() {
+    for (int i = 0; i < NUM_TESTS; i++) {
+      int aInt = random.nextInt();
+      int bInt = random.nextInt();
+      int intCompare = Integer.compare(aInt, bInt);
+      byte[] aBytes = ZOrderByteUtils.orderIntLikeBytes(bytesOf(aInt), 4);
+      byte[] bBytes = ZOrderByteUtils.orderIntLikeBytes(bytesOf(bInt), 4);
+      int byteCompare = UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes);
+
+      // Compare the signs explicitly: an XOR based sign check would also accept the case where one
+      // comparison reports equality (0) and the other reports a positive difference
+      Assert.assertTrue(String.format(
+          "Ordering of ints should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ",
+          aInt, bInt, intCompare, Arrays.toString(aBytes), Arrays.toString(bBytes), byteCompare),
+          Integer.signum(intCompare) == Integer.signum(byteCompare));
+    }
+  }
+
+  @Test
+  public void testLongOrdering() {
+    for (int i = 0; i < NUM_TESTS; i++) {
+      long aLong = random.nextInt();
+      long bLong = random.nextInt();
+      int longCompare = Long.compare(aLong, bLong);
+      byte[] aBytes = ZOrderByteUtils.orderIntLikeBytes(bytesOf(aLong), 8);
+      byte[] bBytes = ZOrderByteUtils.orderIntLikeBytes(bytesOf(bLong), 8);
+      int byteCompare = UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes);
+
+      Assert.assertTrue(String.format(
+          "Ordering of ints should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ",

Review comment:
       nit: this message should say "Ordering of longs"; the same issue applies to the tests for all the other data types

##########
File path: api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
##########
@@ -113,6 +113,15 @@ default RewriteDataFiles sort(SortOrder sortOrder) {
     throw new UnsupportedOperationException("SORT Rewrite Strategy not implemented for this framework");
   }
 
+  /**
+   * Choose Z-ORDER as a strategy for this rewrite operation with a specified list of columns to use
+   * @param columns Columns to be used to generate Z-Values
+   * @return this for method chaining
+   */
+  default RewriteDataFiles zOrder(String... columns) {

Review comment:
       I remember that in the original design zOrder was considered a part of the sort order, so it makes more sense to me for the input here to be `SortOrder` instead of `String... columns`. I guess this will be a shortcut for something like `SortOrder.builderFor(schema).zOrder().columns(a,b,c).bitWidth(10)` when that interface is out? But how would people define things like the cutoff width for string columns in this case?

##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/Spark3ZOrderStrategy.java
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.actions;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.NullOrder;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortDirection;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.FileRewriteCoordinator;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.SparkDistributionAndOrderingUtil;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.util.SortOrderUtil;
+import org.apache.iceberg.util.ZOrderByteUtils;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.connector.distributions.Distribution;
+import org.apache.spark.sql.connector.distributions.Distributions;
+import org.apache.spark.sql.connector.expressions.SortOrder;
+import org.apache.spark.sql.expressions.UserDefinedFunction;
+import org.apache.spark.sql.functions;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.BooleanType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.DateType;
+import org.apache.spark.sql.types.DoubleType;
+import org.apache.spark.sql.types.FloatType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.TimestampType;
+import scala.collection.Seq;
+
+public class Spark3ZOrderStrategy extends Spark3SortStrategy {
+  private static final String Z_COLUMN = "ICEZVALUE";
+  private static final Schema Z_SCHEMA = new Schema(NestedField.required(0, Z_COLUMN, Types.BinaryType.get()));
+  private static final org.apache.iceberg.SortOrder Z_SORT_ORDER = org.apache.iceberg.SortOrder.builderFor(Z_SCHEMA)
+      .sortBy(Z_COLUMN, SortDirection.ASC, NullOrder.NULLS_LAST)
+      .build();
+  private static final int STRING_KEY_LENGTH = 60;

Review comment:
       I see. Going back to my question above: this is currently static. Can we at least make this configurable somehow, maybe as part of the interface input?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org