You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by cg...@apache.org on 2022/12/15 02:44:40 UTC

[drill] branch master updated: DRILL-8369: Add support for querying DeltaLake snapshots by version (#2718)

This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new eae77b93c8 DRILL-8369: Add support for querying DeltaLake snapshots by version (#2718)
eae77b93c8 is described below

commit eae77b93c8a06033b6dc51b4bae595bb10e505e8
Author: Volodymyr Vysotskyi <vv...@gmail.com>
AuthorDate: Thu Dec 15 04:44:31 2022 +0200

    DRILL-8369: Add support for querying DeltaLake snapshots by version (#2718)
---
 contrib/format-deltalake/README.md                 | 19 ++++++
 .../drill/exec/store/delta/DeltaGroupScan.java     | 26 +++-----
 .../delta/format/DeltaFormatPluginConfig.java      | 42 +++++++++++-
 .../DeltaLatestSnapshot.java}                      | 18 +++---
 .../DeltaSnapshot.java}                            | 16 ++---
 .../DeltaSnapshotByTimestamp.java}                 | 20 +++---
 .../DeltaSnapshotByVersion.java}                   | 20 +++---
 .../store/delta/snapshot/DeltaSnapshotFactory.java | 75 ++++++++++++++++++++++
 .../drill/exec/store/delta/DeltaQueriesTest.java   | 15 ++++-
 9 files changed, 195 insertions(+), 56 deletions(-)

diff --git a/contrib/format-deltalake/README.md b/contrib/format-deltalake/README.md
index e4cbaef529..a20baa75ea 100644
--- a/contrib/format-deltalake/README.md
+++ b/contrib/format-deltalake/README.md
@@ -16,6 +16,25 @@ For the case of filter pushdown, all expressions supported by Delta Lake API wil
 matches the filter expression will be read. Additionally, filtering logic for parquet files is enabled
 to allow pruning of parquet files that do not match the filter expression.
 
+### Querying specific table versions (snapshots)
+
+Delta Lake supports time travel: a query can read the table as it was at a specific earlier data version (snapshot).
+
+The following ways of specifying data version are supported:
+
+- `version` - the version number of the specific snapshot
+- `timestamp` - a timestamp in milliseconds; the latest snapshot generated at or before this time is read
+
+A table function can be used to specify one of the above options, as shown below (if both are given, `version` takes precedence):
+
+```sql
+SELECT *
+FROM table(dfs.tmp.testAllTypes(type => 'delta', version => 0));
+
+SELECT *
+FROM table(dfs.tmp.testAllTypes(type => 'delta', timestamp => 1636231332000));
+```
+
 ## Configuration
 
 The format plugin has the following configuration options:
diff --git a/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/DeltaGroupScan.java b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/DeltaGroupScan.java
index 2130bea075..6e0ddd690d 100644
--- a/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/DeltaGroupScan.java
+++ b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/DeltaGroupScan.java
@@ -66,6 +66,7 @@ import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.delta.format.DeltaFormatPlugin;
 import org.apache.drill.exec.store.delta.plan.DrillExprToDeltaTranslator;
+import org.apache.drill.exec.store.delta.snapshot.DeltaSnapshotFactory;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.dfs.FileSelection;
 import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
