You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2022/11/19 09:54:56 UTC

[GitHub] [drill] vvysotskyi commented on a diff in pull request #2702: DRILL-8353: Format plugin for Delta Lake

vvysotskyi commented on code in PR #2702:
URL: https://github.com/apache/drill/pull/2702#discussion_r1027065550


##########
contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/plan/DrillExprToDeltaTranslator.java:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.delta.plan;
+
+import io.delta.standalone.expressions.And;
+import io.delta.standalone.expressions.EqualTo;
+import io.delta.standalone.expressions.Expression;
+import io.delta.standalone.expressions.GreaterThan;
+import io.delta.standalone.expressions.GreaterThanOrEqual;
+import io.delta.standalone.expressions.IsNotNull;
+import io.delta.standalone.expressions.IsNull;
+import io.delta.standalone.expressions.LessThan;
+import io.delta.standalone.expressions.LessThanOrEqual;
+import io.delta.standalone.expressions.Literal;
+import io.delta.standalone.expressions.Not;
+import io.delta.standalone.expressions.Or;
+import io.delta.standalone.expressions.Predicate;
+import io.delta.standalone.types.StructType;
+import org.apache.drill.common.FunctionNames;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+
+public class DrillExprToDeltaTranslator extends AbstractExprVisitor<Expression, Void, RuntimeException> {
+
+  /** Delta table schema; Drill column references are resolved against it via {@code column(name)}. */
+  private final StructType structType;
+
+  /**
+   * Creates a translator that resolves column references against the given Delta schema.
+   *
+   * @param structType Delta table schema used to look up the columns referenced by Drill filters
+   */
+  public DrillExprToDeltaTranslator(StructType structType) {
+    this.structType = structType;
+  }
+
+  @Override
+  public Expression visitFunctionCall(FunctionCall call, Void value) {
+    try {
+      return visitFunctionCall(call);
+    } catch (Exception e) {
+      return null;
+    }
+  }
+
+  /**
+   * Translates a Drill function call into an equivalent Delta {@link Predicate}.
+   *
+   * <p>Handled calls: {@link FunctionNames#AND} / {@link FunctionNames#OR} over any number (two or
+   * more) of translatable arguments, {@link FunctionNames#NOT}, {@link FunctionNames#IS_NULL} /
+   * {@link FunctionNames#IS_NOT_NULL} on a column reference, and the binary comparisons
+   * {@code LT, LE, GT, GE, EQ, NE} whose first argument is a column reference and whose second
+   * argument translates to a Delta expression.
+   *
+   * @param call Drill function call to translate
+   * @return the translated predicate, or {@code null} if the call or any of its operands cannot be
+   *         translated
+   */
+  private Predicate visitFunctionCall(FunctionCall call) {
+    switch (call.getName()) {
+      case FunctionNames.AND:
+        return translateJunction(call, true);
+      case FunctionNames.OR:
+        return translateJunction(call, false);
+      case FunctionNames.NOT: {
+        Expression expression = call.arg(0).accept(this, null);
+        return expression != null ? new Not(expression) : null;
+      }
+      case FunctionNames.IS_NULL: {
+        Expression column = columnOf(call.arg(0));
+        return column != null ? new IsNull(column) : null;
+      }
+      case FunctionNames.IS_NOT_NULL: {
+        Expression column = columnOf(call.arg(0));
+        return column != null ? new IsNotNull(column) : null;
+      }
+      case FunctionNames.LT:
+      case FunctionNames.LE:
+      case FunctionNames.GT:
+      case FunctionNames.GE:
+      case FunctionNames.EQ:
+      case FunctionNames.NE:
+        return translateComparison(call);
+      default:
+        return null;
+    }
+  }
+
+  /**
+   * Folds all arguments of an AND/OR call into a left-deep chain of Delta {@code And}/{@code Or}.
+   * Every argument must be translatable: silently dropping an OR branch would make the pushed
+   * filter stricter than the original one, so any untranslatable argument yields {@code null}.
+   */
+  private Predicate translateJunction(FunctionCall call, boolean conjunction) {
+    Expression accumulated = call.arg(0).accept(this, null);
+    Predicate combined = null;
+    for (int i = 1; i < call.args().size(); i++) {
+      Expression next = call.arg(i).accept(this, null);
+      if (accumulated == null || next == null) {
+        return null;
+      }
+      combined = conjunction ? new And(accumulated, next) : new Or(accumulated, next);
+      accumulated = combined;
+    }
+    // Fewer than two arguments leaves combined as null, i.e. "not translatable".
+    return combined;
+  }
+
+  /**
+   * Translates {@code <column> <op> <operand>} comparisons. Both sides are checked explicitly
+   * instead of relying on the Delta constructors to reject a missing operand.
+   */
+  private Predicate translateComparison(FunctionCall call) {
+    Expression column = columnOf(call.arg(0));
+    if (column == null) {
+      return null;
+    }
+    // The operand is always the SECOND argument (the GE branch previously re-used argument 0).
+    Expression operand = call.arg(1).accept(this, null);
+    if (operand == null) {
+      return null;
+    }
+    switch (call.getName()) {
+      case FunctionNames.LT:
+        return new LessThan(column, operand);
+      case FunctionNames.LE:
+        return new LessThanOrEqual(column, operand);
+      case FunctionNames.GT:
+        return new GreaterThan(column, operand);
+      case FunctionNames.GE:
+        return new GreaterThanOrEqual(column, operand);
+      case FunctionNames.EQ:
+        return new EqualTo(column, operand);
+      case FunctionNames.NE:
+        return new Not(new EqualTo(column, operand));
+      default:
+        return null;
+    }
+  }
+
+  /** Resolves a Drill column reference to the Delta column, or returns {@code null} for anything else. */
+  private Expression columnOf(LogicalExpression expression) {
+    if (expression instanceof SchemaPath) {
+      return structType.column(getPath((SchemaPath) expression));
+    }
+    return null;
+  }
+
+  // Constant visitors: each Drill constant is wrapped in a Delta Literal. The Literal.of overload
+  // is chosen by the static return type of the Drill getter, which determines the literal's type.
+
+  @Override
+  public Expression visitFloatConstant(ValueExpressions.FloatExpression fExpr, Void value) {
+    return Literal.of(fExpr.getFloat());
+  }
+
+  @Override
+  public Expression visitIntConstant(ValueExpressions.IntExpression intExpr, Void value) {
+    return Literal.of(intExpr.getInt());
+  }
+
+  @Override
+  public Expression visitLongConstant(ValueExpressions.LongExpression longExpr, Void value) {
+    return Literal.of(longExpr.getLong());
+  }
+
+  // NOTE(review): if getIntFromDecimal() returns a plain int, this builds an integer literal rather
+  // than a decimal one, which may not be comparable with a decimal column — confirm.
+  @Override
+  public Expression visitDecimal9Constant(ValueExpressions.Decimal9Expression decExpr, Void value) {
+    return Literal.of(decExpr.getIntFromDecimal());
+  }
+
+  // NOTE(review): same concern as decimal9 — getLongFromDecimal() presumably yields a long literal.
+  @Override
+  public Expression visitDecimal18Constant(ValueExpressions.Decimal18Expression decExpr, Void value) {
+    return Literal.of(decExpr.getLongFromDecimal());
+  }
+
+  @Override
+  public Expression visitDecimal28Constant(ValueExpressions.Decimal28Expression decExpr, Void value) {
+    return Literal.of(decExpr.getBigDecimal());
+  }
+
+  @Override
+  public Expression visitDecimal38Constant(ValueExpressions.Decimal38Expression decExpr, Void value) {
+    return Literal.of(decExpr.getBigDecimal());
+  }
+
+  @Override
+  public Expression visitVarDecimalConstant(ValueExpressions.VarDecimalExpression decExpr, Void value) {
+    return Literal.of(decExpr.getBigDecimal());
+  }
+
+  // NOTE(review): if getDate() / getTime() / getTimeStamp() return epoch-based numbers, these pick the
+  // numeric Literal.of overloads and produce numeric literals instead of date/timestamp ones, so
+  // comparisons against temporal columns may fail to translate — confirm.
+  @Override
+  public Expression visitDateConstant(ValueExpressions.DateExpression dateExpr, Void value) {
+    return Literal.of(dateExpr.getDate());
+  }
+
+  @Override
+  public Expression visitTimeConstant(ValueExpressions.TimeExpression timeExpr, Void value) {
+    return Literal.of(timeExpr.getTime());
+  }
+
+  @Override
+  public Expression visitTimeStampConstant(ValueExpressions.TimeStampExpression timestampExpr, Void value) {
+    return Literal.of(timestampExpr.getTimeStamp());
+  }
+
+  @Override
+  public Expression visitDoubleConstant(ValueExpressions.DoubleExpression dExpr, Void value) {
+    return Literal.of(dExpr.getDouble());
+  }
+
+  @Override
+  public Expression visitBooleanConstant(ValueExpressions.BooleanExpression e, Void value) {
+    return Literal.of(e.getBoolean());
+  }
+
+  @Override
+  public Expression visitQuotedStringConstant(ValueExpressions.QuotedString e, Void value) {
+    return Literal.of(e.getString());
+  }
+
+  // Any expression without a dedicated visitor here translates to null, which the function-call
+  // translation treats as "not translatable".
+  @Override
+  public Expression visitUnknown(LogicalExpression e, Void value) {
+    return null;
+  }
+
+  private static String getPath(SchemaPath schemaPath) {

Review Comment:
   Yes, I think `element` keyword usage is not common.



##########
contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatPlugin.java:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.delta.format;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.metastore.MetadataProviderManager;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.AbstractWriter;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.PlannerPhase;
+import org.apache.drill.exec.planner.common.DrillStatsTable;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.schema.SchemaProvider;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.PluginRulesProviderImpl;
+import org.apache.drill.exec.store.StoragePluginRulesSupplier;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.dfs.FormatMatcher;
+import org.apache.drill.exec.store.dfs.FormatPlugin;
+import org.apache.drill.exec.store.delta.DeltaGroupScan;
+import org.apache.drill.exec.store.delta.plan.DeltaPluginImplementor;
+import org.apache.drill.exec.store.parquet.ParquetReaderConfig;
+import org.apache.drill.exec.store.plan.rel.PluginRel;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class DeltaFormatPlugin implements FormatPlugin {
+
+  private static final String DELTA_CONVENTION_PREFIX = "DELTA.";
+
+  /**
+   * Generator for format id values. Formats with the same name may be defined
+   * in multiple storage plugins, so a unique id is appended to the convention name
+   * to keep the rule names unique across plugin instances.
+   */
+  private static final AtomicInteger NEXT_ID = new AtomicInteger(0);
+
+  /** Configuration of the enclosing file system storage plugin. */
+  private final FileSystemConfig storageConfig;
+
+  /** Configuration of this Delta format plugin. */
+  private final DeltaFormatPluginConfig config;
+
+  /** Hadoop configuration; used to build the Parquet reader configuration for group scans. */
+  private final Configuration fsConf;
+
+  private final DrillbitContext context;
+
+  /** Name under which this format is registered. */
+  private final String name;
+
+  private final DeltaFormatMatcher matcher;
+
+  /** Planner rules bound to this instance's unique Delta convention. */
+  private final StoragePluginRulesSupplier storagePluginRulesSupplier;
+
+  /**
+   * Creates a Delta format plugin instance with its own planner convention.
+   *
+   * @param name          format name
+   * @param context       Drillbit context
+   * @param fsConf        Hadoop configuration of the storage plugin
+   * @param storageConfig file system storage plugin configuration
+   * @param config        Delta format configuration
+   */
+  public DeltaFormatPlugin(
+    String name,
+    DrillbitContext context,
+    Configuration fsConf,
+    FileSystemConfig storageConfig,
+    DeltaFormatPluginConfig config) {
+    this.storageConfig = storageConfig;
+    this.config = config;
+    this.fsConf = fsConf;
+    this.context = context;
+    this.name = name;
+    this.matcher = new DeltaFormatMatcher(this);
+    // The "_" separator keeps the generated name unambiguous: plain concatenation maps both
+    // ("delta", 11) and ("delta1", 1) to "delta11", which would produce clashing rule names.
+    this.storagePluginRulesSupplier = storagePluginRulesSupplier(name + "_" + NEXT_ID.getAndIncrement());
+  }
+
+  /**
+   * Builds the planner rules supplier for a Delta convention named {@code DELTA.<name>}.
+   * Filter, project and limit pushdown are all enabled.
+   *
+   * @param name unique suffix for the convention name
+   * @return rules supplier bound to the new convention
+   */
+  private static StoragePluginRulesSupplier storagePluginRulesSupplier(String name) {
+    String conventionName = DELTA_CONVENTION_PREFIX + name;
+    Convention convention = new Convention.Impl(conventionName, PluginRel.class);
+    PluginRulesProviderImpl rulesProvider =
+      new PluginRulesProviderImpl(convention, DeltaPluginImplementor::new);
+    return StoragePluginRulesSupplier.builder()
+      .rulesProvider(rulesProvider)
+      .supportsFilterPushdown(true)
+      .supportsProjectPushdown(true)
+      .supportsLimitPushdown(true)
+      .convention(convention)
+      .build();
+  }
+
+  /** Delta tables can be read through this plugin. */
+  @Override
+  public boolean supportsRead() {
+    return true;
+  }
+
+  /** Writing is not supported; {@link #getWriter} always throws. */
+  @Override
+  public boolean supportsWrite() {
+    return false;
+  }
+
+  @Override
+  public boolean supportsAutoPartitioning() {
+    return false;
+  }
+
+  @Override
+  public FormatMatcher getMatcher() {
+    return matcher;
+  }
+
+  /**
+   * Always fails: this plugin is read-only (see {@link #supportsWrite()}).
+   *
+   * @throws UnsupportedOperationException always, naming the format that rejected the write
+   */
+  @Override
+  public AbstractWriter getWriter(PhysicalOperator child, String location, List<String> partitionColumns) {
+    throw new UnsupportedOperationException("Writing is not supported by the Delta format plugin '" + name + "'");
+  }
+
+  /**
+   * Returns the Delta convention rules for the logical and physical planning phases and no rules
+   * for every other phase (pruning, partition pruning, join planning, ...).
+   *
+   * @param phase planner phase being configured
+   * @return rules to register for {@code phase}
+   */
+  @Override
+  public Set<? extends RelOptRule> getOptimizerRules(PlannerPhase phase) {
+    switch (phase) {
+      case LOGICAL:
+      case PHYSICAL:
+        return storagePluginRulesSupplier.getOptimizerRules();
+      default:
+        // LOGICAL_PRUNE_AND_JOIN, LOGICAL_PRUNE, PARTITION_PRUNING, JOIN_PLANNING and any others.
+        return Collections.emptySet();
+    }
+  }
+
+  /**
+   * Creates a Delta group scan over the selection root without a provided schema.
+   */
+  @Override
+  public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns) throws IOException {
+    return DeltaGroupScan.builder()
+      .userName(userName)
+      .formatPlugin(this)
+      .readerConfig(newReaderConfig())
+      .path(tablePath(selection))
+      .columns(columns)
+      .limit(-1)
+      .build();
+  }
+
+  /**
+   * Creates a Delta group scan over the selection root, attaching the schema read from the
+   * metadata provider's schema provider when one is available.
+   */
+  @Override
+  public AbstractGroupScan getGroupScan(String userName, FileSelection selection,
+    List<SchemaPath> columns, MetadataProviderManager metadataProviderManager) throws IOException {
+    SchemaProvider schemaProvider = metadataProviderManager.getSchemaProvider();
+    TupleMetadata schema = null;
+    if (schemaProvider != null) {
+      schema = schemaProvider.read().getSchema();
+    }
+    return DeltaGroupScan.builder()
+      .userName(userName)
+      .formatPlugin(this)
+      .readerConfig(newReaderConfig())
+      .schema(schema)
+      .path(tablePath(selection))
+      .columns(columns)
+      .limit(-1)
+      .build();
+  }
+
+  /** Builds a Parquet reader configuration from this plugin's Hadoop configuration. */
+  private ParquetReaderConfig newReaderConfig() {
+    return ParquetReaderConfig.builder().withConf(fsConf).build();
+  }
+
+  /** Returns the path component of the selection root URI, which is passed to the group scan. */
+  private static String tablePath(FileSelection selection) {
+    return selection.selectionRoot.toUri().getPath();
+  }
+
+  /** Statistics collection is not supported for Delta tables. */
+  @Override
+  public boolean supportsStatistics() {
+    return false;
+  }
+
+  @Override
+  public DrillStatsTable.TableStatistics readStatistics(FileSystem fs, Path statsTablePath) {
+    throw new UnsupportedOperationException("unimplemented");

Review Comment:
   It uses stats files that Drill creates (currently, only the Parquet format is supported), so for the Delta plugin I'm not sure it is a good idea to create extra files there.



##########
contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/read/DeltaScanBatchCreator.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.delta.read;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.ColumnExplorer;
+import org.apache.drill.exec.store.delta.DeltaRowGroupScan;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.parquet.AbstractParquetRowGroupScan;
+import org.apache.drill.exec.store.parquet.AbstractParquetScanBatchCreator;
+import org.apache.drill.exec.store.parquet.ParquetScanBatchCreator;
+import org.apache.drill.exec.store.parquet.RowGroupReadEntry;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+import java.util.List;
+import java.util.Map;
+
+@SuppressWarnings("unused")
+public class DeltaScanBatchCreator extends AbstractParquetScanBatchCreator

Review Comment:
   Yes, this and other options are applicable to the delta plugin.



##########
contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatMatcher.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.delta.format;
+
+import io.delta.standalone.DeltaLog;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.dfs.FormatMatcher;
+import org.apache.drill.exec.store.dfs.FormatPlugin;
+import org.apache.drill.exec.store.dfs.FormatSelection;
+import org.apache.drill.exec.store.plan.rel.PluginDrillTable;
+import org.apache.hadoop.fs.FileStatus;
+
+public class DeltaFormatMatcher extends FormatMatcher {
+
+  /** Delta format plugin this matcher belongs to. */
+  private final DeltaFormatPlugin formatPlugin;
+
+  /**
+   * Creates a matcher bound to the given Delta format plugin.
+   *
+   * @param formatPlugin owning format plugin
+   */
+  public DeltaFormatMatcher(DeltaFormatPlugin formatPlugin) {
+    this.formatPlugin = formatPlugin;
+  }
+
+  @Override
+  public boolean supportDirectoryReads() {

Review Comment:
   No, it existed before. It means that the Drill plugin can handle queries that specify a directory rather than a file.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org