You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ar...@apache.org on 2017/11/13 12:07:18 UTC

[04/11] drill git commit: DRILL-5822: The query with "SELECT *" with "ORDER BY" clause and `planner.slice_target`=1 doesn't preserve column order

DRILL-5822: The query with "SELECT *" with "ORDER BY" clause and `planner.slice_target`=1 doesn't preserve column order

- The commit for DRILL-847 is oudated. There is no need to canonicalize the batch or container since RecordBatchLoader
  swallows the "schema change" for now if two batches have different column ordering.

  closes #1017


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/c1118a3d
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/c1118a3d
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/c1118a3d

Branch: refs/heads/master
Commit: c1118a3d9a74cf24f28bc69efca2c21d2a6d5b1d
Parents: 17ca618
Author: Vitalii Diravka <vi...@gmail.com>
Authored: Thu Oct 26 18:07:33 2017 +0000
Committer: Arina Ielchiieva <ar...@gmail.com>
Committed: Mon Nov 13 11:05:29 2017 +0200

----------------------------------------------------------------------
 .../exec/physical/impl/TopN/TopNBatch.java      | 11 +++------
 .../impl/mergereceiver/MergingRecordBatch.java  |  7 +-----
 .../physical/impl/sort/RecordBatchData.java     |  8 +------
 .../impl/sort/SortRecordBatchBuilder.java       |  6 -----
 .../drill/exec/record/RecordBatchLoader.java    | 18 --------------
 .../drill/exec/record/VectorContainer.java      | 25 --------------------
 .../java/org/apache/drill/TestStarQueries.java  | 17 +++++++++++++
 7 files changed, 22 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/c1118a3d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 950e1fe..dcf67d4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -238,7 +238,6 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
           }
           boolean success = false;
           try {
-            batch.canonicalize();
             if (priorityQueue == null) {
               assert !schemaChanged;
               priorityQueue = createNewPriorityQueue(context, config.getOrderings(), new ExpandableHyperContainer(batch.getContainer()), MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING);
@@ -323,7 +322,6 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
       selectionVector4.clear();
       c.clear();
       VectorContainer newQueue = new VectorContainer();
-      builder.canonicalize();
       builder.build(context, newQueue);
       priorityQueue.resetQueue(newQueue, builder.getSv4().createNewWrapperCurrent());
       builder.getSv4().clear();
@@ -414,16 +412,13 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
       selectionVector4.clear();
       c.clear();
       final VectorContainer oldSchemaContainer = new VectorContainer(oContext);
-      builder.canonicalize();
       builder.build(context, oldSchemaContainer);
       oldSchemaContainer.setRecordCount(builder.getSv4().getCount());
       final VectorContainer newSchemaContainer =  SchemaUtil.coerceContainer(oldSchemaContainer, this.schema, oContext);
-      // Canonicalize new container since we canonicalize incoming batches before adding to queue.
-      final VectorContainer canonicalizedContainer = VectorContainer.canonicalize(newSchemaContainer);
-      canonicalizedContainer.buildSchema(SelectionVectorMode.FOUR_BYTE);
+      newSchemaContainer.buildSchema(SelectionVectorMode.FOUR_BYTE);
       priorityQueue.cleanup();
-      priorityQueue = createNewPriorityQueue(context, config.getOrderings(), canonicalizedContainer, MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING);
-      priorityQueue.resetQueue(canonicalizedContainer, builder.getSv4().createNewWrapperCurrent());
+      priorityQueue = createNewPriorityQueue(context, config.getOrderings(), newSchemaContainer, MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING);
+      priorityQueue.resetQueue(newSchemaContainer, builder.getSv4().createNewWrapperCurrent());
     } finally {
       builder.clear();
       builder.close();

http://git-wip-us.apache.org/repos/asf/drill/blob/c1118a3d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index eff1ae9..ec945d6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -325,12 +325,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
 
       // after this point all batches have been released and their bytebuf are in batchLoaders
 
-      // Canonicalize each incoming batch, so that vectors are alphabetically sorted based on SchemaPath.
-      for (final RecordBatchLoader loader : batchLoaders) {
-        loader.canonicalize();
-      }
-
       // Ensure all the incoming batches have the identical schema.
+      // Note: RecordBatchLoader permutes the columns to obtain the same columns order for all batches.
       if (!isSameSchemaAmongBatches(batchLoaders)) {
         context.fail(new SchemaChangeException("Incoming batches for merging receiver have different schemas!"));
         return IterOutcome.STOP;
@@ -581,7 +577,6 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
     } catch (final IOException e) {
       throw new DrillRuntimeException(e);
     }
-    outgoingContainer = VectorContainer.canonicalize(outgoingContainer);
     outgoingContainer.buildSchema(SelectionVectorMode.NONE);
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/c1118a3d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
index 0cd55eb..6de4df6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -65,12 +65,6 @@ public class RecordBatchData {
     container.buildSchema(batch.getSchema().getSelectionVectorMode());
   }
 
-  public void canonicalize() {
-    SelectionVectorMode mode = container.getSchema().getSelectionVectorMode();
-    container = VectorContainer.canonicalize(container);
-    container.buildSchema(mode);
-  }
-
   public int getRecordCount() {
     return recordCount;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/c1118a3d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
index 999fb04..6b3de25 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
@@ -132,12 +132,6 @@ public class SortRecordBatchBuilder implements AutoCloseable {
     recordCount += rbd.getRecordCount();
   }
 
-  public void canonicalize() {
-    for (RecordBatchData batch : batches.values()) {
-      batch.canonicalize();
-    }
-  }
-
   public boolean isEmpty() {
     return batches.isEmpty();
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/c1118a3d/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index 20b5cb5..3e6bf64 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -270,22 +270,4 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
     resetRecordCount();
   }
 
-  /**
-   * Sorts vectors into canonical order (by field name).  Updates schema and
-   * internal vector container.
-   */
-  public void canonicalize() {
-    //logger.debug( "RecordBatchLoader : before schema " + schema);
-    container = VectorContainer.canonicalize(container);
-
-    // rebuild the schema.
-    SchemaBuilder b = BatchSchema.newBuilder();
-    for(final VectorWrapper<?> v : container){
-      b.addField(v.getField());
-    }
-    b.setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE);
-    this.schema = b.build();
-
-    //logger.debug( "RecordBatchLoader : after schema " + schema);
-  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/c1118a3d/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index abcb846..9564f11 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -201,31 +201,6 @@ public class VectorContainer implements VectorAccessible {
     return vc;
   }
 
-  /**
-   * Sorts vectors into canonical order (by field name) in new VectorContainer.
-   */
-  public static VectorContainer canonicalize(VectorContainer original) {
-    VectorContainer vc = new VectorContainer();
-    List<VectorWrapper<?>> canonicalWrappers = new ArrayList<>(original.wrappers);
-    // Sort list of VectorWrapper alphabetically based on SchemaPath.
-    Collections.sort(canonicalWrappers, new Comparator<VectorWrapper<?>>() {
-      @Override
-      public int compare(VectorWrapper<?> v1, VectorWrapper<?> v2) {
-        return v1.getField().getName().compareTo(v2.getField().getName());
-      }
-    });
-
-    for (VectorWrapper<?> w : canonicalWrappers) {
-      if (w.isHyper()) {
-        vc.add(w.getValueVectors());
-      } else {
-        vc.add(w.getValueVector());
-      }
-    }
-    vc.allocator = original.allocator;
-    return vc;
-  }
-
   private void cloneAndTransfer(VectorWrapper<?> wrapper) {
     wrappers.add(wrapper.cloneAndTransfer(getAllocator()));
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/c1118a3d/exec/java-exec/src/test/java/org/apache/drill/TestStarQueries.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestStarQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestStarQueries.java
index bdb080c..b4ac11f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestStarQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestStarQueries.java
@@ -531,4 +531,21 @@ public class TestStarQueries extends BaseTestQuery{
         .run();
   }
 
+  @Test // DRILL-5822
+  public void testSchemaForParallelizedStarOrderBy() throws Exception {
+    final String query = "select * from cp.`tpch/region.parquet` order by r_name";
+    final BatchSchema expectedSchema = new SchemaBuilder()
+        .add("r_regionkey", TypeProtos.MinorType.INT)
+        .add("r_name",TypeProtos.MinorType.VARCHAR)
+        .add("r_comment", TypeProtos.MinorType.VARCHAR)
+        .build();
+
+    testBuilder()
+        .sqlQuery(query)
+        .optionSettingQueriesForTestQuery("alter session set `planner.slice_target`=1")
+        .schemaBaseLine(expectedSchema)
+        .build()
+        .run();
+  }
+
 }