You are viewing a plain-text version of this content. The link to the canonical version ("here") did not survive the plain-text conversion.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2021/04/07 04:01:44 UTC

[iceberg] branch master updated: Spark: Show sort order as a table property (#2421)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 0df5de7  Spark: Show sort order as a table property (#2421)
0df5de7 is described below

commit 0df5de711e773b5143f57f62dd52dc40b1b2f8bd
Author: Yufei Gu <yu...@apache.org>
AuthorDate: Tue Apr 6 21:01:30 2021 -0700

    Spark: Show sort order as a table property (#2421)
---
 .../java/org/apache/iceberg/spark/Spark3Util.java  | 60 ++++++++++++++++
 .../apache/iceberg/spark/source/SparkTable.java    |  6 +-
 .../apache/iceberg/actions/TestCreateActions.java  |  6 +-
 .../org/apache/iceberg/spark/TestSpark3Util.java   | 80 ++++++++++++++++++++++
 4 files changed, 150 insertions(+), 2 deletions(-)

diff --git a/spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java b/spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java
index d5bebdd..8dce479 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.NullOrder;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.TableProperties;
@@ -478,6 +479,10 @@ public class Spark3Util {
     return TypeUtil.visit(type, DescribeSchemaVisitor.INSTANCE);
   }
 
+  public static String describe(org.apache.iceberg.SortOrder order) {
+    return String.join(", ", SortOrderVisitor.visit(order, DescribeSortOrderVisitor.INSTANCE));
+  }
+
   public static boolean isLocalityEnabled(FileIO io, String location, CaseInsensitiveStringMap readOptions) {
     InputFile in = io.newInputFile(location);
     if (in instanceof HadoopInputFile) {
@@ -880,4 +885,59 @@ public class Spark3Util {
     Option<String> database = namespace.length == 1 ? Option.apply(namespace[0]) : Option.empty();
     return org.apache.spark.sql.catalyst.TableIdentifier.apply(table, database);
   }
+
+  /**
+   * Renders each sort field as "term DIRECTION NULL_ORDER", e.g. {@code bucket(16, id) DESC NULLS LAST}.
+   */
+  private static class DescribeSortOrderVisitor implements SortOrderVisitor<String> {
+    private static final DescribeSortOrderVisitor INSTANCE = new DescribeSortOrderVisitor();
+
+    private DescribeSortOrderVisitor() {
+    }
+
+    /** Appends the direction and null order that every rendered field shares to {@code term}. */
+    private static String withOrder(String term, org.apache.iceberg.SortDirection dir, NullOrder nulls) {
+      return String.format("%s %s %s", term, dir, nulls);
+    }
+
+    @Override
+    public String field(String name, int id, org.apache.iceberg.SortDirection dir, NullOrder nulls) {
+      return withOrder(name, dir, nulls);
+    }
+
+    @Override
+    public String bucket(String name, int id, int numBuckets, org.apache.iceberg.SortDirection dir, NullOrder nulls) {
+      return withOrder("bucket(" + numBuckets + ", " + name + ")", dir, nulls);
+    }
+
+    @Override
+    public String truncate(String name, int id, int width, org.apache.iceberg.SortDirection dir, NullOrder nulls) {
+      return withOrder("truncate(" + name + ", " + width + ")", dir, nulls);
+    }
+
+    @Override
+    public String year(String name, int id, org.apache.iceberg.SortDirection dir, NullOrder nulls) {
+      return withOrder("years(" + name + ")", dir, nulls);
+    }
+
+    @Override
+    public String month(String name, int id, org.apache.iceberg.SortDirection dir, NullOrder nulls) {
+      return withOrder("months(" + name + ")", dir, nulls);
+    }
+
+    @Override
+    public String day(String name, int id, org.apache.iceberg.SortDirection dir, NullOrder nulls) {
+      return withOrder("days(" + name + ")", dir, nulls);
+    }
+
+    @Override
+    public String hour(String name, int id, org.apache.iceberg.SortDirection dir, NullOrder nulls) {
+      return withOrder("hours(" + name + ")", dir, nulls);
+    }
+
+    @Override
+    public String unknown(String name, int id, String func, org.apache.iceberg.SortDirection dir, NullOrder nulls) {
+      return withOrder(func + "(" + name + ")", dir, nulls);
+    }
+  }
 }
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index 26b2222..93531a3 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -64,7 +64,7 @@ public class SparkTable implements org.apache.spark.sql.connector.catalog.Table,
   private static final Logger LOG = LoggerFactory.getLogger(SparkTable.class);
 
   private static final Set<String> RESERVED_PROPERTIES =
-      ImmutableSet.of("provider", "format", "current-snapshot-id", "location");
+      ImmutableSet.of("provider", "format", "current-snapshot-id", "location", "sort-order");
   private static final Set<TableCapability> CAPABILITIES = ImmutableSet.of(
       TableCapability.BATCH_READ,
       TableCapability.BATCH_WRITE,
@@ -141,6 +141,10 @@ public class SparkTable implements org.apache.spark.sql.connector.catalog.Table,
     propsBuilder.put("current-snapshot-id", currentSnapshotId);
     propsBuilder.put("location", icebergTable.location());
 
+    if (!icebergTable.sortOrder().isUnsorted()) {
+      propsBuilder.put("sort-order", Spark3Util.describe(icebergTable.sortOrder()));
+    }
+
     icebergTable.properties().entrySet().stream()
         .filter(entry -> !RESERVED_PROPERTIES.contains(entry.getKey()))
         .forEach(propsBuilder::put);
diff --git a/spark3/src/test/java/org/apache/iceberg/actions/TestCreateActions.java b/spark3/src/test/java/org/apache/iceberg/actions/TestCreateActions.java
index e31d6cb..dfa4740 100644
--- a/spark3/src/test/java/org/apache/iceberg/actions/TestCreateActions.java
+++ b/spark3/src/test/java/org/apache/iceberg/actions/TestCreateActions.java
@@ -455,8 +455,10 @@ public class TestCreateActions extends SparkCatalogTestBase {
     createSourceTable(CREATE_PARQUET, source);
     assertMigratedFileCount(Actions.snapshot(source, dest), source, dest);
     SparkTable table = loadTable(dest);
+    // set sort orders
+    table.table().replaceSortOrder().asc("id").desc("data").commit();
 
-    String[] keys = {"provider", "format", "current-snapshot-id", "location"};
+    String[] keys = {"provider", "format", "current-snapshot-id", "location", "sort-order"};
 
     for (String entry : keys) {
       Assert.assertTrue("Created table missing reserved property " + entry, table.properties().containsKey(entry));
@@ -466,6 +468,8 @@ public class TestCreateActions extends SparkCatalogTestBase {
     Assert.assertEquals("Unexpected format", "iceberg/parquet", table.properties().get("format"));
     Assert.assertNotEquals("No current-snapshot-id found", "none", table.properties().get("current-snapshot-id"));
     Assert.assertTrue("Location isn't correct", table.properties().get("location").endsWith(destTableName));
+    Assert.assertEquals("Sort-order isn't correct", "id ASC NULLS FIRST, data DESC NULLS LAST",
+        table.properties().get("sort-order"));
   }
 
   @Test
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/TestSpark3Util.java b/spark3/src/test/java/org/apache/iceberg/spark/TestSpark3Util.java
new file mode 100644
index 0000000..472244d
--- /dev/null
+++ b/spark3/src/test/java/org/apache/iceberg/spark/TestSpark3Util.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.SortOrderParser;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.iceberg.NullOrder.NULLS_FIRST;
+import static org.apache.iceberg.NullOrder.NULLS_LAST;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+/** Tests for {@link Spark3Util}. */
+public class TestSpark3Util {
+  @Test
+  public void testDescribeSortOrder() {
+    Schema schema = new Schema(
+        required(1, "data", Types.StringType.get()),
+        required(2, "time", Types.TimestampType.withoutZone())
+    );
+
+    assertDescribed("data DESC NULLS FIRST", "Identity", schema, 1);
+    assertDescribed("bucket(1, data) DESC NULLS FIRST", "bucket[1]", schema, 1);
+    assertDescribed("truncate(data, 3) DESC NULLS FIRST", "truncate[3]", schema, 1);
+    assertDescribed("years(time) DESC NULLS FIRST", "year", schema, 2);
+    assertDescribed("months(time) DESC NULLS FIRST", "month", schema, 2);
+    assertDescribed("days(time) DESC NULLS FIRST", "day", schema, 2);
+    assertDescribed("hours(time) DESC NULLS FIRST", "hour", schema, 2);
+    assertDescribed("unknown(data) DESC NULLS FIRST", "unknown", schema, 1);
+
+    // multiple sort fields are joined with ", " in the order they were added
+    SortOrder multiOrder = SortOrder.builderFor(schema)
+        .asc("time", NULLS_FIRST)
+        .asc("data", NULLS_LAST)
+        .build();
+    Assert.assertEquals("Sort order isn't correct.", "time ASC NULLS FIRST, data ASC NULLS LAST",
+        Spark3Util.describe(multiOrder));
+  }
+
+  /** Asserts the one-field order from {@link #buildSortOrder} is described as {@code expected}. */
+  private static void assertDescribed(String expected, String transform, Schema schema, int sourceId) {
+    Assert.assertEquals("Sort order isn't correct.", expected,
+        Spark3Util.describe(buildSortOrder(transform, schema, sourceId)));
+  }
+
+  /** Parses a one-field DESC NULLS FIRST order applying {@code transform} to field {@code sourceId}. */
+  private static SortOrder buildSortOrder(String transform, Schema schema, int sourceId) {
+    String jsonString = "{\n" +
+        "  \"order-id\" : 10,\n" +
+        "  \"fields\" : [ {\n" +
+        "    \"transform\" : \"" + transform + "\",\n" +
+        "    \"source-id\" : " + sourceId + ",\n" +
+        "    \"direction\" : \"desc\",\n" +
+        "    \"null-order\" : \"nulls-first\"\n" +
+        "  } ]\n" +
+        "}";
+
+    return SortOrderParser.fromJson(schema, jsonString);
+  }
+}