@@ -217,11 +218,9 @@ public class DeltaGroupScan extends AbstractParquetGroupScan {
 
   @Override
   public DeltaGroupScan clone(List<SchemaPath> columns) {
-    try {
-      return toBuilder().columns(columns).build();
-    } catch (IOException e) {
-      throw new DrillRuntimeException(e);
-    }
+    DeltaGroupScan groupScan = new DeltaGroupScan(this); // copy instead of build(), which re-reads the Delta log
+    groupScan.columns = columns; // only the projected columns differ from this scan
+    return groupScan;
   }
 
   @Override
@@ -332,17 +331,6 @@ public class DeltaGroupScan extends AbstractParquetGroupScan {
       .toString();
   }
 
-  public DeltaGroupScanBuilder toBuilder() {
-    return new DeltaGroupScanBuilder()
-      .userName(this.userName)
-      .formatPlugin(this.formatPlugin)
-      .schema(this.schema)
-      .path(this.path)
-      .condition(this.condition)
-      .columns(this.columns)
-      .limit(this.limit);
-  }
-
   private static class DeltaParquetScanFilterer extends RowGroupScanFilterer<DeltaParquetScanFilterer> {
 
     public DeltaParquetScanFilterer(DeltaGroupScan source) {
@@ -481,7 +469,11 @@ public class DeltaGroupScan extends AbstractParquetGroupScan {
 
     public DeltaGroupScan build() throws IOException {
       DeltaLog log = DeltaLog.forTable(formatPlugin.getFsConf(), path);
-      Snapshot snapshot = log.snapshot();
+      DeltaSnapshotFactory.SnapshotContext context = DeltaSnapshotFactory.SnapshotContext.builder()
+        .snapshotAsOfTimestamp(formatPlugin.getConfig().getTimestamp()) // null when not configured
+        .snapshotAsOfVersion(formatPlugin.getConfig().getVersion()) // null when not configured
+        .build();
+      Snapshot snapshot = DeltaSnapshotFactory.INSTANCE.create(context).apply(log); // version, else timestamp, else latest
       StructType structType = snapshot.getMetadata().getSchema();
       schema = toSchema(structType);
 
diff --git a/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatPluginConfig.java b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatPluginConfig.java
index 9fd7bb7e30..4a041160af 100644
--- a/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatPluginConfig.java
+++ b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatPluginConfig.java
@@ -18,15 +18,77 @@
 package org.apache.drill.exec.store.delta.format;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.PlanStringBuilder;
 import org.apache.drill.common.logical.FormatPluginConfig;
 
+import java.util.Objects;
+
+/**
+ * Format plugin configuration for Delta Lake tables.
+ * <p>
+ * The optional {@code version} and {@code timestamp} properties select a historical
+ * table snapshot (time travel). When neither is set, the latest snapshot is read;
+ * when both are set, {@code version} takes precedence.
+ */
 @JsonTypeName(DeltaFormatPluginConfig.NAME)
 public class DeltaFormatPluginConfig implements FormatPluginConfig {
 
   public static final String NAME = "delta";
 
+  /** Snapshot version to read, or {@code null} if not specified. */
+  private final Long version;
+  /** Timestamp in milliseconds selecting the snapshot, or {@code null} if not specified. */
+  private final Long timestamp;
+
+  /**
+   * @param version   snapshot version to read, may be {@code null}
+   * @param timestamp timestamp in milliseconds selecting the snapshot, may be {@code null}
+   */
   @JsonCreator
-  public DeltaFormatPluginConfig() {
+  public DeltaFormatPluginConfig(@JsonProperty("version") Long version,
+    @JsonProperty("timestamp") Long timestamp) {
+    this.version = version;
+    this.timestamp = timestamp;
+  }
+
+  @JsonProperty("version")
+  public Long getVersion() {
+    return version;
+  }
+
+  @JsonProperty("timestamp")
+  public Long getTimestamp() {
+    return timestamp;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    DeltaFormatPluginConfig that = (DeltaFormatPluginConfig) o;
+    return Objects.equals(version, that.version) && Objects.equals(timestamp, that.timestamp);
+  }
+
+  /**
+   * Overridden together with {@link #equals(Object)} to honor the {@link Object#hashCode()}
+   * contract: equal configs must have equal hash codes.
+   */
+  @Override
+  public int hashCode() {
+    return Objects.hash(version, timestamp);
+  }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+      .field("version", version)
+      .field("timestamp", timestamp)
+      .toString();
   }
 }
diff --git a/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatPluginConfig.java b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/snapshot/DeltaLatestSnapshot.java
similarity index 65%
copy from contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatPluginConfig.java
copy to contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/snapshot/DeltaLatestSnapshot.java
index 9fd7bb7e30..214468bdbb 100644
--- a/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatPluginConfig.java
+++ b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/snapshot/DeltaLatestSnapshot.java
@@ -15,18 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.delta.format;
+package org.apache.drill.exec.store.delta.snapshot;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.common.logical.FormatPluginConfig;
+import io.delta.standalone.DeltaLog;
+import io.delta.standalone.Snapshot;
 
-@JsonTypeName(DeltaFormatPluginConfig.NAME)
-public class DeltaFormatPluginConfig implements FormatPluginConfig {
+public class DeltaLatestSnapshot implements DeltaSnapshot { // selects the table's current (latest) snapshot
+  public static final DeltaLatestSnapshot INSTANCE = new DeltaLatestSnapshot(); // no state, so one shared instance
 
-  public static final String NAME = "delta";
-
-  @JsonCreator
-  public DeltaFormatPluginConfig() {
+  @Override
+  public Snapshot apply(DeltaLog log) {
+    return log.snapshot();
   }
 }
diff --git a/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatPluginConfig.java b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/snapshot/DeltaSnapshot.java
similarity index 65%
copy from contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatPluginConfig.java
copy to contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/snapshot/DeltaSnapshot.java
index 9fd7bb7e30..6254f76780 100644
--- a/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatPluginConfig.java
+++ b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/snapshot/DeltaSnapshot.java
@@ -15,18 +15,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.delta.format;
+package org.apache.drill.exec.store.delta.snapshot;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.common.logical.FormatPluginConfig;
+import io.delta.standalone.DeltaLog;
+import io.delta.standalone.Snapshot;
 
-@JsonTypeName(DeltaFormatPluginConfig.NAME)
-public class DeltaFormatPluginConfig implements FormatPluginConfig {
+public interface DeltaSnapshot { // strategy choosing which snapshot of a Delta table to read
 
-  public static final String NAME = "delta";
-
-  @JsonCreator
-  public DeltaFormatPluginConfig() {
-  }
+  Snapshot apply(DeltaLog log); // returns the selected snapshot from the given table log
 }
diff --git a/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatPluginConfig.java b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/snapshot/DeltaSnapshotByTimestamp.java
similarity index 66%
copy from contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatPluginConfig.java
copy to contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/snapshot/DeltaSnapshotByTimestamp.java
index 9fd7bb7e30..325e947b86 100644
--- a/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatPluginConfig.java
+++ b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/snapshot/DeltaSnapshotByTimestamp.java
@@ -15,18 +15,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.delta.format;
+package org.apache.drill.exec.store.delta.snapshot;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.common.logical.FormatPluginConfig;
+import io.delta.standalone.DeltaLog;
+import io.delta.standalone.Snapshot;
 
-@JsonTypeName(DeltaFormatPluginConfig.NAME)
-public class DeltaFormatPluginConfig implements FormatPluginConfig {
+@JsonTypeName("timestamp") // NOTE(review): DeltaSnapshot has no visible Jackson type info; may be unused
+public class DeltaSnapshotByTimestamp implements DeltaSnapshot { // latest snapshot at or before a timestamp
+  private final long timestamp; // milliseconds, as documented in the module README
 
-  public static final String NAME = "delta";
+  public DeltaSnapshotByTimestamp(long timestamp) {
+    this.timestamp = timestamp;
+  }
 
-  @JsonCreator
-  public DeltaFormatPluginConfig() {
+  @Override
+  public Snapshot apply(DeltaLog log) {
+    return log.getSnapshotForTimestampAsOf(timestamp);
   }
 }
diff --git a/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatPluginConfig.java b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/snapshot/DeltaSnapshotByVersion.java
similarity index 66%
copy from contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatPluginConfig.java
copy to contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/snapshot/DeltaSnapshotByVersion.java
index 9fd7bb7e30..2b32c39c1f 100644
--- a/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatPluginConfig.java
+++ b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/snapshot/DeltaSnapshotByVersion.java
@@ -15,18 +15,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.delta.format;
+package org.apache.drill.exec.store.delta.snapshot;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.common.logical.FormatPluginConfig;
+import io.delta.standalone.DeltaLog;
+import io.delta.standalone.Snapshot;
 
-@JsonTypeName(DeltaFormatPluginConfig.NAME)
-public class DeltaFormatPluginConfig implements FormatPluginConfig {
+@JsonTypeName("version") // NOTE(review): DeltaSnapshot has no visible Jackson type info; may be unused
+public class DeltaSnapshotByVersion implements DeltaSnapshot { // snapshot with the given version number
+  private final long version;
 
-  public static final String NAME = "delta";
+  public DeltaSnapshotByVersion(long version) {
+    this.version = version;
+  }
 
-  @JsonCreator
-  public DeltaFormatPluginConfig() {
+  @Override
+  public Snapshot apply(DeltaLog log) {
+    return log.getSnapshotForVersionAsOf(version);
   }
 }
diff --git a/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/snapshot/DeltaSnapshotFactory.java b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/snapshot/DeltaSnapshotFactory.java
new file mode 100644
index 0000000000..77d58373c3
--- /dev/null
+++ b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/snapshot/DeltaSnapshotFactory.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.delta.snapshot;
+
+public class DeltaSnapshotFactory { // maps optional time-travel options to a DeltaSnapshot strategy
+  public static final DeltaSnapshotFactory INSTANCE = new DeltaSnapshotFactory(); // no state; shared instance
+
+  public DeltaSnapshot create(SnapshotContext context) {
+    if (context.getSnapshotAsOfVersion() != null) { // version wins; a timestamp set alongside it is ignored
+      return new DeltaSnapshotByVersion(context.getSnapshotAsOfVersion());
+    } else if (context.getSnapshotAsOfTimestamp() != null) {
+      return new DeltaSnapshotByTimestamp(context.getSnapshotAsOfTimestamp());
+    } else { // neither option set: read the latest snapshot
+      return DeltaLatestSnapshot.INSTANCE;
+    }
+  }
+
+  public static class SnapshotContext { // immutable holder; unset options are null
+    private final Long snapshotAsOfVersion;
+
+    private final Long snapshotAsOfTimestamp;
+
+    SnapshotContext(SnapshotContextBuilder builder) {
+      this.snapshotAsOfVersion = builder.snapshotAsOfVersion;
+      this.snapshotAsOfTimestamp = builder.snapshotAsOfTimestamp;
+    }
+
+    public static SnapshotContextBuilder builder() {
+      return new SnapshotContextBuilder();
+    }
+
+    public Long getSnapshotAsOfVersion() {
+      return this.snapshotAsOfVersion;
+    }
+
+    public Long getSnapshotAsOfTimestamp() {
+      return this.snapshotAsOfTimestamp;
+    }
+
+    public static class SnapshotContextBuilder { // mutable builder; options not set stay null
+      private Long snapshotAsOfVersion;
+
+      private Long snapshotAsOfTimestamp;
+
+      public SnapshotContextBuilder snapshotAsOfVersion(Long snapshotAsOfVersion) {
+        this.snapshotAsOfVersion = snapshotAsOfVersion;
+        return this;
+      }
+
+      public SnapshotContextBuilder snapshotAsOfTimestamp(Long snapshotAsOfTime) {
+        this.snapshotAsOfTimestamp = snapshotAsOfTime;
+        return this;
+      }
+
+      public SnapshotContext build() {
+        return new SnapshotContext(this);
+      }
+    }
+  }
+}
diff --git a/contrib/format-deltalake/src/test/java/org/apache/drill/exec/store/delta/DeltaQueriesTest.java b/contrib/format-deltalake/src/test/java/org/apache/drill/exec/store/delta/DeltaQueriesTest.java
index e74b7d350e..fb98fec5e2 100644
--- a/contrib/format-deltalake/src/test/java/org/apache/drill/exec/store/delta/DeltaQueriesTest.java
+++ b/contrib/format-deltalake/src/test/java/org/apache/drill/exec/store/delta/DeltaQueriesTest.java
@@ -44,7 +44,7 @@ public class DeltaQueriesTest extends ClusterTest {
     StoragePluginRegistry pluginRegistry = cluster.drillbit().getContext().getStorage();
     FileSystemConfig pluginConfig = (FileSystemConfig) pluginRegistry.getPlugin(DFS_PLUGIN_NAME).getConfig();
     Map<String, FormatPluginConfig> formats = new HashMap<>(pluginConfig.getFormats());
-    formats.put("delta", new DeltaFormatPluginConfig());
+    formats.put("delta", new DeltaFormatPluginConfig(null, null)); // no version/timestamp: latest snapshot
     FileSystemConfig newPluginConfig = new FileSystemConfig(
       pluginConfig.getConnection(),
       pluginConfig.getConfig(),
@@ -193,4 +193,17 @@ public class DeltaQueriesTest extends ClusterTest {
     long count = queryBuilder().sql(query).run().recordCount();
     assertEquals(1, count);
   }
+
+  @Test
+  public void testSnapshotVersion() throws Exception { // time travel to version 0 via the table function
+    String query = "select as_int, as_string " +
+      "from table(dfs.`data-reader-partition-values`(type => 'delta', version => 0))  where as_long = 1";
+
+    testBuilder()
+      .sqlQuery(query)
+      .ordered()
+      .baselineColumns("as_int", "as_string")
+      .baselineValues("1", "1")
+      .go();
+  }
 }