You are viewing a plain-text version of this content. The canonical link for it (shown as "here" in the original page) was not preserved in this copy.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2021/04/07 04:01:44 UTC
[iceberg] branch master updated: Spark: Show sort order as a table property (#2421)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 0df5de7 Spark: Show sort order as a table property (#2421)
0df5de7 is described below
commit 0df5de711e773b5143f57f62dd52dc40b1b2f8bd
Author: Yufei Gu <yu...@apache.org>
AuthorDate: Tue Apr 6 21:01:30 2021 -0700
Spark: Show sort order as a table property (#2421)
---
.../java/org/apache/iceberg/spark/Spark3Util.java | 60 ++++++++++++++++
.../apache/iceberg/spark/source/SparkTable.java | 6 +-
.../apache/iceberg/actions/TestCreateActions.java | 6 +-
.../org/apache/iceberg/spark/TestSpark3Util.java | 80 ++++++++++++++++++++++
4 files changed, 150 insertions(+), 2 deletions(-)
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java b/spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java
index d5bebdd..8dce479 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.NullOrder;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableProperties;
@@ -478,6 +479,10 @@ public class Spark3Util {
return TypeUtil.visit(type, DescribeSchemaVisitor.INSTANCE);
}
+ /**
+ * Renders an Iceberg sort order as a human-readable string.
+ * <p>
+ * Each sort field is formatted by DescribeSortOrderVisitor and the per-field strings are
+ * joined with ", ", e.g. {@code id ASC NULLS FIRST, data DESC NULLS LAST}. SparkTable uses
+ * this to populate its "sort-order" table property, and only calls it when the table's
+ * sort order is not unsorted.
+ *
+ * @param order the Iceberg sort order to describe
+ * @return the comma-separated description of the order's sort fields
+ */
+ public static String describe(org.apache.iceberg.SortOrder order) {
+ return Joiner.on(", ").join(SortOrderVisitor.visit(order, DescribeSortOrderVisitor.INSTANCE));
+ }
+
public static boolean isLocalityEnabled(FileIO io, String location, CaseInsensitiveStringMap readOptions) {
InputFile in = io.newInputFile(location);
if (in instanceof HadoopInputFile) {
@@ -880,4 +885,59 @@ public class Spark3Util {
Option<String> database = namespace.length == 1 ? Option.apply(namespace[0]) : Option.empty();
return org.apache.spark.sql.catalyst.TableIdentifier.apply(table, database);
}
+
+ /**
+ * Formats one sort field as "<term> <direction> <null order>". The term is the source column
+ * name, wrapped in the field's transform when the transform is not identity.
+ * <p>
+ * The class has no instance state; the shared INSTANCE is used by describe(SortOrder).
+ * No method uses sourceId: output depends only on the column name and transform parameters.
+ * NOTE(review): sourceName is inserted unquoted, so a column name containing ", " or spaces
+ * could make the joined description ambiguous.
+ */
+ private static class DescribeSortOrderVisitor implements SortOrderVisitor<String> {
+ private static final DescribeSortOrderVisitor INSTANCE = new DescribeSortOrderVisitor();
+
+ // private: callers use the shared INSTANCE
+ private DescribeSortOrderVisitor() {
+ }
+
+ // identity transform: "<column> <direction> <null order>", e.g. "data DESC NULLS FIRST"
+ @Override
+ public String field(String sourceName, int sourceId,
+ org.apache.iceberg.SortDirection direction, NullOrder nullOrder) {
+ return String.format("%s %s %s", sourceName, direction, nullOrder);
+ }
+
+ // "bucket(<numBuckets>, <column>) ..." -- the bucket count comes first, unlike truncate below
+ @Override
+ public String bucket(String sourceName, int sourceId, int numBuckets,
+ org.apache.iceberg.SortDirection direction, NullOrder nullOrder) {
+ return String.format("bucket(%s, %s) %s %s", numBuckets, sourceName, direction, nullOrder);
+ }
+
+ // "truncate(<column>, <width>) ..." -- the column comes first, unlike bucket above
+ @Override
+ public String truncate(String sourceName, int sourceId, int width,
+ org.apache.iceberg.SortDirection direction, NullOrder nullOrder) {
+ return String.format("truncate(%s, %s) %s %s", sourceName, width, direction, nullOrder);
+ }
+
+ // the date/time transforms below use plural names: years(), months(), days(), hours()
+ @Override
+ public String year(String sourceName, int sourceId,
+ org.apache.iceberg.SortDirection direction, NullOrder nullOrder) {
+ return String.format("years(%s) %s %s", sourceName, direction, nullOrder);
+ }
+
+ @Override
+ public String month(String sourceName, int sourceId,
+ org.apache.iceberg.SortDirection direction, NullOrder nullOrder) {
+ return String.format("months(%s) %s %s", sourceName, direction, nullOrder);
+ }
+
+ @Override
+ public String day(String sourceName, int sourceId,
+ org.apache.iceberg.SortDirection direction, NullOrder nullOrder) {
+ return String.format("days(%s) %s %s", sourceName, direction, nullOrder);
+ }
+
+ @Override
+ public String hour(String sourceName, int sourceId,
+ org.apache.iceberg.SortDirection direction, NullOrder nullOrder) {
+ return String.format("hours(%s) %s %s", sourceName, direction, nullOrder);
+ }
+
+ // any other transform: "<transform>(<column>) ...", using the transform string as given
+ @Override
+ public String unknown(String sourceName, int sourceId, String transform,
+ org.apache.iceberg.SortDirection direction, NullOrder nullOrder) {
+ return String.format("%s(%s) %s %s", transform, sourceName, direction, nullOrder);
+ }
+ }
}
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index 26b2222..93531a3 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -64,7 +64,7 @@ public class SparkTable implements org.apache.spark.sql.connector.catalog.Table,
private static final Logger LOG = LoggerFactory.getLogger(SparkTable.class);
private static final Set<String> RESERVED_PROPERTIES =
- ImmutableSet.of("provider", "format", "current-snapshot-id", "location");
+ ImmutableSet.of("provider", "format", "current-snapshot-id", "location", "sort-order");
private static final Set<TableCapability> CAPABILITIES = ImmutableSet.of(
TableCapability.BATCH_READ,
TableCapability.BATCH_WRITE,
@@ -141,6 +141,10 @@ public class SparkTable implements org.apache.spark.sql.connector.catalog.Table,
propsBuilder.put("current-snapshot-id", currentSnapshotId);
propsBuilder.put("location", icebergTable.location());
+ if (!icebergTable.sortOrder().isUnsorted()) {
+ propsBuilder.put("sort-order", Spark3Util.describe(icebergTable.sortOrder()));
+ }
+
icebergTable.properties().entrySet().stream()
.filter(entry -> !RESERVED_PROPERTIES.contains(entry.getKey()))
.forEach(propsBuilder::put);
diff --git a/spark3/src/test/java/org/apache/iceberg/actions/TestCreateActions.java b/spark3/src/test/java/org/apache/iceberg/actions/TestCreateActions.java
index e31d6cb..dfa4740 100644
--- a/spark3/src/test/java/org/apache/iceberg/actions/TestCreateActions.java
+++ b/spark3/src/test/java/org/apache/iceberg/actions/TestCreateActions.java
@@ -455,8 +455,10 @@ public class TestCreateActions extends SparkCatalogTestBase {
createSourceTable(CREATE_PARQUET, source);
assertMigratedFileCount(Actions.snapshot(source, dest), source, dest);
SparkTable table = loadTable(dest);
+ // set sort orders
+ table.table().replaceSortOrder().asc("id").desc("data").commit();
- String[] keys = {"provider", "format", "current-snapshot-id", "location"};
+ String[] keys = {"provider", "format", "current-snapshot-id", "location", "sort-order"};
for (String entry : keys) {
Assert.assertTrue("Created table missing reserved property " + entry, table.properties().containsKey(entry));
@@ -466,6 +468,8 @@ public class TestCreateActions extends SparkCatalogTestBase {
Assert.assertEquals("Unexpected format", "iceberg/parquet", table.properties().get("format"));
Assert.assertNotEquals("No current-snapshot-id found", "none", table.properties().get("current-snapshot-id"));
Assert.assertTrue("Location isn't correct", table.properties().get("location").endsWith(destTableName));
+ Assert.assertEquals("Sort-order isn't correct", "id ASC NULLS FIRST, data DESC NULLS LAST",
+ table.properties().get("sort-order"));
}
@Test
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/TestSpark3Util.java b/spark3/src/test/java/org/apache/iceberg/spark/TestSpark3Util.java
new file mode 100644
index 0000000..472244d
--- /dev/null
+++ b/spark3/src/test/java/org/apache/iceberg/spark/TestSpark3Util.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.SortOrderParser;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.iceberg.NullOrder.NULLS_FIRST;
+import static org.apache.iceberg.NullOrder.NULLS_LAST;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+/**
+ * Tests for Spark3Util.describe(SortOrder), which renders Iceberg sort orders as strings.
+ */
+public class TestSpark3Util {
+ // Checks the rendering of every transform kind, plus a multi-field order.
+ @Test
+ public void testDescribeSortOrder() {
+ Schema schema = new Schema(
+ required(1, "data", Types.StringType.get()),
+ required(2, "time", Types.TimestampType.withoutZone())
+ );
+
+ // one single-field DESC NULLS FIRST order per transform; string transforms use "data" (id 1),
+ // date/time transforms use "time" (id 2). Mixed-case "Identity" is expected to resolve to the
+ // identity transform.
+ Assert.assertEquals("Sort order isn't correct.", "data DESC NULLS FIRST",
+ Spark3Util.describe(buildSortOrder("Identity", schema, 1)));
+ Assert.assertEquals("Sort order isn't correct.", "bucket(1, data) DESC NULLS FIRST",
+ Spark3Util.describe(buildSortOrder("bucket[1]", schema, 1)));
+ Assert.assertEquals("Sort order isn't correct.", "truncate(data, 3) DESC NULLS FIRST",
+ Spark3Util.describe(buildSortOrder("truncate[3]", schema, 1)));
+ Assert.assertEquals("Sort order isn't correct.", "years(time) DESC NULLS FIRST",
+ Spark3Util.describe(buildSortOrder("year", schema, 2)));
+ Assert.assertEquals("Sort order isn't correct.", "months(time) DESC NULLS FIRST",
+ Spark3Util.describe(buildSortOrder("month", schema, 2)));
+ Assert.assertEquals("Sort order isn't correct.", "days(time) DESC NULLS FIRST",
+ Spark3Util.describe(buildSortOrder("day", schema, 2)));
+ Assert.assertEquals("Sort order isn't correct.", "hours(time) DESC NULLS FIRST",
+ Spark3Util.describe(buildSortOrder("hour", schema, 2)));
+ Assert.assertEquals("Sort order isn't correct.", "unknown(data) DESC NULLS FIRST",
+ Spark3Util.describe(buildSortOrder("unknown", schema, 1)));
+
+ // a single sort order with multiple fields: field descriptions are joined with ", " in order
+ SortOrder multiOrder = SortOrder.builderFor(schema)
+ .asc("time", NULLS_FIRST)
+ .asc("data", NULLS_LAST)
+ .build();
+ Assert.assertEquals("Sort order isn't correct.", "time ASC NULLS FIRST, data ASC NULLS LAST",
+ Spark3Util.describe(multiOrder));
+ }
+
+ /**
+ * Builds a one-field sort order by parsing JSON with SortOrderParser.
+ * The field uses order-id 10, direction "desc" and null order "nulls-first".
+ *
+ * @param transform the transform string to embed, e.g. "bucket[1]" or "truncate[3]"
+ * @param schema the schema the sort order is bound to
+ * @param sourceId the field id of the source column in {@code schema}
+ * @return the parsed sort order
+ */
+ private SortOrder buildSortOrder(String transform, Schema schema, int sourceId) {
+ String jsonString = "{\n" +
+ " \"order-id\" : 10,\n" +
+ " \"fields\" : [ {\n" +
+ " \"transform\" : \"" + transform + "\",\n" +
+ " \"source-id\" : " + sourceId + ",\n" +
+ " \"direction\" : \"desc\",\n" +
+ " \"null-order\" : \"nulls-first\"\n" +
+ " } ]\n" +
+ "}";
+
+ return SortOrderParser.fromJson(schema, jsonString);
+ }
+}