You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by jn...@apache.org on 2015/04/01 03:32:18 UTC

drill git commit: DRILL-2458: Ensure no unwanted columns are added to CTAS output when * column is used in the query.

Repository: drill
Updated Branches:
  refs/heads/master 1b0c52f91 -> 7b11e3e8f


DRILL-2458:  Ensure no unwanted columns are added to CTAS output when * column is used in the query.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/7b11e3e8
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/7b11e3e8
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/7b11e3e8

Branch: refs/heads/master
Commit: 7b11e3e8fd5f0263e2165df413b36d48fdbcc0e1
Parents: 1b0c52f
Author: Jinfeng Ni <jn...@maprtech.com>
Authored: Thu Mar 19 13:21:41 2015 -0700
Committer: Jinfeng Ni <jn...@maprtech.com>
Committed: Tue Mar 31 18:03:05 2015 -0700

----------------------------------------------------------------------
 .../physical/visitor/StarColumnConverter.java   | 140 ++++++++++---------
 .../physical/impl/writer/TestParquetWriter.java |  21 +++
 2 files changed, 98 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/7b11e3e8/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/StarColumnConverter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/StarColumnConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/StarColumnConverter.java
index edf1a2e..9f4924e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/StarColumnConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/StarColumnConverter.java
@@ -18,6 +18,8 @@
 
 package org.apache.drill.exec.planner.physical.visitor;
 
+import java.beans.Statement;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
@@ -42,95 +44,107 @@ import org.eigenbase.util.Pair;
 
 import com.google.common.collect.Lists;
 
