Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2022/11/08 18:31:33 UTC

[GitHub] [drill] vvysotskyi opened a new pull request, #2702: DRILL-8353: Format plugin for Delta Lake

vvysotskyi opened a new pull request, #2702:
URL: https://github.com/apache/drill/pull/2702

   # [DRILL-8353](https://issues.apache.org/jira/browse/DRILL-8353): Format plugin for Delta Lake
   
   ## Description
   This pull request adds support for reading Delta Lake tables.
   
   ## Documentation
   See README.md
   
   ## Testing
   Added unit tests.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [drill] cgivre merged pull request #2702: DRILL-8353: Format plugin for Delta Lake

Posted by GitBox <gi...@apache.org>.
cgivre merged PR #2702:
URL: https://github.com/apache/drill/pull/2702


[GitHub] [drill] kmatt commented on pull request #2702: DRILL-8353: Format plugin for Delta Lake

Posted by GitBox <gi...@apache.org>.
kmatt commented on PR #2702:
URL: https://github.com/apache/drill/pull/2702#issuecomment-1335803778

   @cgivre @vvysotskyi Thanks, I missed the "will be" clause ;)


[GitHub] [drill] kmatt commented on pull request #2702: DRILL-8353: Format plugin for Delta Lake

Posted by GitBox <gi...@apache.org>.
kmatt commented on PR #2702:
URL: https://github.com/apache/drill/pull/2702#issuecomment-1334708491

   @vvysotskyi Does this support VERSION AS OF queries?
   
   https://docs.delta.io/latest/quick-start.html#read-older-versions-of-data-using-time-travel
   
   Ex: ``SELECT * FROM dfs.delta.`/tmp/delta-table` VERSION AS OF 0;``
   


Re: [PR] DRILL-8353: Format plugin for Delta Lake (drill)

Posted by "kmatt (via GitHub)" <gi...@apache.org>.
kmatt commented on PR #2702:
URL: https://github.com/apache/drill/pull/2702#issuecomment-1600977689

   #2810, #2809


[GitHub] [drill] kmatt commented on pull request #2702: DRILL-8353: Format plugin for Delta Lake

Posted by GitBox <gi...@apache.org>.
kmatt commented on PR #2702:
URL: https://github.com/apache/drill/pull/2702#issuecomment-1331605371

   On Windows 10, `git clone` fails because of an overlong path added in this patch. The repo clones successfully on Debian 11.
   
   ```
   git clone https://github.com/apache/drill.git
   Cloning into 'drill'...
   remote: Enumerating objects: 156537, done.
   remote: Counting objects: 100% (1323/1323), done.
   remote: Compressing objects: 100% (723/723), done.
   remote: Total 156537 (delta 322), reused 1119 (delta 218), pack-reused 155214
   Receiving objects: 100% (156537/156537), 65.97 MiB | 11.24 MiB/s, done.
   
   Resolving deltas: 100% (79075/79075), done.
   fatal: cannot create directory at 'contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=0/as_long=0/as_byte=0/as_short=0/as_boolean=true/as_float=0.0/as_double=0.0/as_string=0/as_string_lit_null=null/as_date=2021-09-08/as_timestamp=2021-09-08 11%3A11%3A11': Filename too long
   warning: Clone succeeded, but checkout failed.
   You can inspect what was checked out with 'git status'
   and retry with 'git restore --source=HEAD :/'
   ```
   
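The path that fails to check out is a partition directory (`as_int=0/as_long=0/...`) nested deep under the test resources. A rough Java sketch for spotting such paths ahead of time follows. `LongPathCheck` is a hypothetical helper, not part of Drill; it assumes the classic 260-character Windows `MAX_PATH` limit and ignores other Windows-specific limits. On the client side, Git for Windows can also be configured to allow long paths through its `core.longpaths` setting.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: estimates which repository paths would exceed the classic Windows MAX_PATH.
final class LongPathCheck {
  // Classic Windows MAX_PATH: 260 characters including the terminating NUL, so 259 are usable.
  static final int WINDOWS_MAX_PATH = 260;

  /**
   * Returns the repository-relative paths that would not fit under MAX_PATH once
   * checked out below cloneDir. This is a rough estimate; other limits are ignored.
   */
  static List<String> tooLong(List<String> repoRelativePaths, String cloneDir) {
    return repoRelativePaths.stream()
        // +1 accounts for the separator between the clone directory and the relative path
        .filter(p -> cloneDir.length() + 1 + p.length() >= WINDOWS_MAX_PATH)
        .collect(Collectors.toList());
  }
}
```

The input list could come from any file listing of the repository; the check only compares string lengths, so it runs the same on Linux as on Windows.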


[GitHub] [drill] kmatt commented on pull request #2702: DRILL-8353: Format plugin for Delta Lake

Posted by GitBox <gi...@apache.org>.
kmatt commented on PR #2702:
URL: https://github.com/apache/drill/pull/2702#issuecomment-1335452201

   The `version` parameter does not seem to be recognized:
   
   ```
   apache drill (dfs.delta)> select count(*) from table(dfs.delta.`delta_table`(type => 'delta'));
   +--------+
   | EXPR$0 |
   +--------+
   | 20     |
   +--------+
   1 row selected (0.157 seconds)
   
   apache drill (dfs.delta)> SELECT *
   2..............semicolon> FROM table(dfs.delta.`delta_table`(type => 'delta', version => 0));
   Error: VALIDATION ERROR: From line 2, column 22 to line 2, column 75: No match found for function signature delta_table(type => <CHARACTER>, version => <NUMERIC>)
   ```


[GitHub] [drill] cgivre commented on pull request #2702: DRILL-8353: Format plugin for Delta Lake

Posted by GitBox <gi...@apache.org>.
cgivre commented on PR #2702:
URL: https://github.com/apache/drill/pull/2702#issuecomment-1335460790

   @kmatt This hasn't been implemented yet, which is why the query doesn't work. @vvysotskyi is working on it. :-)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [drill] vvysotskyi commented on a diff in pull request #2702: DRILL-8353: Format plugin for Delta Lake

Posted by GitBox <gi...@apache.org>.
vvysotskyi commented on code in PR #2702:
URL: https://github.com/apache/drill/pull/2702#discussion_r1027065550


##########
contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/plan/DrillExprToDeltaTranslator.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.delta.plan;
+
+import io.delta.standalone.expressions.And;
+import io.delta.standalone.expressions.EqualTo;
+import io.delta.standalone.expressions.Expression;
+import io.delta.standalone.expressions.GreaterThan;
+import io.delta.standalone.expressions.GreaterThanOrEqual;
+import io.delta.standalone.expressions.IsNotNull;
+import io.delta.standalone.expressions.IsNull;
+import io.delta.standalone.expressions.LessThan;
+import io.delta.standalone.expressions.LessThanOrEqual;
+import io.delta.standalone.expressions.Literal;
+import io.delta.standalone.expressions.Not;
+import io.delta.standalone.expressions.Or;
+import io.delta.standalone.expressions.Predicate;
+import io.delta.standalone.types.StructType;
+import org.apache.drill.common.FunctionNames;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+
+public class DrillExprToDeltaTranslator extends AbstractExprVisitor<Expression, Void, RuntimeException> {
+
+  private final StructType structType;
+
+  public DrillExprToDeltaTranslator(StructType structType) {
+    this.structType = structType;
+  }
+
+  @Override
+  public Expression visitFunctionCall(FunctionCall call, Void value) {
+    try {
+      return visitFunctionCall(call);
+    } catch (Exception e) {
+      return null;
+    }
+  }
+
+  private Predicate visitFunctionCall(FunctionCall call) {
+    switch (call.getName()) {
+      case FunctionNames.AND: {
+        Expression left = call.arg(0).accept(this, null);
+        Expression right = call.arg(1).accept(this, null);
+        if (left != null && right != null) {
+          return new And(left, right);
+        }
+        return null;
+      }
+      case FunctionNames.OR: {
+        Expression left = call.arg(0).accept(this, null);
+        Expression right = call.arg(1).accept(this, null);
+        if (left != null && right != null) {
+          return new Or(left, right);
+        }
+        return null;
+      }
+      case FunctionNames.NOT: {
+        Expression expression = call.arg(0).accept(this, null);
+        if (expression != null) {
+          return new Not(expression);
+        }
+        return null;
+      }
+      case FunctionNames.IS_NULL: {
+        LogicalExpression arg = call.arg(0);
+        if (arg instanceof SchemaPath) {
+          String name = getPath((SchemaPath) arg);
+          return new IsNull(structType.column(name));
+        }
+        return null;
+      }
+      case FunctionNames.IS_NOT_NULL: {
+        LogicalExpression arg = call.arg(0);
+        if (arg instanceof SchemaPath) {
+          String name = getPath((SchemaPath) arg);
+          return new IsNotNull(structType.column(name));
+        }
+        return null;
+      }
+      case FunctionNames.LT: {
+        LogicalExpression nameRef = call.arg(0);
+        Expression expression = call.arg(1).accept(this, null);
+        if (nameRef instanceof SchemaPath) {
+          String name = getPath((SchemaPath) nameRef);
+          return new LessThan(structType.column(name), expression);
+        }
+        return null;
+      }
+      case FunctionNames.LE: {
+        LogicalExpression nameRef = call.arg(0);
+        Expression expression = call.arg(1).accept(this, null);
+        if (nameRef instanceof SchemaPath) {
+          String name = getPath((SchemaPath) nameRef);
+          return new LessThanOrEqual(structType.column(name), expression);
+        }
+        return null;
+      }
+      case FunctionNames.GT: {
+        LogicalExpression nameRef = call.args().get(0);
+        Expression expression = call.args().get(1).accept(this, null);
+        if (nameRef instanceof SchemaPath) {
+          String name = getPath((SchemaPath) nameRef);
+          return new GreaterThan(structType.column(name), expression);
+        }
+        return null;
+      }
+      case FunctionNames.GE: {
+        LogicalExpression nameRef = call.args().get(0);
+        Expression expression = call.args().get(1).accept(this, null);
+        if (nameRef instanceof SchemaPath) {
+          String name = getPath((SchemaPath) nameRef);
+          return new GreaterThanOrEqual(structType.column(name), expression);
+        }
+        return null;
+      }
+      case FunctionNames.EQ: {
+        LogicalExpression nameRef = call.args().get(0);
+        Expression expression = call.args().get(1).accept(this, null);
+        if (nameRef instanceof SchemaPath) {
+          String name = getPath((SchemaPath) nameRef);
+          return new EqualTo(structType.column(name), expression);
+        }
+        return null;
+      }
+      case FunctionNames.NE: {
+        LogicalExpression nameRef = call.args().get(0);
+        Expression expression = call.args().get(1).accept(this, null);
+        if (nameRef instanceof SchemaPath) {
+          String name = getPath((SchemaPath) nameRef);
+          return new Not(new EqualTo(structType.column(name), expression));
+        }
+        return null;
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public Expression visitFloatConstant(ValueExpressions.FloatExpression fExpr, Void value) {
+    return Literal.of(fExpr.getFloat());
+  }
+
+  @Override
+  public Expression visitIntConstant(ValueExpressions.IntExpression intExpr, Void value) {
+    return Literal.of(intExpr.getInt());
+  }
+
+  @Override
+  public Expression visitLongConstant(ValueExpressions.LongExpression longExpr, Void value) {
+    return Literal.of(longExpr.getLong());
+  }
+
+  @Override
+  public Expression visitDecimal9Constant(ValueExpressions.Decimal9Expression decExpr, Void value) {
+    return Literal.of(decExpr.getIntFromDecimal());
+  }
+
+  @Override
+  public Expression visitDecimal18Constant(ValueExpressions.Decimal18Expression decExpr, Void value) {
+    return Literal.of(decExpr.getLongFromDecimal());
+  }
+
+  @Override
+  public Expression visitDecimal28Constant(ValueExpressions.Decimal28Expression decExpr, Void value) {
+    return Literal.of(decExpr.getBigDecimal());
+  }
+
+  @Override
+  public Expression visitDecimal38Constant(ValueExpressions.Decimal38Expression decExpr, Void value) {
+    return Literal.of(decExpr.getBigDecimal());
+  }
+
+  @Override
+  public Expression visitVarDecimalConstant(ValueExpressions.VarDecimalExpression decExpr, Void value) {
+    return Literal.of(decExpr.getBigDecimal());
+  }
+
+  @Override
+  public Expression visitDateConstant(ValueExpressions.DateExpression dateExpr, Void value) {
+    return Literal.of(dateExpr.getDate());
+  }
+
+  @Override
+  public Expression visitTimeConstant(ValueExpressions.TimeExpression timeExpr, Void value) {
+    return Literal.of(timeExpr.getTime());
+  }
+
+  @Override
+  public Expression visitTimeStampConstant(ValueExpressions.TimeStampExpression timestampExpr, Void value) {
+    return Literal.of(timestampExpr.getTimeStamp());
+  }
+
+  @Override
+  public Expression visitDoubleConstant(ValueExpressions.DoubleExpression dExpr, Void value) {
+    return Literal.of(dExpr.getDouble());
+  }
+
+  @Override
+  public Expression visitBooleanConstant(ValueExpressions.BooleanExpression e, Void value) {
+    return Literal.of(e.getBoolean());
+  }
+
+  @Override
+  public Expression visitQuotedStringConstant(ValueExpressions.QuotedString e, Void value) {
+    return Literal.of(e.getString());
+  }
+
+  @Override
+  public Expression visitUnknown(LogicalExpression e, Void value) {
+    return null;
+  }
+
+  private static String getPath(SchemaPath schemaPath) {

Review Comment:
   Yes, I think `element` keyword usage is not common.
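`DrillExprToDeltaTranslator` returns `null` whenever any part of a filter cannot be expressed for Delta, which disables pushdown for that subtree. A minimal, self-contained sketch of the same translation pattern follows. The classes `Expr`, `Column`, `Lit`, `Call`, and `PredicateTranslator`, and the function names they use, are hypothetical stand-ins rather than Drill or Delta Standalone APIs, and the output is a plain string instead of a Delta `Expression`. In every comparison the value operand is the second argument, including `>=`.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for an engine's expression tree; NOT Drill or Delta Standalone classes.
abstract class Expr {}

final class Column extends Expr {
  final String name;
  Column(String name) { this.name = name; }
}

final class Lit extends Expr {
  final Object value;
  Lit(Object value) { this.value = value; }
}

final class Call extends Expr {
  final String fn; // hypothetical names: "and", "or", "not", "<", "<=", ">", ">=", "=", "!="
  final List<Expr> args;
  Call(String fn, Expr... args) { this.fn = fn; this.args = Arrays.asList(args); }
}

final class PredicateTranslator {
  /** Returns a textual predicate, or null when (part of) the filter cannot be pushed down. */
  static String translate(Expr e) {
    if (e instanceof Lit) {
      Object v = ((Lit) e).value;
      // Strings are quoted without escaping; good enough for a sketch.
      return v instanceof String ? "'" + v + "'" : String.valueOf(v);
    }
    if (!(e instanceof Call)) {
      return null; // a bare column reference is not a predicate by itself
    }
    Call call = (Call) e;
    switch (call.fn) {
      case "and":
      case "or": {
        String left = translate(call.args.get(0));
        String right = translate(call.args.get(1));
        // Conservative: drop the whole AND/OR if either side is untranslatable.
        return left != null && right != null
            ? "(" + left + " " + call.fn.toUpperCase() + " " + right + ")"
            : null;
      }
      case "not": {
        String inner = translate(call.args.get(0));
        return inner != null ? "NOT (" + inner + ")" : null;
      }
      case "!=": {
        // Like the PR, "not equal" is expressed as NOT(equal).
        String eq = comparison(call, "=");
        return eq != null ? "NOT (" + eq + ")" : null;
      }
      case "<": case "<=": case ">": case ">=": case "=":
        return comparison(call, call.fn);
      default:
        return null; // unknown function: no pushdown
    }
  }

  private static String comparison(Call call, String op) {
    Expr ref = call.args.get(0);
    // The compared value is the SECOND argument; args.get(0) would compare the column with itself.
    String value = translate(call.args.get(1));
    if (!(ref instanceof Column) || value == null) {
      return null;
    }
    return ((Column) ref).name + " " + op + " " + value;
  }
}
```

Returning `null` for the whole AND is the conservative choice. A planner that re-applies the full filter after the scan could push down just the translatable half of an AND, but never of an OR or a NOT.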



##########
contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatPlugin.java:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.delta.format;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.metastore.MetadataProviderManager;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.AbstractWriter;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.PlannerPhase;
+import org.apache.drill.exec.planner.common.DrillStatsTable;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.schema.SchemaProvider;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.PluginRulesProviderImpl;
+import org.apache.drill.exec.store.StoragePluginRulesSupplier;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.dfs.FormatMatcher;
+import org.apache.drill.exec.store.dfs.FormatPlugin;
+import org.apache.drill.exec.store.delta.DeltaGroupScan;
+import org.apache.drill.exec.store.delta.plan.DeltaPluginImplementor;
+import org.apache.drill.exec.store.parquet.ParquetReaderConfig;
+import org.apache.drill.exec.store.plan.rel.PluginRel;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class DeltaFormatPlugin implements FormatPlugin {
+
+  private static final String DELTA_CONVENTION_PREFIX = "DELTA.";
+
+  /**
+   * Generator for format id values. Formats with the same name may be defined
+   * in multiple storage plugins, so using the unique id within the convention name
+   * to ensure the rule names will be unique for different plugin instances.
+   */
+  private static final AtomicInteger NEXT_ID = new AtomicInteger(0);
+
+  private final FileSystemConfig storageConfig;
+
+  private final DeltaFormatPluginConfig config;
+
+  private final Configuration fsConf;
+
+  private final DrillbitContext context;
+
+  private final String name;
+
+  private final DeltaFormatMatcher matcher;
+
+  private final StoragePluginRulesSupplier storagePluginRulesSupplier;
+
+  public DeltaFormatPlugin(
+    String name,
+    DrillbitContext context,
+    Configuration fsConf,
+    FileSystemConfig storageConfig,
+    DeltaFormatPluginConfig config) {
+    this.storageConfig = storageConfig;
+    this.config = config;
+    this.fsConf = fsConf;
+    this.context = context;
+    this.name = name;
+    this.matcher = new DeltaFormatMatcher(this);
+    this.storagePluginRulesSupplier = storagePluginRulesSupplier(name + NEXT_ID.getAndIncrement());
+  }
+
+  private static StoragePluginRulesSupplier storagePluginRulesSupplier(String name) {
+    Convention convention = new Convention.Impl(DELTA_CONVENTION_PREFIX + name, PluginRel.class);
+    return StoragePluginRulesSupplier.builder()
+      .rulesProvider(new PluginRulesProviderImpl(convention, DeltaPluginImplementor::new))
+      .supportsFilterPushdown(true)
+      .supportsProjectPushdown(true)
+      .supportsLimitPushdown(true)
+      .convention(convention)
+      .build();
+  }
+
+  @Override
+  public boolean supportsRead() {
+    return true;
+  }
+
+  @Override
+  public boolean supportsWrite() {
+    return false;
+  }
+
+  @Override
+  public boolean supportsAutoPartitioning() {
+    return false;
+  }
+
+  @Override
+  public FormatMatcher getMatcher() {
+    return matcher;
+  }
+
+  @Override
+  public AbstractWriter getWriter(PhysicalOperator child, String location, List<String> partitionColumns) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Set<? extends RelOptRule> getOptimizerRules(PlannerPhase phase) {
+    switch (phase) {
+      case PHYSICAL:
+      case LOGICAL:
+        return storagePluginRulesSupplier.getOptimizerRules();
+      case LOGICAL_PRUNE_AND_JOIN:
+      case LOGICAL_PRUNE:
+      case PARTITION_PRUNING:
+      case JOIN_PLANNING:
+      default:
+        return Collections.emptySet();
+    }
+  }
+
+  @Override
+  public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns) throws IOException {
+    return DeltaGroupScan.builder()
+      .userName(userName)
+      .formatPlugin(this)
+      .readerConfig(ParquetReaderConfig.builder().withConf(fsConf).build())
+      .path(selection.selectionRoot.toUri().getPath())
+      .columns(columns)
+      .limit(-1)
+      .build();
+  }
+
+  @Override
+  public AbstractGroupScan getGroupScan(String userName, FileSelection selection,
+    List<SchemaPath> columns, MetadataProviderManager metadataProviderManager) throws IOException {
+    SchemaProvider schemaProvider = metadataProviderManager.getSchemaProvider();
+    TupleMetadata schema = schemaProvider != null
+      ? schemaProvider.read().getSchema()
+      : null;
+    return DeltaGroupScan.builder()
+      .userName(userName)
+      .formatPlugin(this)
+      .readerConfig(ParquetReaderConfig.builder().withConf(fsConf).build())
+      .schema(schema)
+      .path(selection.selectionRoot.toUri().getPath())
+      .columns(columns)
+      .limit(-1)
+      .build();
+  }
+
+  @Override
+  public boolean supportsStatistics() {
+    return false;
+  }
+
+  @Override
+  public DrillStatsTable.TableStatistics readStatistics(FileSystem fs, Path statsTablePath) {
+    throw new UnsupportedOperationException("unimplemented");

Review Comment:
   It relies on stats files that Drill creates itself (currently only the Parquet format is supported), so for the Delta plugin I'm not sure it is a good idea to create extra files in the table directory.
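The `NEXT_ID` Javadoc in `DeltaFormatPlugin` explains that the same format name can be configured in several storage plugins, so the constructor appends a process-wide counter to the name before building the `DELTA.`-prefixed convention. A minimal sketch of just that naming scheme follows; `ConventionNames` is a hypothetical class that isolates the string logic from Calcite's `Convention`.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper isolating the convention-naming logic used by the plugin constructor.
final class ConventionNames {
  private static final String DELTA_CONVENTION_PREFIX = "DELTA.";

  // Process-wide counter: each plugin instance gets a distinct suffix, even for equal format names.
  private static final AtomicInteger NEXT_ID = new AtomicInteger(0);

  static String next(String formatName) {
    return DELTA_CONVENTION_PREFIX + formatName + NEXT_ID.getAndIncrement();
  }
}
```

Two `dfs`-style plugins that both define a `delta` format therefore end up with different convention names, and thus different rule names.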



##########
contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/read/DeltaScanBatchCreator.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.delta.read;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.ColumnExplorer;
+import org.apache.drill.exec.store.delta.DeltaRowGroupScan;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.parquet.AbstractParquetRowGroupScan;
+import org.apache.drill.exec.store.parquet.AbstractParquetScanBatchCreator;
+import org.apache.drill.exec.store.parquet.ParquetScanBatchCreator;
+import org.apache.drill.exec.store.parquet.RowGroupReadEntry;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+import java.util.List;
+import java.util.Map;
+
+@SuppressWarnings("unused")
+public class DeltaScanBatchCreator extends AbstractParquetScanBatchCreator

Review Comment:
   Yes, this and other options are applicable to the delta plugin.



##########
contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatMatcher.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.delta.format;
+
+import io.delta.standalone.DeltaLog;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.dfs.FormatMatcher;
+import org.apache.drill.exec.store.dfs.FormatPlugin;
+import org.apache.drill.exec.store.dfs.FormatSelection;
+import org.apache.drill.exec.store.plan.rel.PluginDrillTable;
+import org.apache.hadoop.fs.FileStatus;
+
+public class DeltaFormatMatcher extends FormatMatcher {
+
+  private final DeltaFormatPlugin formatPlugin;
+
+  public DeltaFormatMatcher(DeltaFormatPlugin formatPlugin) {
+    this.formatPlugin = formatPlugin;
+  }
+
+  @Override
+  public boolean supportDirectoryReads() {

Review Comment:
   No, it existed before. It means that the plugin can handle queries that specify a directory rather than a single file.
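Directory reads matter here because a Delta Lake table is a directory: data files plus a `_delta_log` subdirectory that holds the transaction log. A minimal sketch of a directory-level check follows. It assumes only that layout; `DeltaDirCheck` is hypothetical, and the real matcher, which imports Delta Standalone's `DeltaLog`, may validate the table more thoroughly.

```java
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper; a simplified stand-in for a format matcher's directory check.
final class DeltaDirCheck {
  // Name of the transaction-log directory at the root of a Delta Lake table.
  static final String DELTA_LOG_DIR = "_delta_log";

  /** Heuristic: the selected path is a directory that contains a _delta_log subdirectory. */
  static boolean looksLikeDeltaTable(Path dir) {
    return Files.isDirectory(dir) && Files.isDirectory(dir.resolve(DELTA_LOG_DIR));
  }
}
```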



Re: [PR] DRILL-8353: Format plugin for Delta Lake (drill)

Posted by "kmatt (via GitHub)" <gi...@apache.org>.
kmatt commented on PR #2702:
URL: https://github.com/apache/drill/pull/2702#issuecomment-1599386040

   @vvysotskyi https://issues.apache.org/jira/browse/DRILL-8442
   
   Should this be a GitHub issue, or is Jira the correct place for it?


[GitHub] [drill] jnturton commented on a diff in pull request #2702: DRILL-8353: Format plugin for Delta Lake

Posted by GitBox <gi...@apache.org>.
jnturton commented on code in PR #2702:
URL: https://github.com/apache/drill/pull/2702#discussion_r1017953161


##########
contrib/format-deltalake/README.md:
##########
@@ -0,0 +1,36 @@
+# Delta Lake format plugin
+
+This format plugin enabled Drill to query Delta Lake tables.

Review Comment:
   ```suggestion
   This format plugin enables Drill to query Delta Lake tables.
   ```



##########
contrib/format-deltalake/README.md:
##########
@@ -0,0 +1,36 @@
+# Delta Lake format plugin
+
+This format plugin enabled Drill to query Delta Lake tables.
+
+## Supported optimizations and features
+
+### Project pushdown
+
+This format plugin supports project and filter pushdown optimizations.
+
+For the case of project pushdown, only columns specified in the query will be read, even they are nested columns.

Review Comment:
   ```suggestion
   For the case of project pushdown, only columns specified in the query will be read, even when they are nested columns.
   ```



##########
contrib/format-deltalake/src/test/java/org/apache/drill/exec/store/delta/DeltaQueriesTest.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.delta;
+
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.delta.format.DeltaFormatPluginConfig;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.drill.exec.util.StoragePluginTestUtils.DFS_PLUGIN_NAME;
+import static org.junit.Assert.assertEquals;
+
+public class DeltaQueriesTest extends ClusterTest {
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    startCluster(ClusterFixture.builder(dirTestWatcher));
+
+    StoragePluginRegistry pluginRegistry = cluster.drillbit().getContext().getStorage();
+    FileSystemConfig pluginConfig = (FileSystemConfig) pluginRegistry.getPlugin(DFS_PLUGIN_NAME).getConfig();
+    Map<String, FormatPluginConfig> formats = new HashMap<>(pluginConfig.getFormats());
+    formats.put("delta", new DeltaFormatPluginConfig());
+    FileSystemConfig newPluginConfig = new FileSystemConfig(
+      pluginConfig.getConnection(),
+      pluginConfig.getConfig(),
+      pluginConfig.getWorkspaces(),
+      formats,
+      PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
+    newPluginConfig.setEnabled(pluginConfig.isEnabled());
+    pluginRegistry.put(DFS_PLUGIN_NAME, newPluginConfig);
+
+    dirTestWatcher.copyResourceToRoot(Paths.get("data-reader-primitives"));
+    dirTestWatcher.copyResourceToRoot(Paths.get("data-reader-partition-values"));
+    dirTestWatcher.copyResourceToRoot(Paths.get("data-reader-nested-struct"));
+  }
+
+  @Test
+  public void testSerDe() throws Exception {
+    String plan = queryBuilder().sql("select * from dfs.`data-reader-partition-values`").explainJson();
+    long count = queryBuilder().physical(plan).run().recordCount();
+    assertEquals(3, count);
+  }
+
+  @Test
+  public void testAllPrimitives() throws Exception {
+    testBuilder()
+      .sqlQuery("select * from dfs.`data-reader-primitives`")
+      .ordered()
+      .baselineColumns("as_int", "as_long", "as_byte", "as_short", "as_boolean", "as_float",
+        "as_double", "as_string", "as_binary", "as_big_decimal")
+      .baselineValues(null, null, null, null, null, null, null, null, null, null)
+      .baselineValues(0, 0L, 0, 0, true, 0.0f, 0.0, "0", new byte[]{0, 0}, BigDecimal.valueOf(0))
+      .baselineValues(1, 1L, 1, 1, false, 1.0f, 1.0, "1", new byte[]{1, 1}, BigDecimal.valueOf(1))
+      .baselineValues(2, 2L, 2, 2, true, 2.0f, 2.0, "2", new byte[]{2, 2}, BigDecimal.valueOf(2))
+      .baselineValues(3, 3L, 3, 3, false, 3.0f, 3.0, "3", new byte[]{3, 3}, BigDecimal.valueOf(3))
+      .baselineValues(4, 4L, 4, 4, true, 4.0f, 4.0, "4", new byte[]{4, 4}, BigDecimal.valueOf(4))
+      .baselineValues(5, 5L, 5, 5, false, 5.0f, 5.0, "5", new byte[]{5, 5}, BigDecimal.valueOf(5))
+      .baselineValues(6, 6L, 6, 6, true, 6.0f, 6.0, "6", new byte[]{6, 6}, BigDecimal.valueOf(6))
+      .baselineValues(7, 7L, 7, 7, false, 7.0f, 7.0, "7", new byte[]{7, 7}, BigDecimal.valueOf(7))
+      .baselineValues(8, 8L, 8, 8, true, 8.0f, 8.0, "8", new byte[]{8, 8}, BigDecimal.valueOf(8))
+      .baselineValues(9, 9L, 9, 9, false, 9.0f, 9.0, "9", new byte[]{9, 9}, BigDecimal.valueOf(9))
+      .go();
+  }
+
+  @Test
+  public void testProjectingColumns() throws Exception {
+
+    String query = "select as_int, as_string from dfs.`data-reader-primitives`";
+
+    queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include("columns=\\[`as_int`, `as_string`\\]")
+      .match();
+
+    testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .baselineColumns("as_int", "as_string")
+      .baselineValues(null, null)
+      .baselineValues(0, "0")
+      .baselineValues(1, "1")
+      .baselineValues(2, "2")
+      .baselineValues(3, "3")
+      .baselineValues(4, "4")
+      .baselineValues(5, "5")
+      .baselineValues(6, "6")
+      .baselineValues(7, "7")
+      .baselineValues(8, "8")
+      .baselineValues(9, "9")
+      .go();
+  }
+
+  @Test
+  public void testProjectNestedColumn() throws Exception {
+    String query = "select t.a.ac.acb as acb, b from dfs.`data-reader-nested-struct` t";
+
+    queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include("columns=\\[`a`.`ac`.`acb`, `b`\\]")
+      .match();
+
+    testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .baselineColumns("acb", "b")
+      .baselineValues(0L, 0)
+      .baselineValues(1L, 1)
+      .baselineValues(2L, 2)
+      .baselineValues(3L, 3)
+      .baselineValues(4L, 4)
+      .baselineValues(5L, 5)
+      .baselineValues(6L, 6)
+      .baselineValues(7L, 7)
+      .baselineValues(8L, 8)
+      .baselineValues(9L, 9)
+      .go();
+  }
+
+  @Test
+  public void testPartitionPruning() throws Exception {
+    String query = "select as_int, as_string from dfs.`data-reader-partition-values` where as_long = 1";
+
+    queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include("numFiles\\=1")
+      .match();
+
+    testBuilder()
+      .sqlQuery(query)
+      .ordered()
+      .baselineColumns("as_int", "as_string")
+      .baselineValues("1", "1")
+      .go();
+  }
+
+  @Test
+  public void testEmptyResults() throws Exception {
+    String query = "select as_int, as_string from dfs.`data-reader-partition-values` where as_long = 101";
+
+    queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include("numFiles\\=1")
+      .match();
+
+    testBuilder()
+      .sqlQuery(query)
+      .ordered()
+      .expectsEmptyResultSet()
+      .go();
+  }
+
+  @Test
+  public void testLimit() throws Exception {
+    String query = "select as_int, as_string from dfs.`data-reader-partition-values` limit 1";
+
+    queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include("Limit\\(fetch\\=\\[1\\]\\)")

Review Comment:
   ```suggestion
         // Note that both of the following two limits are expected because this format plugin supports an "artificial" limit.
         .include("Limit\\(fetch\\=\\[1\\]\\)")
   ```
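The suggested comment mentions an "artificial" limit. `PluginImplementor.artificialLimit()` (quoted further down in this thread) is for the case where the reader can cut down the rows it reads, but the Limit operator on top of the scan must still be kept to return the correct row count. Below is a minimal sketch of why both limits appear. It assumes the reader can only stop at chunk boundaries (whole files or row groups). The `ArtificialLimitSketch` class and its methods are hypothetical and not part of Drill.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical illustration of an "artificial" limit: the scan can only stop at
 * chunk boundaries (for example whole files), so it may return more rows than
 * requested and an exact Limit operator must still run on top of it.
 */
public class ArtificialLimitSketch {

  /** Scan side: reads whole chunks until at least {@code limit} rows are collected. */
  static List<Integer> scanWithArtificialLimit(List<List<Integer>> chunks, int limit) {
    List<Integer> rows = new ArrayList<>();
    for (List<Integer> chunk : chunks) {
      if (rows.size() >= limit) {
        break; // enough rows collected, remaining chunks are never opened
      }
      rows.addAll(chunk); // a chunk cannot be split, so this may overshoot
    }
    return rows;
  }

  /** Operator side: the preserved Limit trims the scan output to exactly {@code limit} rows. */
  static List<Integer> exactLimit(List<Integer> rows, int limit) {
    return new ArrayList<>(rows.subList(0, Math.min(limit, rows.size())));
  }

  public static void main(String[] args) {
    List<List<Integer>> chunks = List.of(List.of(1, 2, 3), List.of(4, 5), List.of(6));
    List<Integer> scanned = scanWithArtificialLimit(chunks, 1);
    System.out.println(scanned);                // [1, 2, 3]
    System.out.println(exactLimit(scanned, 1)); // [1]
  }
}
```

The scan overshoots by two rows, and only the preserved Limit makes the result exact.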



##########
contrib/format-deltalake/README.md:
##########
@@ -0,0 +1,36 @@
+# Delta Lake format plugin
+
+This format plugin enables Drill to query Delta Lake tables.
+
+## Supported optimizations and features
+
+This format plugin supports project and filter pushdown optimizations.
+
+### Project pushdown
+
+For project pushdown, only the columns specified in the query are read, even when they are nested columns.
+
+### Filter pushdown
+
+For filter pushdown, all expressions supported by the Delta Lake API are pushed down, so only data that
+matches the filter expression is read. Additionally, Parquet filtering logic is enabled
+to prune Parquet files that do not match the filter expression.
+
+## Configuration
+
+Format plugin has the following configuration options:

Review Comment:
   ```suggestion
   The format plugin has the following configuration options:
   ```
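The README's project pushdown section says that only the requested columns are read, including nested ones. `testProjectNestedColumn` checks that the plan lists only `a.ac.acb` and `b`. Below is a small sketch of how requested paths could be mapped to the leaf columns that need reading. It assumes the schema is modeled as nested maps, where a struct is a `Map` and a leaf is a type-name `String`. The class and method names are hypothetical, and so is every field other than `a.ac.acb` and `b`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of nested project pushdown over a map-based schema model. */
public class NestedProjectionSketch {

  /** Flattens a nested schema (struct = Map, leaf = type name String) into dotted leaf paths. */
  static List<String> leafPaths(Map<String, Object> schema, String prefix) {
    List<String> leaves = new ArrayList<>();
    for (Map.Entry<String, Object> field : schema.entrySet()) {
      String path = prefix.isEmpty() ? field.getKey() : prefix + "." + field.getKey();
      if (field.getValue() instanceof Map) {
        @SuppressWarnings("unchecked")
        Map<String, Object> struct = (Map<String, Object>) field.getValue();
        leaves.addAll(leafPaths(struct, path));
      } else {
        leaves.add(path);
      }
    }
    return leaves;
  }

  /** Keeps only leaves covered by a projected path, e.g. "a.ac.acb" or a whole struct "a". */
  static List<String> leavesToRead(Map<String, Object> schema, List<String> projected) {
    List<String> result = new ArrayList<>();
    for (String leaf : leafPaths(schema, "")) {
      for (String column : projected) {
        // The "." suffix keeps "a.ac" from matching an unrelated field such as "a.acx".
        if (leaf.equals(column) || leaf.startsWith(column + ".")) {
          result.add(leaf);
          break;
        }
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // Hypothetical schema; only a.ac.acb and b come from the test query.
    Map<String, Object> ac = new LinkedHashMap<>();
    ac.put("aca", "int");
    ac.put("acb", "long");
    Map<String, Object> a = new LinkedHashMap<>();
    a.put("aa", "string");
    a.put("ac", ac);
    Map<String, Object> schema = new LinkedHashMap<>();
    schema.put("a", a);
    schema.put("b", "int");

    System.out.println(leavesToRead(schema, List.of("a.ac.acb", "b"))); // [a.ac.acb, b]
  }
}
```

Requesting a whole struct such as `a` would select all of its leaves. Requesting `a.ac.acb` skips the sibling fields entirely.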



##########
contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/read/DeltaScanBatchCreator.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.delta.read;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.ColumnExplorer;
+import org.apache.drill.exec.store.delta.DeltaRowGroupScan;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.parquet.AbstractParquetRowGroupScan;
+import org.apache.drill.exec.store.parquet.AbstractParquetScanBatchCreator;
+import org.apache.drill.exec.store.parquet.ParquetScanBatchCreator;
+import org.apache.drill.exec.store.parquet.RowGroupReadEntry;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+import java.util.List;
+import java.util.Map;
+
+@SuppressWarnings("unused")
+public class DeltaScanBatchCreator extends AbstractParquetScanBatchCreator

Review Comment:
   So because you've inherited from AbstractParquetScanBatchCreator here, the various options that control the Parquet reader like `store.parquet.use_new_reader` will be applied for Delta Lake tables?



##########
contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatPlugin.java:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.delta.format;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.metastore.MetadataProviderManager;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.AbstractWriter;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.PlannerPhase;
+import org.apache.drill.exec.planner.common.DrillStatsTable;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.schema.SchemaProvider;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.PluginRulesProviderImpl;
+import org.apache.drill.exec.store.StoragePluginRulesSupplier;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.dfs.FormatMatcher;
+import org.apache.drill.exec.store.dfs.FormatPlugin;
+import org.apache.drill.exec.store.delta.DeltaGroupScan;
+import org.apache.drill.exec.store.delta.plan.DeltaPluginImplementor;
+import org.apache.drill.exec.store.parquet.ParquetReaderConfig;
+import org.apache.drill.exec.store.plan.rel.PluginRel;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class DeltaFormatPlugin implements FormatPlugin {
+
+  private static final String DELTA_CONVENTION_PREFIX = "DELTA.";
+
+  /**
+   * Generator for format id values. Formats with the same name may be defined
+   * in multiple storage plugins, so a unique id is included in the convention name
+   * to keep rule names unique across plugin instances.
+   */
+  private static final AtomicInteger NEXT_ID = new AtomicInteger(0);
+
+  private final FileSystemConfig storageConfig;
+
+  private final DeltaFormatPluginConfig config;
+
+  private final Configuration fsConf;
+
+  private final DrillbitContext context;
+
+  private final String name;
+
+  private final DeltaFormatMatcher matcher;
+
+  private final StoragePluginRulesSupplier storagePluginRulesSupplier;
+
+  public DeltaFormatPlugin(
+    String name,
+    DrillbitContext context,
+    Configuration fsConf,
+    FileSystemConfig storageConfig,
+    DeltaFormatPluginConfig config) {
+    this.storageConfig = storageConfig;
+    this.config = config;
+    this.fsConf = fsConf;
+    this.context = context;
+    this.name = name;
+    this.matcher = new DeltaFormatMatcher(this);
+    this.storagePluginRulesSupplier = storagePluginRulesSupplier(name + NEXT_ID.getAndIncrement());
+  }
+
+  private static StoragePluginRulesSupplier storagePluginRulesSupplier(String name) {
+    Convention convention = new Convention.Impl(DELTA_CONVENTION_PREFIX + name, PluginRel.class);
+    return StoragePluginRulesSupplier.builder()
+      .rulesProvider(new PluginRulesProviderImpl(convention, DeltaPluginImplementor::new))
+      .supportsFilterPushdown(true)
+      .supportsProjectPushdown(true)
+      .supportsLimitPushdown(true)
+      .convention(convention)
+      .build();
+  }
+
+  @Override
+  public boolean supportsRead() {
+    return true;
+  }
+
+  @Override
+  public boolean supportsWrite() {
+    return false;
+  }
+
+  @Override
+  public boolean supportsAutoPartitioning() {
+    return false;
+  }
+
+  @Override
+  public FormatMatcher getMatcher() {
+    return matcher;
+  }
+
+  @Override
+  public AbstractWriter getWriter(PhysicalOperator child, String location, List<String> partitionColumns) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Set<? extends RelOptRule> getOptimizerRules(PlannerPhase phase) {
+    switch (phase) {
+      case PHYSICAL:
+      case LOGICAL:
+        return storagePluginRulesSupplier.getOptimizerRules();
+      case LOGICAL_PRUNE_AND_JOIN:
+      case LOGICAL_PRUNE:
+      case PARTITION_PRUNING:
+      case JOIN_PLANNING:
+      default:
+        return Collections.emptySet();
+    }
+  }
+
+  @Override
+  public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns) throws IOException {
+    return DeltaGroupScan.builder()
+      .userName(userName)
+      .formatPlugin(this)
+      .readerConfig(ParquetReaderConfig.builder().withConf(fsConf).build())
+      .path(selection.selectionRoot.toUri().getPath())
+      .columns(columns)
+      .limit(-1)
+      .build();
+  }
+
+  @Override
+  public AbstractGroupScan getGroupScan(String userName, FileSelection selection,
+    List<SchemaPath> columns, MetadataProviderManager metadataProviderManager) throws IOException {
+    SchemaProvider schemaProvider = metadataProviderManager.getSchemaProvider();
+    TupleMetadata schema = schemaProvider != null
+      ? schemaProvider.read().getSchema()
+      : null;
+    return DeltaGroupScan.builder()
+      .userName(userName)
+      .formatPlugin(this)
+      .readerConfig(ParquetReaderConfig.builder().withConf(fsConf).build())
+      .schema(schema)
+      .path(selection.selectionRoot.toUri().getPath())
+      .columns(columns)
+      .limit(-1)
+      .build();
+  }
+
+  @Override
+  public boolean supportsStatistics() {
+    return false;
+  }
+
+  @Override
+  public DrillStatsTable.TableStatistics readStatistics(FileSystem fs, Path statsTablePath) {
+    throw new UnsupportedOperationException("unimplemented");

Review Comment:
   Could this in principle be implemented for Delta Lake?



##########
contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/plan/DrillExprToDeltaTranslator.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.delta.plan;
+
+import io.delta.standalone.expressions.And;
+import io.delta.standalone.expressions.EqualTo;
+import io.delta.standalone.expressions.Expression;
+import io.delta.standalone.expressions.GreaterThan;
+import io.delta.standalone.expressions.GreaterThanOrEqual;
+import io.delta.standalone.expressions.IsNotNull;
+import io.delta.standalone.expressions.IsNull;
+import io.delta.standalone.expressions.LessThan;
+import io.delta.standalone.expressions.LessThanOrEqual;
+import io.delta.standalone.expressions.Literal;
+import io.delta.standalone.expressions.Not;
+import io.delta.standalone.expressions.Or;
+import io.delta.standalone.expressions.Predicate;
+import io.delta.standalone.types.StructType;
+import org.apache.drill.common.FunctionNames;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+
+public class DrillExprToDeltaTranslator extends AbstractExprVisitor<Expression, Void, RuntimeException> {
+
+  private final StructType structType;
+
+  public DrillExprToDeltaTranslator(StructType structType) {
+    this.structType = structType;
+  }
+
+  @Override
+  public Expression visitFunctionCall(FunctionCall call, Void value) {
+    try {
+      return visitFunctionCall(call);
+    } catch (Exception e) {
+      return null;
+    }
+  }
+
+  private Predicate visitFunctionCall(FunctionCall call) {
+    switch (call.getName()) {
+      case FunctionNames.AND: {
+        Expression left = call.arg(0).accept(this, null);
+        Expression right = call.arg(1).accept(this, null);
+        if (left != null && right != null) {
+          return new And(left, right);
+        }
+        return null;
+      }
+      case FunctionNames.OR: {
+        Expression left = call.arg(0).accept(this, null);
+        Expression right = call.arg(1).accept(this, null);
+        if (left != null && right != null) {
+          return new Or(left, right);
+        }
+        return null;
+      }
+      case FunctionNames.NOT: {
+        Expression expression = call.arg(0).accept(this, null);
+        if (expression != null) {
+          return new Not(expression);
+        }
+        return null;
+      }
+      case FunctionNames.IS_NULL: {
+        LogicalExpression arg = call.arg(0);
+        if (arg instanceof SchemaPath) {
+          String name = getPath((SchemaPath) arg);
+          return new IsNull(structType.column(name));
+        }
+        return null;
+      }
+      case FunctionNames.IS_NOT_NULL: {
+        LogicalExpression arg = call.arg(0);
+        if (arg instanceof SchemaPath) {
+          String name = getPath((SchemaPath) arg);
+          return new IsNotNull(structType.column(name));
+        }
+        return null;
+      }
+      case FunctionNames.LT: {
+        LogicalExpression nameRef = call.arg(0);
+        Expression expression = call.arg(1).accept(this, null);
+        if (nameRef instanceof SchemaPath) {
+          String name = getPath((SchemaPath) nameRef);
+          return new LessThan(structType.column(name), expression);
+        }
+        return null;
+      }
+      case FunctionNames.LE: {
+        LogicalExpression nameRef = call.arg(0);
+        Expression expression = call.arg(1).accept(this, null);
+        if (nameRef instanceof SchemaPath) {
+          String name = getPath((SchemaPath) nameRef);
+          return new LessThanOrEqual(structType.column(name), expression);
+        }
+        return null;
+      }
+      case FunctionNames.GT: {
+        LogicalExpression nameRef = call.args().get(0);
+        Expression expression = call.args().get(1).accept(this, null);
+        if (nameRef instanceof SchemaPath) {
+          String name = getPath((SchemaPath) nameRef);
+          return new GreaterThan(structType.column(name), expression);
+        }
+        return null;
+      }
+      case FunctionNames.GE: {
+        LogicalExpression nameRef = call.args().get(0);
+        Expression expression = call.args().get(1).accept(this, null);
+        if (nameRef instanceof SchemaPath) {
+          String name = getPath((SchemaPath) nameRef);
+          return new GreaterThanOrEqual(structType.column(name), expression);
+        }
+        return null;
+      }
+      case FunctionNames.EQ: {
+        LogicalExpression nameRef = call.args().get(0);
+        Expression expression = call.args().get(1).accept(this, null);
+        if (nameRef instanceof SchemaPath) {
+          String name = getPath((SchemaPath) nameRef);
+          return new EqualTo(structType.column(name), expression);
+        }
+        return null;
+      }
+      case FunctionNames.NE: {
+        LogicalExpression nameRef = call.args().get(0);
+        Expression expression = call.args().get(1).accept(this, null);
+        if (nameRef instanceof SchemaPath) {
+          String name = getPath((SchemaPath) nameRef);
+          return new Not(new EqualTo(structType.column(name), expression));
+        }
+        return null;
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public Expression visitFloatConstant(ValueExpressions.FloatExpression fExpr, Void value) {
+    return Literal.of(fExpr.getFloat());
+  }
+
+  @Override
+  public Expression visitIntConstant(ValueExpressions.IntExpression intExpr, Void value) {
+    return Literal.of(intExpr.getInt());
+  }
+
+  @Override
+  public Expression visitLongConstant(ValueExpressions.LongExpression longExpr, Void value) {
+    return Literal.of(longExpr.getLong());
+  }
+
+  @Override
+  public Expression visitDecimal9Constant(ValueExpressions.Decimal9Expression decExpr, Void value) {
+    return Literal.of(decExpr.getIntFromDecimal());
+  }
+
+  @Override
+  public Expression visitDecimal18Constant(ValueExpressions.Decimal18Expression decExpr, Void value) {
+    return Literal.of(decExpr.getLongFromDecimal());
+  }
+
+  @Override
+  public Expression visitDecimal28Constant(ValueExpressions.Decimal28Expression decExpr, Void value) {
+    return Literal.of(decExpr.getBigDecimal());
+  }
+
+  @Override
+  public Expression visitDecimal38Constant(ValueExpressions.Decimal38Expression decExpr, Void value) {
+    return Literal.of(decExpr.getBigDecimal());
+  }
+
+  @Override
+  public Expression visitVarDecimalConstant(ValueExpressions.VarDecimalExpression decExpr, Void value) {
+    return Literal.of(decExpr.getBigDecimal());
+  }
+
+  @Override
+  public Expression visitDateConstant(ValueExpressions.DateExpression dateExpr, Void value) {
+    return Literal.of(dateExpr.getDate());
+  }
+
+  @Override
+  public Expression visitTimeConstant(ValueExpressions.TimeExpression timeExpr, Void value) {
+    return Literal.of(timeExpr.getTime());
+  }
+
+  @Override
+  public Expression visitTimeStampConstant(ValueExpressions.TimeStampExpression timestampExpr, Void value) {
+    return Literal.of(timestampExpr.getTimeStamp());
+  }
+
+  @Override
+  public Expression visitDoubleConstant(ValueExpressions.DoubleExpression dExpr, Void value) {
+    return Literal.of(dExpr.getDouble());
+  }
+
+  @Override
+  public Expression visitBooleanConstant(ValueExpressions.BooleanExpression e, Void value) {
+    return Literal.of(e.getBoolean());
+  }
+
+  @Override
+  public Expression visitQuotedStringConstant(ValueExpressions.QuotedString e, Void value) {
+    return Literal.of(e.getString());
+  }
+
+  @Override
+  public Expression visitUnknown(LogicalExpression e, Void value) {
+    return null;
+  }
+
+  private static String getPath(SchemaPath schemaPath) {

Review Comment:
   Is any of this generic enough to go to a schema utils class, or is the "element" keyword specific to Delta Lake?
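`DrillExprToDeltaTranslator` follows a simple rule. Each Drill sub-expression is translated into a Delta Lake predicate, and `null` means "not supported". An unsupported operand makes the enclosing `AND`/`OR`/`NOT` unsupported too. Comparisons take the column reference from argument 0 and the literal from argument 1. The sketch below mirrors that rule under some stated substitutions:

* It uses a hypothetical mini expression model, with plain strings in place of Delta Standalone's `Expression` classes.
* It returns `Optional.empty()` instead of `null`.
* Function names such as `ge` are placeholders, not Drill's `FunctionNames` values.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical mini-model of translating a filter into a pushed-down predicate.
 * An unsupported sub-expression yields Optional.empty(), and so does every
 * AND/OR/NOT that contains it; that part of the filter then stays in Drill.
 */
public class FilterTranslationSketch {

  interface Expr {}

  static final class Column implements Expr {
    final String name;

    Column(String name) {
      this.name = name;
    }
  }

  static final class Constant implements Expr {
    final Object value;

    Constant(Object value) {
      this.value = value;
    }
  }

  static final class Call implements Expr {
    final String function;
    final List<Expr> args;

    Call(String function, Expr... args) {
      this.function = function;
      this.args = List.of(args);
    }
  }

  // Placeholder function names, not Drill's FunctionNames constants.
  private static final Map<String, String> COMPARISONS =
      Map.of("lt", "<", "le", "<=", "gt", ">", "ge", ">=", "eq", "=");

  static Optional<String> translate(Expr expr) {
    if (expr instanceof Constant) {
      return Optional.of(String.valueOf(((Constant) expr).value));
    }
    if (!(expr instanceof Call)) {
      return Optional.empty(); // a bare column is not a predicate by itself
    }
    Call call = (Call) expr;
    String fn = call.function;
    if (fn.equals("and") || fn.equals("or")) {
      Optional<String> left = translate(call.args.get(0));
      Optional<String> right = translate(call.args.get(1));
      if (left.isEmpty() || right.isEmpty()) {
        return Optional.empty(); // both operands must translate
      }
      return Optional.of("(" + left.get() + " " + fn.toUpperCase() + " " + right.get() + ")");
    }
    if (fn.equals("not")) {
      return translate(call.args.get(0)).map(inner -> "(NOT " + inner + ")");
    }
    String op = COMPARISONS.get(fn);
    if (op != null && call.args.get(0) instanceof Column && call.args.get(1) instanceof Constant) {
      String column = ((Column) call.args.get(0)).name;   // column reference: argument 0
      String literal = translate(call.args.get(1)).get(); // literal: argument 1
      return Optional.of("(" + column + " " + op + " " + literal + ")");
    }
    return Optional.empty(); // unknown function or unexpected argument shape
  }

  public static void main(String[] args) {
    Expr supported = new Call("and",
        new Call("ge", new Column("as_long"), new Constant(1)),
        new Call("lt", new Column("as_long"), new Constant(5)));
    Expr partial = new Call("and",
        new Call("ge", new Column("as_long"), new Constant(1)),
        new Call("like", new Column("as_string"), new Constant("'1%'")));
    System.out.println(translate(supported)); // Optional[((as_long >= 1) AND (as_long < 5))]
    System.out.println(translate(partial));   // Optional.empty
  }
}
```

Returning "unsupported" for the whole `AND` is the conservative choice. It never pushes down a predicate that differs from the query's filter.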



##########
contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatMatcher.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.delta.format;
+
+import io.delta.standalone.DeltaLog;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.dfs.FormatMatcher;
+import org.apache.drill.exec.store.dfs.FormatPlugin;
+import org.apache.drill.exec.store.dfs.FormatSelection;
+import org.apache.drill.exec.store.plan.rel.PluginDrillTable;
+import org.apache.hadoop.fs.FileStatus;
+
+public class DeltaFormatMatcher extends FormatMatcher {
+
+  private final DeltaFormatPlugin formatPlugin;
+
+  public DeltaFormatMatcher(DeltaFormatPlugin formatPlugin) {
+    this.formatPlugin = formatPlugin;
+  }
+
+  @Override
+  public boolean supportDirectoryReads() {

Review Comment:
   Is this something that was introduced by the Iceberg format plugin?



##########
exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/PluginImplementor.java:
##########
@@ -93,4 +93,12 @@ default void visitChild(RelNode input) throws IOException {
    * to ensure returning the correct rows number.
    */
   boolean artificialLimit();
+
+  /**
+   * If the plugin doesn't support native filter pushdown,
+   * but the reader can prune the number of rows to read.
+   * In this case filter operator on top of the scan should be preserved
+   * to ensure returning the correct rows number.

Review Comment:
   ```suggestion
      * but the reader can prune the set of rows to read.
      * In this case filter operator on top of the scan should be preserved
      * to ensure returning the correct subset of rows.
   ```
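The Javadoc describes a reader that can skip data but cannot evaluate the filter exactly. One example is dropping files whose min/max statistics rule out a match. The files that survive can still contain non-matching rows, which is why the Filter operator stays above the scan. Below is a minimal sketch for an equality filter, using hypothetical per-file statistics. None of these names are Drill or Delta Lake APIs.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch: min/max pruning can only drop files that cannot match,
 * so the Filter operator above the scan is still needed for exact results.
 */
public class ConservativePruningSketch {

  /** A data file with one long column and its min/max statistics. */
  static final class DataFile {
    final List<Long> values;
    final long min;
    final long max;

    DataFile(List<Long> values) {
      this.values = values;
      // Empty files get an inverted range so they never survive pruning.
      this.min = values.stream().mapToLong(Long::longValue).min().orElse(Long.MAX_VALUE);
      this.max = values.stream().mapToLong(Long::longValue).max().orElse(Long.MIN_VALUE);
    }
  }

  /** Reader-side pruning for "col = value": keeps every file whose range may contain value. */
  static List<DataFile> prune(List<DataFile> files, long value) {
    List<DataFile> kept = new ArrayList<>();
    for (DataFile file : files) {
      if (file.min <= value && value <= file.max) {
        kept.add(file);
      }
    }
    return kept;
  }

  /** The scan returns all rows of the kept files, including rows that do not match. */
  static List<Long> scan(List<DataFile> files) {
    List<Long> rows = new ArrayList<>();
    for (DataFile file : files) {
      rows.addAll(file.values);
    }
    return rows;
  }

  /** The preserved Filter operator evaluates the predicate row by row. */
  static List<Long> filter(List<Long> rows, long value) {
    List<Long> result = new ArrayList<>();
    for (Long row : rows) {
      if (row == value) {
        result.add(row);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<DataFile> files = List.of(new DataFile(List.of(1L, 2L, 3L)), new DataFile(List.of(10L, 20L)));
    List<Long> scanned = scan(prune(files, 2L));
    System.out.println(scanned);             // [1, 2, 3]: the second file was pruned
    System.out.println(filter(scanned, 2L)); // [2]
  }
}
```

Pruning cuts the input from five rows to three. The exact answer still comes from the Filter.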




Re: [PR] DRILL-8353: Format plugin for Delta Lake (drill)

Posted by "cgivre (via GitHub)" <gi...@apache.org>.
cgivre commented on PR #2702:
URL: https://github.com/apache/drill/pull/2702#issuecomment-1599387786

   @kmatt A GitHub issue is good!  Please be sure to tag @vvysotskyi in it as he was the original developer of this plugin.



[GitHub] [drill] vvysotskyi commented on pull request #2702: DRILL-8353: Format plugin for Delta Lake

Posted by GitBox <gi...@apache.org>.
vvysotskyi commented on PR #2702:
URL: https://github.com/apache/drill/pull/2702#issuecomment-1334839612

   Hi @kmatt, no, it is not supported yet, but it will be added in the near future. The version will be specified using a table function. Here is an example query:
   ```sql
   SELECT *
   FROM table(dfs.delta.`/tmp/delta-table`(type => 'delta', version => 0));
   ```