-public class StarColumnConverter extends BasePrelVisitor<Prel, boolean[], RuntimeException>{
-
-  private static StarColumnConverter INSTANCE = new StarColumnConverter();
+public class StarColumnConverter extends BasePrelVisitor<Prel, Void, RuntimeException>{
 
   private static final AtomicLong tableNumber = new AtomicLong(0);
 
-  public static Prel insertRenameProject(Prel root) {
-    // Prefixing columns for columns expanded from star column :
-    // Insert one project under screen (PUS) to remove prefix, and one project above scan (PAS) to add prefix.
-    // PUS AND PAS are required, when
-    //   Any non-SCAN prel produces regular column / expression AND star column,
-    //   or multiple star columns.
-    //   This is because we have to use prefix to distinguish columns expanded
-    //   from star column, from those regular column referenced in the query.
-
-    // We use an array of boolean to keep track this condition.
-    boolean [] prefixedForStar = new boolean [1];
-    prefixedForStar[0] = false;
-
-    return root.accept(INSTANCE, prefixedForStar);
+  private boolean prefixedForStar = false;
+  private boolean prefixedForWriter = false;
+
+  private StarColumnConverter() {
+    prefixedForStar = false;
+    prefixedForWriter = false;
   }
 
-  @Override
-  public Prel visitScreen(ScreenPrel prel, boolean[] prefixedForStar) throws RuntimeException {
-    return insertProjUnderScreen(prel, prefixedForStar, prel.getChild().getRowType());
+  public static Prel insertRenameProject(Prel root) {
+    // Prefixing columns for columns expanded from star column. There are two cases.
+    // 1. star is used in SELECT only.
+    //   - Insert project under screen (PUS) to remove prefix,
+    //   - Insert project above scan (PAS) to add prefix.
+    // 2. star is used in CTAS
+    //   - Insert project under Writer (PUW) to remove prefix
+    //   - Insert project above scan (PAS) to add prefix.
+    //   - DO NOT insert any project for prefix handling under Screen.
+
+    // The following conditions should apply when prefix is required
+    //   Any non-SCAN prel produces regular column / expression AND star column, or multiple star columns.
+    // This is because we have to use prefix to distinguish columns expanded
+    // from star column, from those regular column referenced in the query.
+
+    return root.accept(new StarColumnConverter(), null);
   }
 
   @Override
-  public Prel visitWriter(WriterPrel prel, boolean[] prefixedForStar) throws RuntimeException {
-    Prel newPrel = insertProjUnderScreen(prel, prefixedForStar, prel.getChild().getRowType());
-
-    prefixedForStar[0] = false;
+  public Prel visitScreen(ScreenPrel prel, Void value) throws RuntimeException {
+    Prel child = ((Prel) prel.getInput(0)).accept(this, null);
 
-    return newPrel;
+    if (prefixedForStar) {
+      if (!prefixedForWriter) {
+        // Prefix is added for SELECT only, not for CTAS writer.
+        return insertProjUnderScreenOrWriter(prel, prel.getChild().getRowType(), child);
+      } else {
+        // Prefix is added under CTAS Writer. We need create a new Screen with the converted child.
+        return (Prel) prel.copy(prel.getTraitSet(), Collections.<RelNode>singletonList(child));
+      }
+    } else {
+      // No prefix is required; return the Screen unchanged.
+      return prel;
+    }
   }
 
-  // insert PUS: Project Under Screen, when necessary.
-  private Prel insertProjUnderScreen(Prel prel, boolean[] prefixedForStar, RelDataType origRowType) {
+  @Override
+  public Prel visitWriter(WriterPrel prel, Void value) throws RuntimeException {
+    Prel child = ((Prel) prel.getInput(0)).accept(this, null);
+    if (prefixedForStar) {
+      prefixedForWriter = true;
+      return insertProjUnderScreenOrWriter(prel, prel.getChild().getRowType(), child);
+    } else {
+      return prel;
+    }
+  }
 
-    Prel child = ((Prel) prel.getInput(0)).accept(INSTANCE, prefixedForStar);
+  // insert PUS or PUW: Project Under Screen/Writer, when necessary.
+  private Prel insertProjUnderScreenOrWriter(Prel prel, RelDataType origRowType, Prel child) {
 
     ProjectPrel proj = null;
+    List<RelNode> children = Lists.newArrayList();
 
-    if (prefixedForStar[0]) {
-      List<RexNode> exprs = Lists.newArrayList();
-      for (int i = 0; i < origRowType.getFieldCount(); i++) {
-        RexNode expr = child.getCluster().getRexBuilder().makeInputRef(origRowType.getFieldList().get(i).getType(), i);
-        exprs.add(expr);
-      }
-
-      RelDataType newRowType = RexUtil.createStructType(child.getCluster().getTypeFactory(), exprs, origRowType.getFieldNames());
+    List<RexNode> exprs = Lists.newArrayList();
+    for (int i = 0; i < origRowType.getFieldCount(); i++) {
+      RexNode expr = child.getCluster().getRexBuilder().makeInputRef(origRowType.getFieldList().get(i).getType(), i);
+      exprs.add(expr);
+    }
 
-      int fieldCount = prel.getRowType().isStruct()? prel.getRowType().getFieldCount():1;
+    RelDataType newRowType = RexUtil.createStructType(child.getCluster().getTypeFactory(), exprs, origRowType.getFieldNames());
 
-      // Insert PUS : remove the prefix and keep the original field name.
-      if (fieldCount > 1) { // // no point in allowing duplicates if we only have one column
-        proj = new ProjectAllowDupPrel(child.getCluster(), child.getTraitSet(), child, exprs, newRowType);
-      } else {
-        proj = new ProjectPrel(child.getCluster(), child.getTraitSet(), child, exprs, newRowType);
-      }
+    int fieldCount = prel.getRowType().isStruct()? prel.getRowType().getFieldCount():1;
 
-      List<RelNode> children = Lists.newArrayList();
-
-      children.add(proj);
-      return (Prel) prel.copy(prel.getTraitSet(), children);
+    // Insert PUS/PUW : remove the prefix and keep the original field name.
+    if (fieldCount > 1) { // // no point in allowing duplicates if we only have one column
+      proj = new ProjectAllowDupPrel(child.getCluster(), child.getTraitSet(), child, exprs, newRowType);
     } else {
-      return prel;
+      proj = new ProjectPrel(child.getCluster(), child.getTraitSet(), child, exprs, newRowType);
     }
 
+    children.add(proj);
+    return (Prel) prel.copy(prel.getTraitSet(), children);
   }
-
-  @Override
-  public Prel visitProject(ProjectPrel prel, boolean[] prefixedForStar) throws RuntimeException {
+      @Override
+  public Prel visitProject(ProjectPrel prel, Void value) throws RuntimeException {
     ProjectPrel proj = (ProjectPrel) prel;
 
     // Require prefix rename : there exists other expression, in addition to a star column.
-    if (!prefixedForStar[0]  // not set yet.
+    if (!prefixedForStar  // not set yet.
         && StarColumnHelper.containsStarColumnInProject(prel.getChild().getRowType(), proj.getProjects())
         && prel.getRowType().getFieldNames().size() > 1) {
-      prefixedForStar[0] = true;
+      prefixedForStar = true;
     }
 
     // For project, we need make sure that the project's field name is same as the input,
     // when the project expression is RexInPutRef, since we may insert a PAS which will
     // rename the projected fields.
 
-
-
-    RelNode child = ((Prel) prel.getInput(0)).accept(INSTANCE, prefixedForStar);
+    RelNode child = ((Prel) prel.getInput(0)).accept(this, null);
 
     List<String> fieldNames = Lists.newArrayList();
 
@@ -158,17 +172,17 @@ public class StarColumnConverter extends BasePrelVisitor<Prel, boolean[], Runtim
   }
 
   @Override
-  public Prel visitPrel(Prel prel, boolean [] prefixedForStar) throws RuntimeException {
+  public Prel visitPrel(Prel prel, Void value) throws RuntimeException {
     // Require prefix rename : there exists other expression, in addition to a star column.
-    if (!prefixedForStar[0]  // not set yet.
+    if (!prefixedForStar  // not set yet.
         && StarColumnHelper.containsStarColumn(prel.getRowType())
         && prel.getRowType().getFieldNames().size() > 1) {
-      prefixedForStar[0] = true;
+      prefixedForStar = true;
     }
 
     List<RelNode> children = Lists.newArrayList();
     for (Prel child : prel) {
-      child = child.accept(this, prefixedForStar);
+      child = child.accept(this, null);
       children.add(child);
     }
 
@@ -176,8 +190,8 @@ public class StarColumnConverter extends BasePrelVisitor<Prel, boolean[], Runtim
   }
 
   @Override
-  public Prel visitScan(ScanPrel scanPrel, boolean [] prefixedForStar) throws RuntimeException {
-    if (StarColumnHelper.containsStarColumn(scanPrel.getRowType()) && prefixedForStar[0] ) {
+  public Prel visitScan(ScanPrel scanPrel, Void value) throws RuntimeException {
+    if (StarColumnHelper.containsStarColumn(scanPrel.getRowType()) && prefixedForStar ) {
 
       List<RexNode> exprs = Lists.newArrayList();
 
@@ -204,7 +218,7 @@ public class StarColumnConverter extends BasePrelVisitor<Prel, boolean[], Runtim
 
       return proj;
     } else {
-      return visitPrel(scanPrel, prefixedForStar);
+      return visitPrel(scanPrel, value);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/7b11e3e8/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
index fbfb996..669a21f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
@@ -247,6 +247,27 @@ public class TestParquetWriter extends BaseTestQuery {
     runTestAndValidate(selection, validateSelection, inputTable, "region_star_exp");
   }
 
+  @Test // DRILL-2458
+  public void testWriterWithStarAndRegluarCol() throws Exception {
+    String outputFile = "region_sort";
+    String ctasStmt = "create table " + outputFile + " as select *, r_regionkey + 1 as key1 from cp.`tpch/region.parquet` order by r_name";
+    String query = "select r_regionkey, r_name, r_comment, r_regionkey +1 as key1 from cp.`tpch/region.parquet` order by r_name";
+    String queryFromWriteOut = "select * from " + outputFile;
+
+    Path path = new Path("/tmp/" + outputFile);
+    if (fs.exists(path)) {
+      fs.delete(path, true);
+    }
+
+    test("use dfs.tmp");
+    test(ctasStmt);
+    testBuilder()
+        .ordered()
+        .sqlQuery(queryFromWriteOut)
+        .sqlBaselineQuery(query)
+        .build().run();
+  }
+
   public void compareParquetReadersColumnar(String selection, String table) throws Exception {
     String query = "select " + selection + " from " + table;