Posted to commits@drill.apache.org by ar...@apache.org on 2018/05/19 20:37:45 UTC

[drill] branch master updated (f4f4dc5 -> 82e1a12)

This is an automated email from the ASF dual-hosted git repository.

arina pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git.


    from f4f4dc5  DRILL-4525: Code cleanup
     new dc1db98  DRILL-6348: Received batches are now owned by the receive operators instead of the parent
     new b7d259b  DRILL-6418: Handle Schema change in Unnest And Lateral for unnest field / non-unnest field
     new 0029097  DRILL-5305: Query Profile must display Query ID
     new 82e1a12  DRILL-6423: Export query result as a CSV file

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/drill/exec/ops/FragmentContextImpl.java |   5 +-
 .../exec/physical/impl/MergingReceiverCreator.java |   2 +-
 .../exec/physical/impl/join/LateralJoinBatch.java  |  45 +--
 .../impl/mergereceiver/MergingRecordBatch.java     |   3 +
 .../physical/impl/project/ProjectRecordBatch.java  |   2 +-
 .../physical/impl/unnest/UnnestRecordBatch.java    |   6 +-
 .../unorderedreceiver/UnorderedReceiverBatch.java  |   3 +
 .../UnorderedReceiverCreator.java                  |   2 +-
 .../org/apache/drill/exec/record/BatchSchema.java  |  27 +-
 .../apache/drill/exec/record/VectorContainer.java  |   9 +
 .../drill/exec/server/rest/QueryResources.java     |   6 +
 .../drill/exec/server/rest/QueryWrapper.java       |  12 +-
 .../exec/server/rest/profile/ProfileWrapper.java   |   2 +-
 .../exec/work/batch/AbstractDataCollector.java     |  17 +
 .../drill/exec/work/batch/DataCollector.java       |  14 +-
 .../drill/exec/work/batch/IncomingBuffers.java     |  10 +-
 .../apache/drill/exec/work/user/UserWorker.java    |   5 +-
 .../src/main/resources/rest/profile/profile.ftl    |   2 +-
 .../src/main/resources/rest/query/result.ftl       |  71 +++-
 .../impl/join/TestLateralJoinCorrectness.java      | 252 ++++++++++++-
 .../impl/limit/TestLimitBatchEmitOutcome.java      |   3 +
 .../impl/project/TestProjectEmitOutcome.java       |   3 +
 .../unnest/TestUnnestWithLateralCorrectness.java   |   8 +-
 .../drill/exec/record/MaterializedField.java       |  43 ++-
 .../exec/vector/complex/AbstractMapVector.java     |   3 +
 .../drill/exec/proto/SchemaUserBitShared.java      |   7 +
 .../org/apache/drill/exec/proto/UserBitShared.java | 412 ++++++++++++++-------
 .../org/apache/drill/exec/proto/beans/QueryId.java |  22 ++
 protocol/src/main/protobuf/UserBitShared.proto     |   1 +
 29 files changed, 801 insertions(+), 196 deletions(-)

-- 
To stop receiving notification emails like this one, please contact
arina@apache.org.

[drill] 03/04: DRILL-5305: Query Profile must display Query ID

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 00290977bc0e39bd188ceb9fcf90f3813f8c6e5a
Author: Kunal Khatua <ku...@apache.org>
AuthorDate: Wed May 16 10:22:41 2018 -0700

    DRILL-5305: Query Profile must display Query ID
    
    Introduced a change to the protobuf to inject the text equivalent of the QueryID into the profile. This way, the profile's file name can be changed and later restored from this new field.
    The Profile UI also shows the Query ID; for backward compatibility with older profiles that lack this field, the displayed ID falls back to being derived from the existing part1/part2 values.
    
    closes #1265
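
A minimal, self-contained sketch (illustrative only, not part of the commit) of how the new
"text" field relates to the two 64-bit id parts: it is simply the UUID string built from
(part1, part2), as the UserWorker change below does, so a renamed profile file can be matched
back to its query id.

    import java.util.UUID;

    public class QueryIdTextSketch {
      public static void main(String[] args) {
        long part1 = 0x5b1e7a3c00000001L;   // illustrative values only
        long part2 = 0x0abcdef012345678L;
        // The "text" field is just the UUID form of the two id parts.
        String text = new UUID(part1, part2).toString();
        System.out.println(text);           // prints 5b1e7a3c-0000-0001-0abc-def012345678
      }
    }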
---
 .../exec/server/rest/profile/ProfileWrapper.java   |   2 +-
 .../apache/drill/exec/work/user/UserWorker.java    |   5 +-
 .../src/main/resources/rest/profile/profile.ftl    |   2 +-
 .../drill/exec/proto/SchemaUserBitShared.java      |   7 +
 .../org/apache/drill/exec/proto/UserBitShared.java | 412 ++++++++++++++-------
 .../org/apache/drill/exec/proto/beans/QueryId.java |  22 ++
 protocol/src/main/protobuf/UserBitShared.proto     |   1 +
 7 files changed, 320 insertions(+), 131 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java
index a618f7e..fa45c1b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java
@@ -63,7 +63,7 @@ public class ProfileWrapper {
 
   public ProfileWrapper(final QueryProfile profile, DrillConfig drillConfig) {
     this.profile = profile;
-    this.id = QueryIdHelper.getQueryId(profile.getId());
+    this.id = profile.getId().hasText() ? profile.getId().getText() : QueryIdHelper.getQueryId(profile.getId());
     //Generating Operator Name map (DRILL-6140)
     String profileTextPlan = profile.hasPlan() ? profile.getPlan() : "" ;
     generateOpMap(profileTextPlan);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
index 9c32b56..880bb98 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.work.user;
 
+import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
@@ -68,7 +69,9 @@ public class UserWorker{
     final long time = (int) (System.currentTimeMillis()/1000);
     final long p1 = ((Integer.MAX_VALUE - time) << 32) + r.nextInt();
     final long p2 = r.nextLong();
-    final QueryId id = QueryId.newBuilder().setPart1(p1).setPart2(p2).build();
+    final QueryId id = QueryId.newBuilder().setPart1(p1).setPart2(p2)
+        .setText((new UUID(p1, p2)).toString())
+        .build();
     return id;
   }
 
diff --git a/exec/java-exec/src/main/resources/rest/profile/profile.ftl b/exec/java-exec/src/main/resources/rest/profile/profile.ftl
index 9072f4d..64a955c 100644
--- a/exec/java-exec/src/main/resources/rest/profile/profile.ftl
+++ b/exec/java-exec/src/main/resources/rest/profile/profile.ftl
@@ -165,7 +165,7 @@ table.sortable thead .sorting_desc { background-image: url("/static/img/black-de
   <#assign queued = queueName != "" && queueName != "-" />
 
   <div class="page-header"></div>
-  <h3>Query Profile</h3>
+  <h3>Query Profile : <span style='font-size:85%'>${model.getQueryId()}</span></h3>
   <div class="panel-group" id="query-profile-accordion">
     <div class="panel panel-default">
       <div class="panel-heading">
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserBitShared.java
index 44e4d32..35bc086 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserBitShared.java
@@ -150,6 +150,8 @@ public final class SchemaUserBitShared
                     output.writeSFixed64(1, message.getPart1(), false);
                 if(message.hasPart2())
                     output.writeSFixed64(2, message.getPart2(), false);
+                if(message.hasText())
+                    output.writeString(3, message.getText(), false);
             }
             public boolean isInitialized(org.apache.drill.exec.proto.UserBitShared.QueryId message)
             {
@@ -195,6 +197,9 @@ public final class SchemaUserBitShared
                         case 2:
                             builder.setPart2(input.readSFixed64());
                             break;
+                        case 3:
+                            builder.setText(input.readString());
+                            break;
                         default:
                             input.handleUnknownField(number, this);
                     }
@@ -237,6 +242,7 @@ public final class SchemaUserBitShared
             {
                 case 1: return "part1";
                 case 2: return "part2";
+                case 3: return "text";
                 default: return null;
             }
         }
@@ -250,6 +256,7 @@ public final class SchemaUserBitShared
         {
             fieldMap.put("part1", 1);
             fieldMap.put("part2", 2);
+            fieldMap.put("text", 3);
         }
     }
 
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index 72b5eab..d8eef0f 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -1422,6 +1422,21 @@ public final class UserBitShared {
      * <code>optional sfixed64 part2 = 2;</code>
      */
     long getPart2();
+
+    // optional string text = 3;
+    /**
+     * <code>optional string text = 3;</code>
+     */
+    boolean hasText();
+    /**
+     * <code>optional string text = 3;</code>
+     */
+    java.lang.String getText();
+    /**
+     * <code>optional string text = 3;</code>
+     */
+    com.google.protobuf.ByteString
+        getTextBytes();
   }
   /**
    * Protobuf type {@code exec.shared.QueryId}
@@ -1484,6 +1499,11 @@ public final class UserBitShared {
               part2_ = input.readSFixed64();
               break;
             }
+            case 26: {
+              bitField0_ |= 0x00000004;
+              text_ = input.readBytes();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -1556,9 +1576,53 @@ public final class UserBitShared {
       return part2_;
     }
 
+    // optional string text = 3;
+    public static final int TEXT_FIELD_NUMBER = 3;
+    private java.lang.Object text_;
+    /**
+     * <code>optional string text = 3;</code>
+     */
+    public boolean hasText() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    /**
+     * <code>optional string text = 3;</code>
+     */
+    public java.lang.String getText() {
+      java.lang.Object ref = text_;
+      if (ref instanceof java.lang.String) {
+        return (java.lang.String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        java.lang.String s = bs.toStringUtf8();
+        if (bs.isValidUtf8()) {
+          text_ = s;
+        }
+        return s;
+      }
+    }
+    /**
+     * <code>optional string text = 3;</code>
+     */
+    public com.google.protobuf.ByteString
+        getTextBytes() {
+      java.lang.Object ref = text_;
+      if (ref instanceof java.lang.String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8(
+                (java.lang.String) ref);
+        text_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+
     private void initFields() {
       part1_ = 0L;
       part2_ = 0L;
+      text_ = "";
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -1578,6 +1642,9 @@ public final class UserBitShared {
       if (((bitField0_ & 0x00000002) == 0x00000002)) {
         output.writeSFixed64(2, part2_);
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeBytes(3, getTextBytes());
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -1595,6 +1662,10 @@ public final class UserBitShared {
         size += com.google.protobuf.CodedOutputStream
           .computeSFixed64Size(2, part2_);
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(3, getTextBytes());
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -1715,6 +1786,8 @@ public final class UserBitShared {
         bitField0_ = (bitField0_ & ~0x00000001);
         part2_ = 0L;
         bitField0_ = (bitField0_ & ~0x00000002);
+        text_ = "";
+        bitField0_ = (bitField0_ & ~0x00000004);
         return this;
       }
 
@@ -1751,6 +1824,10 @@ public final class UserBitShared {
           to_bitField0_ |= 0x00000002;
         }
         result.part2_ = part2_;
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.text_ = text_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -1773,6 +1850,11 @@ public final class UserBitShared {
         if (other.hasPart2()) {
           setPart2(other.getPart2());
         }
+        if (other.hasText()) {
+          bitField0_ |= 0x00000004;
+          text_ = other.text_;
+          onChanged();
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -1866,6 +1948,80 @@ public final class UserBitShared {
         return this;
       }
 
+      // optional string text = 3;
+      private java.lang.Object text_ = "";
+      /**
+       * <code>optional string text = 3;</code>
+       */
+      public boolean hasText() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      /**
+       * <code>optional string text = 3;</code>
+       */
+      public java.lang.String getText() {
+        java.lang.Object ref = text_;
+        if (!(ref instanceof java.lang.String)) {
+          java.lang.String s = ((com.google.protobuf.ByteString) ref)
+              .toStringUtf8();
+          text_ = s;
+          return s;
+        } else {
+          return (java.lang.String) ref;
+        }
+      }
+      /**
+       * <code>optional string text = 3;</code>
+       */
+      public com.google.protobuf.ByteString
+          getTextBytes() {
+        java.lang.Object ref = text_;
+        if (ref instanceof String) {
+          com.google.protobuf.ByteString b = 
+              com.google.protobuf.ByteString.copyFromUtf8(
+                  (java.lang.String) ref);
+          text_ = b;
+          return b;
+        } else {
+          return (com.google.protobuf.ByteString) ref;
+        }
+      }
+      /**
+       * <code>optional string text = 3;</code>
+       */
+      public Builder setText(
+          java.lang.String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000004;
+        text_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string text = 3;</code>
+       */
+      public Builder clearText() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        text_ = getDefaultInstance().getText();
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string text = 3;</code>
+       */
+      public Builder setTextBytes(
+          com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000004;
+        text_ = value;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:exec.shared.QueryId)
     }
 
@@ -24049,133 +24205,133 @@ public final class UserBitShared {
       "\n\023UserBitShared.proto\022\013exec.shared\032\013Type" +
       "s.proto\032\022Coordination.proto\032\017SchemaDef.p" +
       "roto\"$\n\017UserCredentials\022\021\n\tuser_name\030\001 \001" +
-      "(\t\"\'\n\007QueryId\022\r\n\005part1\030\001 \001(\020\022\r\n\005part2\030\002 " +
-      "\001(\020\"\355\003\n\014DrillPBError\022\020\n\010error_id\030\001 \001(\t\022(" +
-      "\n\010endpoint\030\002 \001(\0132\026.exec.DrillbitEndpoint" +
-      "\0227\n\nerror_type\030\003 \001(\0162#.exec.shared.Drill" +
-      "PBError.ErrorType\022\017\n\007message\030\004 \001(\t\0220\n\tex" +
-      "ception\030\005 \001(\0132\035.exec.shared.ExceptionWra" +
-      "pper\0220\n\rparsing_error\030\006 \003(\0132\031.exec.share",
-      "d.ParsingError\"\362\001\n\tErrorType\022\016\n\nCONNECTI" +
-      "ON\020\000\022\r\n\tDATA_READ\020\001\022\016\n\nDATA_WRITE\020\002\022\014\n\010F" +
-      "UNCTION\020\003\022\t\n\005PARSE\020\004\022\016\n\nPERMISSION\020\005\022\010\n\004" +
-      "PLAN\020\006\022\014\n\010RESOURCE\020\007\022\n\n\006SYSTEM\020\010\022\031\n\025UNSU" +
-      "PPORTED_OPERATION\020\t\022\016\n\nVALIDATION\020\n\022\023\n\017E" +
-      "XECUTION_ERROR\020\013\022\022\n\016INTERNAL_ERROR\020\014\022\025\n\021" +
-      "UNSPECIFIED_ERROR\020\r\"\246\001\n\020ExceptionWrapper" +
-      "\022\027\n\017exception_class\030\001 \001(\t\022\017\n\007message\030\002 \001" +
-      "(\t\022:\n\013stack_trace\030\003 \003(\0132%.exec.shared.St" +
-      "ackTraceElementWrapper\022,\n\005cause\030\004 \001(\0132\035.",
-      "exec.shared.ExceptionWrapper\"\205\001\n\030StackTr" +
-      "aceElementWrapper\022\022\n\nclass_name\030\001 \001(\t\022\021\n" +
-      "\tfile_name\030\002 \001(\t\022\023\n\013line_number\030\003 \001(\005\022\023\n" +
-      "\013method_name\030\004 \001(\t\022\030\n\020is_native_method\030\005" +
-      " \001(\010\"\\\n\014ParsingError\022\024\n\014start_column\030\002 \001" +
-      "(\005\022\021\n\tstart_row\030\003 \001(\005\022\022\n\nend_column\030\004 \001(" +
-      "\005\022\017\n\007end_row\030\005 \001(\005\"~\n\016RecordBatchDef\022\024\n\014" +
-      "record_count\030\001 \001(\005\022+\n\005field\030\002 \003(\0132\034.exec" +
-      ".shared.SerializedField\022)\n!carries_two_b" +
-      "yte_selection_vector\030\003 \001(\010\"\205\001\n\010NamePart\022",
-      "(\n\004type\030\001 \001(\0162\032.exec.shared.NamePart.Typ" +
-      "e\022\014\n\004name\030\002 \001(\t\022$\n\005child\030\003 \001(\0132\025.exec.sh" +
-      "ared.NamePart\"\033\n\004Type\022\010\n\004NAME\020\000\022\t\n\005ARRAY" +
-      "\020\001\"\324\001\n\017SerializedField\022%\n\nmajor_type\030\001 \001" +
-      "(\0132\021.common.MajorType\022(\n\tname_part\030\002 \001(\013" +
-      "2\025.exec.shared.NamePart\022+\n\005child\030\003 \003(\0132\034" +
-      ".exec.shared.SerializedField\022\023\n\013value_co" +
-      "unt\030\004 \001(\005\022\027\n\017var_byte_length\030\005 \001(\005\022\025\n\rbu" +
-      "ffer_length\030\007 \001(\005\"7\n\nNodeStatus\022\017\n\007node_" +
-      "id\030\001 \001(\005\022\030\n\020memory_footprint\030\002 \001(\003\"\263\002\n\013Q",
-      "ueryResult\0228\n\013query_state\030\001 \001(\0162#.exec.s" +
-      "hared.QueryResult.QueryState\022&\n\010query_id" +
-      "\030\002 \001(\0132\024.exec.shared.QueryId\022(\n\005error\030\003 " +
-      "\003(\0132\031.exec.shared.DrillPBError\"\227\001\n\nQuery" +
-      "State\022\014\n\010STARTING\020\000\022\013\n\007RUNNING\020\001\022\r\n\tCOMP" +
-      "LETED\020\002\022\014\n\010CANCELED\020\003\022\n\n\006FAILED\020\004\022\032\n\026CAN" +
-      "CELLATION_REQUESTED\020\005\022\014\n\010ENQUEUED\020\006\022\r\n\tP" +
-      "REPARING\020\007\022\014\n\010PLANNING\020\010\"p\n\tQueryData\022&\n" +
-      "\010query_id\030\001 \001(\0132\024.exec.shared.QueryId\022\021\n" +
-      "\trow_count\030\002 \001(\005\022(\n\003def\030\003 \001(\0132\033.exec.sha",
-      "red.RecordBatchDef\"\330\001\n\tQueryInfo\022\r\n\005quer" +
-      "y\030\001 \001(\t\022\r\n\005start\030\002 \001(\003\0222\n\005state\030\003 \001(\0162#." +
-      "exec.shared.QueryResult.QueryState\022\017\n\004us" +
-      "er\030\004 \001(\t:\001-\022\'\n\007foreman\030\005 \001(\0132\026.exec.Dril" +
-      "lbitEndpoint\022\024\n\014options_json\030\006 \001(\t\022\022\n\nto" +
-      "tal_cost\030\007 \001(\001\022\025\n\nqueue_name\030\010 \001(\t:\001-\"\242\004" +
-      "\n\014QueryProfile\022 \n\002id\030\001 \001(\0132\024.exec.shared" +
-      ".QueryId\022$\n\004type\030\002 \001(\0162\026.exec.shared.Que" +
-      "ryType\022\r\n\005start\030\003 \001(\003\022\013\n\003end\030\004 \001(\003\022\r\n\005qu" +
-      "ery\030\005 \001(\t\022\014\n\004plan\030\006 \001(\t\022\'\n\007foreman\030\007 \001(\013",
-      "2\026.exec.DrillbitEndpoint\0222\n\005state\030\010 \001(\0162" +
-      "#.exec.shared.QueryResult.QueryState\022\027\n\017" +
-      "total_fragments\030\t \001(\005\022\032\n\022finished_fragme" +
-      "nts\030\n \001(\005\022;\n\020fragment_profile\030\013 \003(\0132!.ex" +
-      "ec.shared.MajorFragmentProfile\022\017\n\004user\030\014" +
-      " \001(\t:\001-\022\r\n\005error\030\r \001(\t\022\024\n\014verboseError\030\016" +
-      " \001(\t\022\020\n\010error_id\030\017 \001(\t\022\022\n\nerror_node\030\020 \001" +
-      "(\t\022\024\n\014options_json\030\021 \001(\t\022\017\n\007planEnd\030\022 \001(" +
-      "\003\022\024\n\014queueWaitEnd\030\023 \001(\003\022\022\n\ntotal_cost\030\024 " +
-      "\001(\001\022\025\n\nqueue_name\030\025 \001(\t:\001-\"t\n\024MajorFragm",
-      "entProfile\022\031\n\021major_fragment_id\030\001 \001(\005\022A\n" +
-      "\026minor_fragment_profile\030\002 \003(\0132!.exec.sha" +
-      "red.MinorFragmentProfile\"\350\002\n\024MinorFragme" +
-      "ntProfile\022)\n\005state\030\001 \001(\0162\032.exec.shared.F" +
-      "ragmentState\022(\n\005error\030\002 \001(\0132\031.exec.share" +
-      "d.DrillPBError\022\031\n\021minor_fragment_id\030\003 \001(" +
-      "\005\0226\n\020operator_profile\030\004 \003(\0132\034.exec.share" +
-      "d.OperatorProfile\022\022\n\nstart_time\030\005 \001(\003\022\020\n" +
-      "\010end_time\030\006 \001(\003\022\023\n\013memory_used\030\007 \001(\003\022\027\n\017" +
-      "max_memory_used\030\010 \001(\003\022(\n\010endpoint\030\t \001(\0132",
-      "\026.exec.DrillbitEndpoint\022\023\n\013last_update\030\n" +
-      " \001(\003\022\025\n\rlast_progress\030\013 \001(\003\"\377\001\n\017Operator" +
-      "Profile\0221\n\rinput_profile\030\001 \003(\0132\032.exec.sh" +
-      "ared.StreamProfile\022\023\n\013operator_id\030\003 \001(\005\022" +
-      "\025\n\roperator_type\030\004 \001(\005\022\023\n\013setup_nanos\030\005 " +
-      "\001(\003\022\025\n\rprocess_nanos\030\006 \001(\003\022#\n\033peak_local" +
-      "_memory_allocated\030\007 \001(\003\022(\n\006metric\030\010 \003(\0132" +
-      "\030.exec.shared.MetricValue\022\022\n\nwait_nanos\030" +
-      "\t \001(\003\"B\n\rStreamProfile\022\017\n\007records\030\001 \001(\003\022" +
-      "\017\n\007batches\030\002 \001(\003\022\017\n\007schemas\030\003 \001(\003\"J\n\013Met",
-      "ricValue\022\021\n\tmetric_id\030\001 \001(\005\022\022\n\nlong_valu" +
-      "e\030\002 \001(\003\022\024\n\014double_value\030\003 \001(\001\")\n\010Registr" +
-      "y\022\035\n\003jar\030\001 \003(\0132\020.exec.shared.Jar\"/\n\003Jar\022" +
-      "\014\n\004name\030\001 \001(\t\022\032\n\022function_signature\030\002 \003(" +
-      "\t\"W\n\013SaslMessage\022\021\n\tmechanism\030\001 \001(\t\022\014\n\004d" +
-      "ata\030\002 \001(\014\022\'\n\006status\030\003 \001(\0162\027.exec.shared." +
-      "SaslStatus*5\n\nRpcChannel\022\017\n\013BIT_CONTROL\020" +
-      "\000\022\014\n\010BIT_DATA\020\001\022\010\n\004USER\020\002*V\n\tQueryType\022\007" +
-      "\n\003SQL\020\001\022\013\n\007LOGICAL\020\002\022\014\n\010PHYSICAL\020\003\022\r\n\tEX" +
-      "ECUTION\020\004\022\026\n\022PREPARED_STATEMENT\020\005*\207\001\n\rFr",
-      "agmentState\022\013\n\007SENDING\020\000\022\027\n\023AWAITING_ALL" +
-      "OCATION\020\001\022\013\n\007RUNNING\020\002\022\014\n\010FINISHED\020\003\022\r\n\t" +
-      "CANCELLED\020\004\022\n\n\006FAILED\020\005\022\032\n\026CANCELLATION_" +
-      "REQUESTED\020\006*\360\006\n\020CoreOperatorType\022\021\n\rSING" +
-      "LE_SENDER\020\000\022\024\n\020BROADCAST_SENDER\020\001\022\n\n\006FIL" +
-      "TER\020\002\022\022\n\016HASH_AGGREGATE\020\003\022\r\n\tHASH_JOIN\020\004" +
-      "\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HASH_PARTITION_SENDE" +
-      "R\020\006\022\t\n\005LIMIT\020\007\022\024\n\020MERGING_RECEIVER\020\010\022\034\n\030" +
-      "ORDERED_PARTITION_SENDER\020\t\022\013\n\007PROJECT\020\n\022" +
-      "\026\n\022UNORDERED_RECEIVER\020\013\022\020\n\014RANGE_SENDER\020",
-      "\014\022\n\n\006SCREEN\020\r\022\034\n\030SELECTION_VECTOR_REMOVE" +
-      "R\020\016\022\027\n\023STREAMING_AGGREGATE\020\017\022\016\n\nTOP_N_SO" +
-      "RT\020\020\022\021\n\rEXTERNAL_SORT\020\021\022\t\n\005TRACE\020\022\022\t\n\005UN" +
-      "ION\020\023\022\014\n\010OLD_SORT\020\024\022\032\n\026PARQUET_ROW_GROUP" +
-      "_SCAN\020\025\022\021\n\rHIVE_SUB_SCAN\020\026\022\025\n\021SYSTEM_TAB" +
-      "LE_SCAN\020\027\022\021\n\rMOCK_SUB_SCAN\020\030\022\022\n\016PARQUET_" +
-      "WRITER\020\031\022\023\n\017DIRECT_SUB_SCAN\020\032\022\017\n\013TEXT_WR" +
-      "ITER\020\033\022\021\n\rTEXT_SUB_SCAN\020\034\022\021\n\rJSON_SUB_SC" +
-      "AN\020\035\022\030\n\024INFO_SCHEMA_SUB_SCAN\020\036\022\023\n\017COMPLE" +
-      "X_TO_JSON\020\037\022\025\n\021PRODUCER_CONSUMER\020 \022\022\n\016HB",
-      "ASE_SUB_SCAN\020!\022\n\n\006WINDOW\020\"\022\024\n\020NESTED_LOO" +
-      "P_JOIN\020#\022\021\n\rAVRO_SUB_SCAN\020$\022\021\n\rPCAP_SUB_" +
-      "SCAN\020%\022\022\n\016KAFKA_SUB_SCAN\020&\022\021\n\rKUDU_SUB_S" +
-      "CAN\020\'\022\013\n\007FLATTEN\020(\022\020\n\014LATERAL_JOIN\020)\022\n\n\006" +
-      "UNNEST\020*\022,\n(HIVE_DRILL_NATIVE_PARQUET_RO" +
-      "W_GROUP_SCAN\020+*g\n\nSaslStatus\022\020\n\014SASL_UNK" +
-      "NOWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SASL_IN_PROGRE" +
-      "SS\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B" +
-      ".\n\033org.apache.drill.exec.protoB\rUserBitS" +
-      "haredH\001"
+      "(\t\"5\n\007QueryId\022\r\n\005part1\030\001 \001(\020\022\r\n\005part2\030\002 " +
+      "\001(\020\022\014\n\004text\030\003 \001(\t\"\355\003\n\014DrillPBError\022\020\n\010er" +
+      "ror_id\030\001 \001(\t\022(\n\010endpoint\030\002 \001(\0132\026.exec.Dr" +
+      "illbitEndpoint\0227\n\nerror_type\030\003 \001(\0162#.exe" +
+      "c.shared.DrillPBError.ErrorType\022\017\n\007messa" +
+      "ge\030\004 \001(\t\0220\n\texception\030\005 \001(\0132\035.exec.share" +
+      "d.ExceptionWrapper\0220\n\rparsing_error\030\006 \003(",
+      "\0132\031.exec.shared.ParsingError\"\362\001\n\tErrorTy" +
+      "pe\022\016\n\nCONNECTION\020\000\022\r\n\tDATA_READ\020\001\022\016\n\nDAT" +
+      "A_WRITE\020\002\022\014\n\010FUNCTION\020\003\022\t\n\005PARSE\020\004\022\016\n\nPE" +
+      "RMISSION\020\005\022\010\n\004PLAN\020\006\022\014\n\010RESOURCE\020\007\022\n\n\006SY" +
+      "STEM\020\010\022\031\n\025UNSUPPORTED_OPERATION\020\t\022\016\n\nVAL" +
+      "IDATION\020\n\022\023\n\017EXECUTION_ERROR\020\013\022\022\n\016INTERN" +
+      "AL_ERROR\020\014\022\025\n\021UNSPECIFIED_ERROR\020\r\"\246\001\n\020Ex" +
+      "ceptionWrapper\022\027\n\017exception_class\030\001 \001(\t\022" +
+      "\017\n\007message\030\002 \001(\t\022:\n\013stack_trace\030\003 \003(\0132%." +
+      "exec.shared.StackTraceElementWrapper\022,\n\005",
+      "cause\030\004 \001(\0132\035.exec.shared.ExceptionWrapp" +
+      "er\"\205\001\n\030StackTraceElementWrapper\022\022\n\nclass" +
+      "_name\030\001 \001(\t\022\021\n\tfile_name\030\002 \001(\t\022\023\n\013line_n" +
+      "umber\030\003 \001(\005\022\023\n\013method_name\030\004 \001(\t\022\030\n\020is_n" +
+      "ative_method\030\005 \001(\010\"\\\n\014ParsingError\022\024\n\014st" +
+      "art_column\030\002 \001(\005\022\021\n\tstart_row\030\003 \001(\005\022\022\n\ne" +
+      "nd_column\030\004 \001(\005\022\017\n\007end_row\030\005 \001(\005\"~\n\016Reco" +
+      "rdBatchDef\022\024\n\014record_count\030\001 \001(\005\022+\n\005fiel" +
+      "d\030\002 \003(\0132\034.exec.shared.SerializedField\022)\n" +
+      "!carries_two_byte_selection_vector\030\003 \001(\010",
+      "\"\205\001\n\010NamePart\022(\n\004type\030\001 \001(\0162\032.exec.share" +
+      "d.NamePart.Type\022\014\n\004name\030\002 \001(\t\022$\n\005child\030\003" +
+      " \001(\0132\025.exec.shared.NamePart\"\033\n\004Type\022\010\n\004N" +
+      "AME\020\000\022\t\n\005ARRAY\020\001\"\324\001\n\017SerializedField\022%\n\n" +
+      "major_type\030\001 \001(\0132\021.common.MajorType\022(\n\tn" +
+      "ame_part\030\002 \001(\0132\025.exec.shared.NamePart\022+\n" +
+      "\005child\030\003 \003(\0132\034.exec.shared.SerializedFie" +
+      "ld\022\023\n\013value_count\030\004 \001(\005\022\027\n\017var_byte_leng" +
+      "th\030\005 \001(\005\022\025\n\rbuffer_length\030\007 \001(\005\"7\n\nNodeS" +
+      "tatus\022\017\n\007node_id\030\001 \001(\005\022\030\n\020memory_footpri",
+      "nt\030\002 \001(\003\"\263\002\n\013QueryResult\0228\n\013query_state\030" +
+      "\001 \001(\0162#.exec.shared.QueryResult.QuerySta" +
+      "te\022&\n\010query_id\030\002 \001(\0132\024.exec.shared.Query" +
+      "Id\022(\n\005error\030\003 \003(\0132\031.exec.shared.DrillPBE" +
+      "rror\"\227\001\n\nQueryState\022\014\n\010STARTING\020\000\022\013\n\007RUN" +
+      "NING\020\001\022\r\n\tCOMPLETED\020\002\022\014\n\010CANCELED\020\003\022\n\n\006F" +
+      "AILED\020\004\022\032\n\026CANCELLATION_REQUESTED\020\005\022\014\n\010E" +
+      "NQUEUED\020\006\022\r\n\tPREPARING\020\007\022\014\n\010PLANNING\020\010\"p" +
+      "\n\tQueryData\022&\n\010query_id\030\001 \001(\0132\024.exec.sha" +
+      "red.QueryId\022\021\n\trow_count\030\002 \001(\005\022(\n\003def\030\003 ",
+      "\001(\0132\033.exec.shared.RecordBatchDef\"\330\001\n\tQue" +
+      "ryInfo\022\r\n\005query\030\001 \001(\t\022\r\n\005start\030\002 \001(\003\0222\n\005" +
+      "state\030\003 \001(\0162#.exec.shared.QueryResult.Qu" +
+      "eryState\022\017\n\004user\030\004 \001(\t:\001-\022\'\n\007foreman\030\005 \001" +
+      "(\0132\026.exec.DrillbitEndpoint\022\024\n\014options_js" +
+      "on\030\006 \001(\t\022\022\n\ntotal_cost\030\007 \001(\001\022\025\n\nqueue_na" +
+      "me\030\010 \001(\t:\001-\"\242\004\n\014QueryProfile\022 \n\002id\030\001 \001(\013" +
+      "2\024.exec.shared.QueryId\022$\n\004type\030\002 \001(\0162\026.e" +
+      "xec.shared.QueryType\022\r\n\005start\030\003 \001(\003\022\013\n\003e" +
+      "nd\030\004 \001(\003\022\r\n\005query\030\005 \001(\t\022\014\n\004plan\030\006 \001(\t\022\'\n",
+      "\007foreman\030\007 \001(\0132\026.exec.DrillbitEndpoint\0222" +
+      "\n\005state\030\010 \001(\0162#.exec.shared.QueryResult." +
+      "QueryState\022\027\n\017total_fragments\030\t \001(\005\022\032\n\022f" +
+      "inished_fragments\030\n \001(\005\022;\n\020fragment_prof" +
+      "ile\030\013 \003(\0132!.exec.shared.MajorFragmentPro" +
+      "file\022\017\n\004user\030\014 \001(\t:\001-\022\r\n\005error\030\r \001(\t\022\024\n\014" +
+      "verboseError\030\016 \001(\t\022\020\n\010error_id\030\017 \001(\t\022\022\n\n" +
+      "error_node\030\020 \001(\t\022\024\n\014options_json\030\021 \001(\t\022\017" +
+      "\n\007planEnd\030\022 \001(\003\022\024\n\014queueWaitEnd\030\023 \001(\003\022\022\n" +
+      "\ntotal_cost\030\024 \001(\001\022\025\n\nqueue_name\030\025 \001(\t:\001-",
+      "\"t\n\024MajorFragmentProfile\022\031\n\021major_fragme" +
+      "nt_id\030\001 \001(\005\022A\n\026minor_fragment_profile\030\002 " +
+      "\003(\0132!.exec.shared.MinorFragmentProfile\"\350" +
+      "\002\n\024MinorFragmentProfile\022)\n\005state\030\001 \001(\0162\032" +
+      ".exec.shared.FragmentState\022(\n\005error\030\002 \001(" +
+      "\0132\031.exec.shared.DrillPBError\022\031\n\021minor_fr" +
+      "agment_id\030\003 \001(\005\0226\n\020operator_profile\030\004 \003(" +
+      "\0132\034.exec.shared.OperatorProfile\022\022\n\nstart" +
+      "_time\030\005 \001(\003\022\020\n\010end_time\030\006 \001(\003\022\023\n\013memory_" +
+      "used\030\007 \001(\003\022\027\n\017max_memory_used\030\010 \001(\003\022(\n\010e",
+      "ndpoint\030\t \001(\0132\026.exec.DrillbitEndpoint\022\023\n" +
+      "\013last_update\030\n \001(\003\022\025\n\rlast_progress\030\013 \001(" +
+      "\003\"\377\001\n\017OperatorProfile\0221\n\rinput_profile\030\001" +
+      " \003(\0132\032.exec.shared.StreamProfile\022\023\n\013oper" +
+      "ator_id\030\003 \001(\005\022\025\n\roperator_type\030\004 \001(\005\022\023\n\013" +
+      "setup_nanos\030\005 \001(\003\022\025\n\rprocess_nanos\030\006 \001(\003" +
+      "\022#\n\033peak_local_memory_allocated\030\007 \001(\003\022(\n" +
+      "\006metric\030\010 \003(\0132\030.exec.shared.MetricValue\022" +
+      "\022\n\nwait_nanos\030\t \001(\003\"B\n\rStreamProfile\022\017\n\007" +
+      "records\030\001 \001(\003\022\017\n\007batches\030\002 \001(\003\022\017\n\007schema",
+      "s\030\003 \001(\003\"J\n\013MetricValue\022\021\n\tmetric_id\030\001 \001(" +
+      "\005\022\022\n\nlong_value\030\002 \001(\003\022\024\n\014double_value\030\003 " +
+      "\001(\001\")\n\010Registry\022\035\n\003jar\030\001 \003(\0132\020.exec.shar" +
+      "ed.Jar\"/\n\003Jar\022\014\n\004name\030\001 \001(\t\022\032\n\022function_" +
+      "signature\030\002 \003(\t\"W\n\013SaslMessage\022\021\n\tmechan" +
+      "ism\030\001 \001(\t\022\014\n\004data\030\002 \001(\014\022\'\n\006status\030\003 \001(\0162" +
+      "\027.exec.shared.SaslStatus*5\n\nRpcChannel\022\017" +
+      "\n\013BIT_CONTROL\020\000\022\014\n\010BIT_DATA\020\001\022\010\n\004USER\020\002*" +
+      "V\n\tQueryType\022\007\n\003SQL\020\001\022\013\n\007LOGICAL\020\002\022\014\n\010PH" +
+      "YSICAL\020\003\022\r\n\tEXECUTION\020\004\022\026\n\022PREPARED_STAT",
+      "EMENT\020\005*\207\001\n\rFragmentState\022\013\n\007SENDING\020\000\022\027" +
+      "\n\023AWAITING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022\014\n\010" +
+      "FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005\022\032\n" +
+      "\026CANCELLATION_REQUESTED\020\006*\360\006\n\020CoreOperat" +
+      "orType\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAST_S" +
+      "ENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE\020\003\022" +
+      "\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HASH_P" +
+      "ARTITION_SENDER\020\006\022\t\n\005LIMIT\020\007\022\024\n\020MERGING_" +
+      "RECEIVER\020\010\022\034\n\030ORDERED_PARTITION_SENDER\020\t" +
+      "\022\013\n\007PROJECT\020\n\022\026\n\022UNORDERED_RECEIVER\020\013\022\020\n",
+      "\014RANGE_SENDER\020\014\022\n\n\006SCREEN\020\r\022\034\n\030SELECTION" +
+      "_VECTOR_REMOVER\020\016\022\027\n\023STREAMING_AGGREGATE" +
+      "\020\017\022\016\n\nTOP_N_SORT\020\020\022\021\n\rEXTERNAL_SORT\020\021\022\t\n" +
+      "\005TRACE\020\022\022\t\n\005UNION\020\023\022\014\n\010OLD_SORT\020\024\022\032\n\026PAR" +
+      "QUET_ROW_GROUP_SCAN\020\025\022\021\n\rHIVE_SUB_SCAN\020\026" +
+      "\022\025\n\021SYSTEM_TABLE_SCAN\020\027\022\021\n\rMOCK_SUB_SCAN" +
+      "\020\030\022\022\n\016PARQUET_WRITER\020\031\022\023\n\017DIRECT_SUB_SCA" +
+      "N\020\032\022\017\n\013TEXT_WRITER\020\033\022\021\n\rTEXT_SUB_SCAN\020\034\022" +
+      "\021\n\rJSON_SUB_SCAN\020\035\022\030\n\024INFO_SCHEMA_SUB_SC" +
+      "AN\020\036\022\023\n\017COMPLEX_TO_JSON\020\037\022\025\n\021PRODUCER_CO",
+      "NSUMER\020 \022\022\n\016HBASE_SUB_SCAN\020!\022\n\n\006WINDOW\020\"" +
+      "\022\024\n\020NESTED_LOOP_JOIN\020#\022\021\n\rAVRO_SUB_SCAN\020" +
+      "$\022\021\n\rPCAP_SUB_SCAN\020%\022\022\n\016KAFKA_SUB_SCAN\020&" +
+      "\022\021\n\rKUDU_SUB_SCAN\020\'\022\013\n\007FLATTEN\020(\022\020\n\014LATE" +
+      "RAL_JOIN\020)\022\n\n\006UNNEST\020*\022,\n(HIVE_DRILL_NAT" +
+      "IVE_PARQUET_ROW_GROUP_SCAN\020+*g\n\nSaslStat" +
+      "us\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020" +
+      "SASL_IN_PROGRESS\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013" +
+      "SASL_FAILED\020\004B.\n\033org.apache.drill.exec.p" +
+      "rotoB\rUserBitSharedH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -24193,7 +24349,7 @@ public final class UserBitShared {
           internal_static_exec_shared_QueryId_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_exec_shared_QueryId_descriptor,
-              new java.lang.String[] { "Part1", "Part2", });
+              new java.lang.String[] { "Part1", "Part2", "Text", });
           internal_static_exec_shared_DrillPBError_descriptor =
             getDescriptor().getMessageTypes().get(2);
           internal_static_exec_shared_DrillPBError_fieldAccessorTable = new
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/QueryId.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/QueryId.java
index 338234f..72406fe 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/QueryId.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/QueryId.java
@@ -49,6 +49,7 @@ public final class QueryId implements Externalizable, Message<QueryId>, Schema<Q
     
     private long part1;
     private long part2;
+    private String text;
 
     public QueryId()
     {
@@ -83,6 +84,19 @@ public final class QueryId implements Externalizable, Message<QueryId>, Schema<Q
         return this;
     }
 
+    // text
+
+    public String getText()
+    {
+        return text;
+    }
+
+    public QueryId setText(String text)
+    {
+        this.text = text;
+        return this;
+    }
+
     // java serialization
 
     public void readExternal(ObjectInput in) throws IOException
@@ -143,6 +157,9 @@ public final class QueryId implements Externalizable, Message<QueryId>, Schema<Q
                 case 2:
                     message.part2 = input.readSFixed64();
                     break;
+                case 3:
+                    message.text = input.readString();
+                    break;
                 default:
                     input.handleUnknownField(number, this);
             }   
@@ -157,6 +174,9 @@ public final class QueryId implements Externalizable, Message<QueryId>, Schema<Q
 
         if(message.part2 != 0)
             output.writeSFixed64(2, message.part2, false);
+
+        if(message.text != null)
+            output.writeString(3, message.text, false);
     }
 
     public String getFieldName(int number)
@@ -165,6 +185,7 @@ public final class QueryId implements Externalizable, Message<QueryId>, Schema<Q
         {
             case 1: return "part1";
             case 2: return "part2";
+            case 3: return "text";
             default: return null;
         }
     }
@@ -180,6 +201,7 @@ public final class QueryId implements Externalizable, Message<QueryId>, Schema<Q
     {
         __fieldMap.put("part1", 1);
         __fieldMap.put("part2", 2);
+        __fieldMap.put("text", 3);
     }
     
 }
diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto
index 5b89a81..14bfb87 100644
--- a/protocol/src/main/protobuf/UserBitShared.proto
+++ b/protocol/src/main/protobuf/UserBitShared.proto
@@ -31,6 +31,7 @@ message UserCredentials {
 message QueryId {
   optional sfixed64 part1 = 1;
   optional sfixed64 part2 = 2;
+  optional string text = 3;
 }
 
 message DrillPBError{


[drill] 01/04: DRILL-6348: Received batches are now owned by the receive operators instead of the parent

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit dc1db98f613f041e40c2c704862051ee2a51734a
Author: Salim Achouche <sa...@gmail.com>
AuthorDate: Sun Apr 22 18:02:35 2018 -0700

    DRILL-6348: Received batches are now owned by the receive operators instead of the parent
    
    closes #1237
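
A hedged, self-contained sketch (illustrative types only, not Drill's actual classes) of the
ownership hand-off this commit introduces: the receive operator registers its own buffer
allocator with the data collector, so incoming batches are accounted against the operator
instead of the parent fragment's allocator.

    interface BufferAllocator { String name(); }

    // Simplified stand-in for Drill's DataCollector: it remembers which allocator
    // will own incoming batches and defaults to the fragment-level allocator.
    class DataCollectorSketch {
      private BufferAllocator owner;
      DataCollectorSketch(BufferAllocator fragmentAllocator) { this.owner = fragmentAllocator; }
      void setAllocator(BufferAllocator operatorAllocator) { this.owner = operatorAllocator; }
      BufferAllocator getAllocator() { return owner; }
    }

    public class AllocatorHandOffSketch {
      public static void main(String[] args) {
        BufferAllocator fragment = () -> "fragment";
        BufferAllocator receiver = () -> "receiver-operator";
        DataCollectorSketch collector = new DataCollectorSketch(fragment);
        collector.setAllocator(receiver);                      // receiver claims ownership
        System.out.println(collector.getAllocator().name());   // prints receiver-operator
      }
    }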
---
 .../org/apache/drill/exec/ops/FragmentContextImpl.java  |  5 ++++-
 .../exec/physical/impl/MergingReceiverCreator.java      |  2 +-
 .../physical/impl/mergereceiver/MergingRecordBatch.java |  3 +++
 .../impl/unorderedreceiver/UnorderedReceiverBatch.java  |  3 +++
 .../unorderedreceiver/UnorderedReceiverCreator.java     |  2 +-
 .../drill/exec/work/batch/AbstractDataCollector.java    | 17 +++++++++++++++++
 .../org/apache/drill/exec/work/batch/DataCollector.java | 14 +++++++++++++-
 .../apache/drill/exec/work/batch/IncomingBuffers.java   | 10 +++++++---
 8 files changed, 49 insertions(+), 7 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
index b192850..503ebdd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
@@ -449,13 +449,16 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor
   public void close() {
     waitForSendComplete();
 
+    // Close the buffers before closing the operators; this is needed as buffer ownership
+    // is attached to the receive operators.
+    suppressingClose(buffers);
+
     // close operator context
     for (OperatorContextImpl opContext : contexts) {
       suppressingClose(opContext);
     }
 
     suppressingClose(bufferManager);
-    suppressingClose(buffers);
     suppressingClose(allocator);
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java
index 0ef84b9..66a0cc2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java
@@ -42,7 +42,7 @@ public class MergingReceiverCreator implements BatchCreator<MergingReceiverPOP>
     IncomingBuffers bufHolder = context.getBuffers();
 
     assert bufHolder != null : "IncomingBuffers must be defined for any place a receiver is declared.";
-    RawBatchBuffer[] buffers = bufHolder.getBuffers(receiver.getOppositeMajorFragmentId());
+    RawBatchBuffer[] buffers = bufHolder.getCollector(receiver.getOppositeMajorFragmentId()).getBuffers();
 
     return new MergingRecordBatch(context, receiver, buffers);
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 7e5ff21..9087757 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -136,6 +136,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
     this.config = config;
     this.inputCounts = new long[config.getNumSenders()];
     this.outputCounts = new long[config.getNumSenders()];
+
+    // Register this operator's buffer allocator so that incoming buffers are owned by this allocator
+    context.getBuffers().getCollector(config.getOppositeMajorFragmentId()).setAllocator(oContext.getAllocator());
   }
 
   @SuppressWarnings("resource")
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index 433e0c8..424a733 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -86,6 +86,9 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
     this.stats = oContext.getStats();
     this.stats.setLongStat(Metric.NUM_SENDERS, config.getNumSenders());
     this.config = config;
+
+    // Register this operator's buffer allocator so that incoming buffers are owned by this allocator
+    context.getBuffers().getCollector(config.getOppositeMajorFragmentId()).setAllocator(oContext.getAllocator());
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java
index 01a4588..3dcdfc4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java
@@ -37,7 +37,7 @@ public class UnorderedReceiverCreator implements BatchCreator<UnorderedReceiver>
     IncomingBuffers bufHolder = context.getBuffers();
     assert bufHolder != null : "IncomingBuffers must be defined for any place a receiver is declared.";
 
-    RawBatchBuffer[] buffers = bufHolder.getBuffers(receiver.getOppositeMajorFragmentId());
+    RawBatchBuffer[] buffers = bufHolder.getCollector(receiver.getOppositeMajorFragmentId()).getBuffers();
     assert buffers.length == 1;
     RawBatchBuffer buffer = buffers[0];
     return new UnorderedReceiverBatch(context, buffer, receiver);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
index b6b4183..bb3a5a2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerArray;
 
 import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.BitControl.Collector;
 import org.apache.drill.exec.record.RawFragmentBatch;
@@ -37,6 +38,8 @@ public abstract class AbstractDataCollector implements DataCollector {
   private final int incomingStreams;
   protected final RawBatchBuffer[] buffers;
   protected final ArrayWrappedIntIntMap fragmentMap;
+  /** Allocator which owns incoming batches */
+  protected BufferAllocator ownerAllocator;
 
   /**
    * @param parentAccounter
@@ -53,6 +56,7 @@ public abstract class AbstractDataCollector implements DataCollector {
     this.parentAccounter = parentAccounter;
     this.remainders = new AtomicIntegerArray(incomingStreams);
     this.oppositeMajorFragmentId = collector.getOppositeMajorFragmentId();
+    this.ownerAllocator = context.getAllocator();
     // Create fragmentId to index that is within the range [0, incoming.size()-1]
     // We use this mapping to find objects belonging to the fragment in buffers and remainders arrays.
     fragmentMap = new ArrayWrappedIntIntMap();
@@ -116,4 +120,17 @@ public abstract class AbstractDataCollector implements DataCollector {
     AutoCloseables.close(buffers);
   }
 
+  /** {@inheritDoc} */
+  @Override
+  public BufferAllocator getAllocator() {
+    return this.ownerAllocator;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void setAllocator(BufferAllocator allocator) {
+    Preconditions.checkArgument(allocator != null, "buffer allocator cannot be null");
+    this.ownerAllocator = allocator;
+  }
+
 }
\ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java
index 026fc81..fa74677 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java
@@ -19,13 +19,25 @@ package org.apache.drill.exec.work.batch;
 
 import java.io.IOException;
 
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.record.RawFragmentBatch;
 
-interface DataCollector extends AutoCloseable {
+public interface DataCollector extends AutoCloseable {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataCollector.class);
   public boolean batchArrived(int minorFragmentId, RawFragmentBatch batch) throws IOException ;
   public int getOppositeMajorFragmentId();
   public RawBatchBuffer[] getBuffers();
   public int getTotalIncomingFragments();
   public void close() throws Exception;
+  /**
+   * Enables caller (e.g., receiver) to attach its buffer allocator to this Data Collector in order
+   * to claim ownership of incoming batches; by default, the fragment allocator owns these batches.
+   *
+   * @param allocator operator buffer allocator
+   */
+  void setAllocator(BufferAllocator allocator);
+  /**
+   * @return allocator
+   */
+  BufferAllocator getAllocator();
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
index 876c8b5..2d1b4f2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
@@ -27,6 +27,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.concurrent.AutoCloseableLock;
 import org.apache.drill.exec.exception.FragmentSetupException;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.BitControl.Collector;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
@@ -103,8 +104,11 @@ public class IncomingBuffers implements AutoCloseable {
             Arrays.toString(collectorMap.values().toArray())));
       }
 
+      // Use the Data Collector's buffer allocator if set, otherwise the fragment's one
+      BufferAllocator ownerAllocator = collector.getAllocator();
+
       synchronized (collector) {
-        final RawFragmentBatch newRawFragmentBatch = incomingBatch.newRawFragmentBatch(context.getAllocator());
+        final RawFragmentBatch newRawFragmentBatch = incomingBatch.newRawFragmentBatch(ownerAllocator);
         boolean decrementedToZero = collector
             .batchArrived(incomingBatch.getHeader().getSendingMinorFragmentId(), newRawFragmentBatch);
         newRawFragmentBatch.release();
@@ -125,8 +129,8 @@ public class IncomingBuffers implements AutoCloseable {
     return rem;
   }
 
-  public RawBatchBuffer[] getBuffers(int senderMajorFragmentId) {
-    return collectorMap.get(senderMajorFragmentId).getBuffers();
+  public DataCollector getCollector(int senderMajorFragmentId) {
+    return collectorMap.get(senderMajorFragmentId);
   }
 
   public boolean isDone() {


[drill] 02/04: DRILL-6418: Handle Schema change in Unnest And Lateral for unnest field / non-unnest field

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit b7d259ba9c8c2b28700c9da33bb97dd79ef04cbc
Author: Sorabh Hamirwasia <sh...@maprtech.com>
AuthorDate: Tue May 15 14:27:31 2018 -0700

    DRILL-6418: Handle Schema change in Unnest And Lateral for unnest field / non-unnest field
    
    Note: Changed Lateral to handle non-empty right batch with OK_NEW_SCHEMA
    
    closes #1271
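
A hedged, simplified sketch (not the actual LateralJoinBatch code) of the behavioural change
noted above: when the right side reports OK_NEW_SCHEMA, the batch may now contain records, so
the right-side join index is primed to row 0 instead of asserting that the batch is empty; the
join output is then produced on the following next() call.

    public class LateralSchemaChangeSketch {
      enum IterOutcome { OK, OK_NEW_SCHEMA, EMIT }

      // Mirrors the new rule in the diff below: a non-empty right batch arriving with
      // OK_NEW_SCHEMA starts joining from row 0 on the subsequent call; an empty one
      // keeps the "no rows pending" marker of -1.
      static int rightJoinIndexFor(IterOutcome outcome, int rightRecordCount) {
        if (outcome == IterOutcome.OK_NEW_SCHEMA) {
          return rightRecordCount > 0 ? 0 : -1;
        }
        return -1;
      }

      public static void main(String[] args) {
        System.out.println(rightJoinIndexFor(IterOutcome.OK_NEW_SCHEMA, 5));  // prints 0
        System.out.println(rightJoinIndexFor(IterOutcome.OK_NEW_SCHEMA, 0));  // prints -1
      }
    }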
---
 .../exec/physical/impl/join/LateralJoinBatch.java  |  45 +---
 .../physical/impl/project/ProjectRecordBatch.java  |   2 +-
 .../physical/impl/unnest/UnnestRecordBatch.java    |   6 +-
 .../org/apache/drill/exec/record/BatchSchema.java  |  27 ++-
 .../apache/drill/exec/record/VectorContainer.java  |   9 +
 .../impl/join/TestLateralJoinCorrectness.java      | 252 ++++++++++++++++++++-
 .../impl/limit/TestLimitBatchEmitOutcome.java      |   3 +
 .../impl/project/TestProjectEmitOutcome.java       |   3 +
 .../unnest/TestUnnestWithLateralCorrectness.java   |   8 +-
 .../drill/exec/record/MaterializedField.java       |  43 +++-
 .../exec/vector/complex/AbstractMapVector.java     |   3 +
 11 files changed, 347 insertions(+), 54 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
index 8ea381b..a09913f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
@@ -125,8 +125,8 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
     // Left side has some records in the batch so let's process right batch
     childOutcome = processRightBatch();
 
-    // reset the left & right outcomes to OK here and send the empty batch downstream
-    // Assumption being right side will always send OK_NEW_SCHEMA with empty batch which is what UNNEST will do
+    // reset the left & right outcomes to OK here and send the empty batch downstream. Non-Empty right batch with
+    // OK_NEW_SCHEMA will be handled in subsequent next call
     if (childOutcome == OK_NEW_SCHEMA) {
       leftUpstream = (leftUpstream != EMIT) ? OK : leftUpstream;
       rightUpstream = OK;
@@ -344,22 +344,7 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
       switch (leftUpstream) {
         case OK_NEW_SCHEMA:
           // This OK_NEW_SCHEMA is received post build schema phase and from left side
-          // If schema didn't actually changed then just handle it as OK outcome. This is fine since it is not setting
-          // up any incoming vector references in setupNewSchema. While copying the records it always work on latest
-          // incoming vector.
-          if (!isSchemaChanged(left.getSchema(), leftSchema)) {
-            logger.warn(String.format("New schema received from left side is same as previous known left schema. " +
-              "Ignoring this schema change. Old Left Schema: %s, New Left Schema: %s", leftSchema, left.getSchema()));
-
-            // Current left batch is empty and schema didn't changed as well, so let's get next batch and loose
-            // OK_NEW_SCHEMA outcome
-            processLeftBatchInFuture = false;
-            if (emptyLeftBatch) {
-              continue;
-            } else {
-              leftUpstream = OK;
-            }
-          } else if (outputIndex > 0) { // can only reach here from produceOutputBatch
+          if (outputIndex > 0) { // can only reach here from produceOutputBatch
             // This means there is already some records from previous join inside left batch
             // So we need to pass that downstream and then handle the OK_NEW_SCHEMA in subsequent next call
             processLeftBatchInFuture = true;
@@ -439,20 +424,12 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
           // We should not get OK_NEW_SCHEMA multiple times for the same left incoming batch. So there won't be a
           // case where we get OK_NEW_SCHEMA --> OK (with batch) ---> OK_NEW_SCHEMA --> OK/EMIT fall through
           //
-          // Right batch with OK_NEW_SCHEMA is always going to be an empty batch, so let's pass the new schema
-          // downstream and later with subsequent next() call the join output will be produced
-          Preconditions.checkState(right.getRecordCount() == 0,
-            "Right side batch with OK_NEW_SCHEMA is not empty");
-
-          if (!isSchemaChanged(right.getSchema(), rightSchema)) {
-            logger.warn(String.format("New schema received from right side is same as previous known right schema. " +
-              "Ignoring this schema change. Old Right schema: %s, New Right Schema: %s",
-              rightSchema, right.getSchema()));
-            continue;
-          }
+          // A right batch with OK_NEW_SCHEMA can be non-empty, so update rightJoinIndex accordingly and pass the
+          // new schema downstream with an empty batch; the join output will be produced in a subsequent next()
+          // call
           if (handleSchemaChange()) {
             container.setRecordCount(0);
-            rightJoinIndex = -1;
+            rightJoinIndex = (right.getRecordCount() > 0) ? 0 : -1;
             return OK_NEW_SCHEMA;
           } else {
             return STOP;
@@ -637,10 +614,10 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
     container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
 
     batchMemoryManager.updateOutgoingStats(outputIndex);
-    if (logger.isDebugEnabled()) {
-      logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this));
-      logger.debug("Number of records emitted: " + outputIndex);
-    }
+
+    logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this));
+    logger.debug("Number of records emitted: {} and Allocator Stats: [AllocatedMem: {}, PeakMem: {}]", outputIndex,
+      container.getAllocator().getAllocatedMemory(), container.getAllocator().getPeakMemoryAllocation());
 
     // Update the output index for next output batch to zero
     outputIndex = 0;
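To make the new contract concrete, here is a minimal sketch (illustrative variable names only, relying on nothing
beyond the RecordBatch API already used in this patch) of the outcome sequence a downstream operator now sees when
the right side delivers a non-empty batch with OK_NEW_SCHEMA after the build-schema phase:

    // Sketch: the schema change is surfaced first with an empty batch; the retained right-side rows
    // are joined and returned on the following next() call.
    RecordBatch.IterOutcome outcome = lateralJoinBatch.next();
    assert outcome == RecordBatch.IterOutcome.OK_NEW_SCHEMA;
    assert lateralJoinBatch.getRecordCount() == 0;           // schema only, no data yet
    outcome = lateralJoinBatch.next();
    assert outcome == RecordBatch.IterOutcome.OK;            // joined rows from the non-empty right batch
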
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index eab9007..8a88db9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -526,7 +526,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
   @Override
   protected boolean setupNewSchema() throws SchemaChangeException {
     setupNewSchemaFromInput(this.incoming);
-    if (container.isSchemaChanged()) {
+    if (container.isSchemaChanged() || callBack.getSchemaChangedAndReset()) {
       container.buildSchema(SelectionVectorMode.NONE);
       return true;
     } else {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index fe91fc3..ed5d91c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -389,11 +389,15 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
     final MaterializedField thisField = incoming.getSchema().getColumn(fieldId.getFieldIds()[0]);
     final MaterializedField prevField = unnestFieldMetadata;
     Preconditions.checkNotNull(thisField);
-    unnestFieldMetadata = thisField;
+
     // isEquivalent may return false if the order of the fields has changed. This usually does not
     // happen but if it does we end up throwing a spurious schema change exception
     if (prevField == null || !prevField.isEquivalent(thisField)) {
       logger.debug("Schema changed");
+      // Store a clone of the MaterializedField for the unnest column instead of a reference. When the column is of
+      // type MAP and any child field of the MAP changes, that change mutates the referenced object as well, so the
+      // isEquivalent check would still return true and the schema change would be missed.
+      unnestFieldMetadata = thisField.clone();
       return true;
     }
     return false;
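To illustrate the comment above, a minimal sketch of why storing a clone matters for a MAP-typed unnest column; the
field variables are hypothetical, only clone() and isEquivalent() are taken from MaterializedField:

    // Sketch: a stored reference is mutated together with the incoming field, so comparing it later
    // compares the object against itself; a clone keeps an independent snapshot of name, type and children.
    MaterializedField stored = incomingField;                 // reference shared with the upstream
    MaterializedField snapshot = incomingField.clone();       // independent copy taken before any change
    // ... upstream swaps child "c" of the map to VARCHAR, updating incomingField's children in place ...
    boolean missed = stored.isEquivalent(incomingField);      // true: same mutated object on both sides
    boolean caught = !snapshot.isEquivalent(incomingField);   // true: the clone still has the old child type
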
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
index 67598e0..f161234 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
@@ -100,8 +100,29 @@ public class BatchSchema implements Iterable<MaterializedField> {
     return result;
   }
 
-  // DRILL-5525: the semantics of this method are badly broken.
-  // Caveat emptor.
+  /**
+   * DRILL-5525: the semantics of this method are badly broken.
+   * Caveat emptor.
+   *
+   * This check, used to detect an actual schema change inside an operator record batch, does not work for
+   * AbstractContainerVectors (such as MapVector). In each record batch a reference to the incoming batch schema is
+   * stored (say S:{a: int}) and equals is then called on that stored reference and the current incoming batch schema.
+   * Internally the schema object holds references to the MaterializedFields of the vectors in the container. If the
+   * incoming batch schema changes, the upstream operator creates a new ValueVector in its output container with the
+   * newly detected type, which in turn gets a new MaterializedField instance. Later a new BatchSchema object is
+   * created for this new incoming batch (say S":{a": varchar}). The operator calling equals still holds a reference
+   * to the old schema object (S), so the first check is not satisfied and equals is then called on each
+   * MaterializedField (a.equals(a")). Since a new MaterializedField was created for the newly created vector, the
+   * field-level equals check returns false and the schema change is detected in this case.
+   * Now consider a MapVector instead of an int vector, with an initial schema of (say S:{a:{b:int, c:int}}),
+   * where the schema of map child field c later changes. The MapVector is still found in the container, but the
+   * child vector for field c is replaced. The new schema object becomes (S":{a:{b:int, c":varchar}}). When
+   * S.equals(S") is called it eventually calls a.equals(a), which returns true even though the schema of the child
+   * value vector c has changed. This is because no new vector was created for field (a), so its reference to the
+   * MaterializedField has not changed and is shared by both the old and new schema instances.
+   * Hence {@link BatchSchema#isEquivalent(BatchSchema)} should be used instead, since
+   * {@link MaterializedField#isEquivalent(MaterializedField)} has been updated to remove the reference check.
+   */
 
   @Override
   public boolean equals(Object obj) {
@@ -151,7 +172,7 @@ public class BatchSchema implements Iterable<MaterializedField> {
 
   /**
    * Compare that two schemas are identical according to the rules defined
-   * in {@ link MaterializedField#isEquivalent(MaterializedField)}. In particular,
+   * in {@link MaterializedField#isEquivalent(MaterializedField)}. In particular,
    * this method requires that the fields have a 1:1 ordered correspondence
    * in the two schemas.
    *
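For operator authors, a minimal sketch of the detection pattern the javadoc above recommends; the helper method is
hypothetical and not part of BatchSchema:

    // Sketch: prefer the deep comparison over equals() when caching a schema across batches.
    private boolean schemaChanged(BatchSchema previous, RecordBatch incoming) {
      BatchSchema current = incoming.getSchema();
      // equals() may compare stale MaterializedField references and miss a re-typed child of a MAP column;
      // isEquivalent() walks the fields (and their children) in order.
      return previous == null || !previous.isEquivalent(current);
    }
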
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index e35bb5f..0ea23f7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -135,6 +135,15 @@ public class VectorContainer implements VectorAccessible {
     return addOrGet(field, null);
   }
 
+  /**
+   * This method should be called with a MaterializedField that carries the correct children field list, especially
+   * when the field type is MAP. Otherwise, if the caller does not create a TransferPair on the returned ValueVector,
+   * the new ValueVector will not have information about its list of children MaterializedFields.
+   * @param field
+   * @param callBack
+   * @param <T>
+   * @return
+   */
   @SuppressWarnings("unchecked")
   public <T extends ValueVector> T addOrGet(final MaterializedField field, final SchemaChangeCallBack callBack) {
     final TypedFieldId id = getValueVectorId(SchemaPath.getSimplePath(field.getName()));
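A minimal sketch of the calling convention described in the javadoc above; the field names and types are illustrative,
and callBack stands for the operator's existing SchemaChangeCallBack:

    // Sketch: pass a MAP field that already carries its children so the created MapVector knows about
    // them even if the caller never builds a TransferPair afterwards.
    MaterializedField child = MaterializedField.create("b", Types.required(TypeProtos.MinorType.INT));
    MaterializedField mapField = MaterializedField.create("a", Types.required(TypeProtos.MinorType.MAP));
    mapField.addChild(child);
    MapVector mapVector = container.addOrGet(mapField, callBack);
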
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
index 51df6e4..e9e9aac 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
@@ -45,6 +45,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import static junit.framework.TestCase.fail;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 @Category(OperatorTest.class)
@@ -853,8 +854,8 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
   }
 
   /**
-   * When multiple left batch is received with same schema but with OK_NEW_SCHEMA, then LATERAL detects that
-   * correctly and suppresses schema change operation by producing output in same batch created with initial schema.
+   * When multiple left batches are received with the same schema but with OK_NEW_SCHEMA, LATERAL rebuilds the
+   * schema each time and sends the output in multiple output batches.
    * The schema change was only for columns which are not produced by the UNNEST or right branch.
    *
    * @throws Exception
@@ -904,6 +905,8 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
       assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
       assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
       totalRecordCount += ljBatch.getRecordCount();
+      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
+      totalRecordCount += ljBatch.getRecordCount();
       assertTrue(totalRecordCount ==
         (nonEmptyLeftRowSet.rowCount() * nonEmptyRightRowSet.rowCount() +
           leftRowSet2.rowCount() * nonEmptyRightRowSet2.rowCount()));
@@ -922,9 +925,8 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
   }
 
   /**
-   * When multiple left batch is received with same schema but with OK_NEW_SCHEMA, then LATERAL detects that
-   * correctly and suppresses false schema change indication from both left and right branch. It produces output in
-   * same batch created with initial schema.
+   * When multiple left batches are received with the same schema but with OK_NEW_SCHEMA, LATERAL handles it
+   * correctly by re-creating the schema and producing multiple batches of final output.
    * The schema change is for columns common on both left and right side.
    *
    * @throws Exception
@@ -976,6 +978,9 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
       assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
       assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
       totalRecordCount += ljBatch.getRecordCount();
+      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
+      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
+      totalRecordCount += ljBatch.getRecordCount();
       assertTrue(totalRecordCount ==
         (nonEmptyLeftRowSet.rowCount() * nonEmptyRightRowSet.rowCount() +
           leftRowSet2.rowCount() * nonEmptyRightRowSet2.rowCount()));
@@ -2560,4 +2565,241 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
       rightMockBatch.close();
     }
   }
+
+  /**
+   * Verifies that if a non-empty batch with OK_NEW_SCHEMA is received from the right side after the buildSchema
+   * phase, it is handled correctly by sending an empty batch with OK_NEW_SCHEMA and later consuming it to produce
+   * the actual output batch with some data.
+   */
+  @Test
+  public void testPostBuildSchema_OK_NEW_SCHEMA_NonEmptyRightBatch() throws Exception {
+    // Create left input schema 2
+    TupleMetadata leftSchema2 = new SchemaBuilder()
+      .add("id_left", TypeProtos.MinorType.INT)
+      .add("cost_left", TypeProtos.MinorType.VARCHAR)
+      .add("name_left", TypeProtos.MinorType.VARCHAR)
+      .buildSchema();
+
+    // Create right input schema
+    TupleMetadata rightSchema2 = new SchemaBuilder()
+      .add("id_right", TypeProtos.MinorType.INT)
+      .add("cost_right", TypeProtos.MinorType.VARCHAR)
+      .add("name_right", TypeProtos.MinorType.VARCHAR)
+      .buildSchema();
+
+
+    // Create data for left input
+    final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema2)
+      .addRow(2, "20", "item20")
+      .build();
+
+    // Create data for right input
+    final RowSet.SingleRowSet emptyRightRowSet2 = fixture.rowSetBuilder(rightSchema2)
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema2)
+      .addRow(4, "41", "item41")
+      .addRow(5, "51", "item51")
+      .build();
+
+    // Get the left container with dummy data for Lateral Join
+    leftContainer.add(nonEmptyLeftRowSet.container());
+    leftContainer.add(leftRowSet2.container());
+
+    // Get the left IterOutcomes for Lateral Join
+    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    // Create Left MockRecordBatch
+    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
+
+    // Get the right container with dummy data
+    // first OK_NEW_SCHEMA batch
+    rightContainer.add(emptyRightRowSet.container());
+    rightContainer.add(nonEmptyRightRowSet.container());
+    rightContainer.add(nonEmptyRightRowSet2.container()); // non-empty OK_NEW_SCHEMA batch
+    rightContainer.add(emptyRightRowSet2.container());
+
+    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
+
+    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
+      leftMockBatch, rightMockBatch);
+
+    try {
+      int totalRecordCount = 0;
+      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
+      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
+      totalRecordCount += ljBatch.getRecordCount();
+      // This means 2 output record batches were received because of Schema change
+      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
+      assertEquals(0, ljBatch.getRecordCount());
+      totalRecordCount += ljBatch.getRecordCount();
+      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
+      totalRecordCount += ljBatch.getRecordCount();
+      assertTrue(totalRecordCount ==
+        (nonEmptyLeftRowSet.rowCount() * nonEmptyRightRowSet.rowCount() +
+          leftRowSet2.rowCount() * nonEmptyRightRowSet2.rowCount()));
+
+      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
+    } catch (AssertionError | Exception error) {
+      fail();
+    } finally {
+      // Close all the resources for this test case
+      ljBatch.close();
+      leftMockBatch.close();
+      rightMockBatch.close();
+      leftRowSet2.clear();
+      emptyRightRowSet2.clear();
+      nonEmptyRightRowSet2.clear();
+    }
+  }
+
+  /**
+   * Verifies that in a multilevel LATERAL, when a non-empty OK_NEW_SCHEMA batch is received after the build schema
+   * phase from the right-most UNNEST of the lower LATERAL, the pipeline works fine.
+   * @throws Exception
+   */
+  @Test
+  public void testMultiLevelLateral_SchemaChange_LeftRightUnnest_NonEmptyBatch() throws Exception {
+    // ** Prepare first pair of left batch and right batch for lower level LATERAL Lateral_1 **
+
+    // Create left input schema for first batch
+    TupleMetadata leftSchema2 = new SchemaBuilder()
+      .add("id_left_new", TypeProtos.MinorType.INT)
+      .add("cost_left_new", TypeProtos.MinorType.INT)
+      .add("name_left_new", TypeProtos.MinorType.VARCHAR)
+      .buildSchema();
+
+    final RowSet.SingleRowSet emptyLeftRowSet_leftSchema2 = fixture.rowSetBuilder(leftSchema2).build();
+    final RowSet.SingleRowSet nonEmptyLeftRowSet_leftSchema2 = fixture.rowSetBuilder(leftSchema2)
+      .addRow(6, 60, "item6")
+      .build();
+
+    leftContainer.add(emptyLeftRowSet.container());
+    leftContainer.add(nonEmptyLeftRowSet.container());
+    leftContainer.add(emptyLeftRowSet_leftSchema2.container());
+    leftContainer.add(nonEmptyLeftRowSet_leftSchema2.container());
+
+    // Get the left IterOutcomes for Lateral Join
+    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    leftOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    leftOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final CloseableRecordBatch leftMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
+
+    // Get the right container with dummy data
+    TupleMetadata rightSchema2 = new SchemaBuilder()
+      .add("id_right_new", TypeProtos.MinorType.INT)
+      .add("cost_right_new", TypeProtos.MinorType.VARCHAR)
+      .add("name_right_new", TypeProtos.MinorType.VARCHAR)
+      .buildSchema();
+
+    final RowSet.SingleRowSet emptyRightRowSet_rightSchema2 = fixture.rowSetBuilder(rightSchema2).build();
+    final RowSet.SingleRowSet nonEmptyRightRowSet_rightSchema2 = fixture.rowSetBuilder(rightSchema2)
+      .addRow(5, "51", "item51")
+      .addRow(6, "61", "item61")
+      .addRow(7, "71", "item71")
+      .build();
+
+    rightContainer.add(emptyRightRowSet.container());
+    rightContainer.add(nonEmptyRightRowSet.container());
+    rightContainer.add(nonEmptyRightRowSet_rightSchema2.container()); // non-empty batch with Ok_new_schema
+    rightContainer.add(emptyRightRowSet_rightSchema2.container());
+
+    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
+
+    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.FULL);
+
+    final LateralJoinBatch lowerLevelLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
+      leftMockBatch_1, rightMockBatch_1);
+
+    // ** Prepare second pair of left and right batch for upper level Lateral_2 **
+
+    // Create left input schema for first batch
+    TupleMetadata leftSchema3 = new SchemaBuilder()
+      .add("id_left_left", TypeProtos.MinorType.INT)
+      .add("cost_left_left", TypeProtos.MinorType.INT)
+      .add("name_left_left", TypeProtos.MinorType.VARCHAR)
+      .buildSchema();
+
+    final RowSet.SingleRowSet emptyLeftRowSet_leftSchema3 = fixture.rowSetBuilder(leftSchema3).build();
+    final RowSet.SingleRowSet nonEmptyLeftRowSet_leftSchema3 = fixture.rowSetBuilder(leftSchema3)
+      .addRow(6, 60, "item6")
+      .build();
+
+    // Get left input schema for second left batch
+    TupleMetadata leftSchema4 = new SchemaBuilder()
+      .add("id_left_left_new", TypeProtos.MinorType.INT)
+      .add("cost_left_left_new", TypeProtos.MinorType.VARCHAR)
+      .add("name_left_left_new", TypeProtos.MinorType.VARCHAR)
+      .buildSchema();
+
+    final RowSet.SingleRowSet nonEmptyLeftRowSet_leftSchema4 = fixture.rowSetBuilder(leftSchema4)
+      .addRow(100, "100", "item100")
+      .build();
+
+    // Build Left container for upper level LATERAL operator
+    final List<VectorContainer> leftContainer2 = new ArrayList<>(5);
+
+    // Get the left container with dummy data
+    leftContainer2.add(emptyLeftRowSet_leftSchema3.container());
+    leftContainer2.add(nonEmptyLeftRowSet_leftSchema3.container());
+    leftContainer2.add(nonEmptyLeftRowSet_leftSchema4.container());
+
+    // Get the left container outcomes for upper level LATERAL operator
+    final List<RecordBatch.IterOutcome> leftOutcomes2 = new ArrayList<>(5);
+    leftOutcomes2.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    leftOutcomes2.add(RecordBatch.IterOutcome.OK);
+    leftOutcomes2.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    final CloseableRecordBatch leftMockBatch_2 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      leftContainer2, leftOutcomes2, leftContainer2.get(0).getSchema());
+
+    final LateralJoinBatch upperLevelLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
+      leftMockBatch_2, lowerLevelLateral);
+
+    try {
+      // 3 for first batch on left side and another 3 for next left batch
+      final int expectedOutputRecordCount = 6;
+      int actualOutputRecordCount = 0;
+
+      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == upperLevelLateral.next());
+      assertTrue(RecordBatch.IterOutcome.OK == upperLevelLateral.next());
+      actualOutputRecordCount += upperLevelLateral.getRecordCount();
+      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == upperLevelLateral.next());
+      actualOutputRecordCount += upperLevelLateral.getRecordCount();
+      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == upperLevelLateral.next());
+      actualOutputRecordCount += upperLevelLateral.getRecordCount();
+      assertTrue(RecordBatch.IterOutcome.OK == upperLevelLateral.next());
+      actualOutputRecordCount += upperLevelLateral.getRecordCount();
+      assertTrue(RecordBatch.IterOutcome.NONE == upperLevelLateral.next());
+      assertTrue(actualOutputRecordCount == expectedOutputRecordCount);
+    } catch (AssertionError | Exception error) {
+      fail();
+    } finally {
+      // Close all the resources for this test case
+      upperLevelLateral.close();
+      leftMockBatch_2.close();
+      lowerLevelLateral.close();
+      leftMockBatch_1.close();
+      rightMockBatch_1.close();
+      leftContainer2.clear();
+      leftOutcomes2.clear();
+    }
+  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitBatchEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitBatchEmitOutcome.java
index 4757488..38c0f51 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitBatchEmitOutcome.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitBatchEmitOutcome.java
@@ -17,16 +17,19 @@
  */
 package org.apache.drill.exec.physical.impl.limit;
 
+import org.apache.drill.categories.OperatorTest;
 import org.apache.drill.exec.physical.config.Limit;
 import org.apache.drill.exec.physical.impl.MockRecordBatch;
 import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.test.rowSet.RowSet;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+@Category(OperatorTest.class)
 public class TestLimitBatchEmitOutcome extends BaseTestOpBatchEmitOutcome {
 
   /**
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestProjectEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestProjectEmitOutcome.java
index b3099d0..cc737e9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestProjectEmitOutcome.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestProjectEmitOutcome.java
@@ -17,17 +17,20 @@
  */
 package org.apache.drill.exec.physical.impl.project;
 
+import org.apache.drill.categories.OperatorTest;
 import org.apache.drill.exec.physical.config.Project;
 import org.apache.drill.exec.physical.impl.MockRecordBatch;
 import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.test.rowSet.RowSet;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import static junit.framework.TestCase.fail;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+@Category(OperatorTest.class)
 public class TestProjectEmitOutcome extends BaseTestOpBatchEmitOutcome {
 
   /**
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
index ec043b2..70a32f8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
@@ -223,8 +223,12 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
 
     Object[][][] baseline = {
         {
-          {1, 1, 2, 2, 2, 3, 3, 4},
-          {"0", "1", "2", "3", "4", "5", "6", "9"}
+          {1, 1, 2, 2, 2, 3, 3},
+          {"0", "1", "2", "3", "4", "5", "6"}
+        },
+        {
+          {4},
+          {"9"}
         }
     };
 
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java b/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
index fa4d276..672bb7e 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
@@ -111,7 +111,7 @@ public class MaterializedField {
    * <p>
    * By allowing the non-critical metadata to change, we preserve the
    * child relationships as a list or union evolves.
-   * @param type
+   * @param newType
    */
 
   public void replaceType(MajorType newType) {
@@ -190,11 +190,20 @@ public class MaterializedField {
     return Objects.hash(this.name, this.type, this.children);
   }
 
+  /**
+   * This equals method does not check the children list of fields. When a batch is sent over the network it is
+   * serialized along with its MaterializedFields, which also carry information about internal vectors such as
+   * offsets and bits. While deserializing, these vectors are treated as children of the parent vector. If an
+   * operator on the receiver side, such as Sort, receives a schema in the buildSchema phase and later receives
+   * another batch, that would result in a schema change and the query would fail. This is because the second batch
+   * schema would contain information about internal vectors like offsets and bits which is not present in the first
+   * batch schema. For reference see TestSort#testSortWithRepeatedMapWithExchanges.
+   *
+   * @param obj
+   * @return
+   */
   @Override
   public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
     if (obj == null) {
       return false;
     }
@@ -206,7 +215,7 @@ public class MaterializedField {
     // in MapVector$MapTransferPair
 
     return this.name.equalsIgnoreCase(other.name) &&
-            Objects.equals(this.type, other.type);
+      Objects.equals(this.type, other.type);
   }
 
   /**
@@ -230,6 +239,27 @@ public class MaterializedField {
    * sense.) Operators that want to reconcile two maps that differ only in
    * column order need a different comparison.</li>
    * </ul>
+   * <ul>
+   * Note: MaterializedField and ValueVector have a 1:1 mapping, i.e. for each ValueVector there is a
+   * MaterializedField associated with it. So when a ValueVector is replaced or added in a VectorContainer, a new
+   * MaterializedField object is created for the new vector. This works fine for primitive-type ValueVectors, but for
+   * ValueVectors of type {@link org.apache.drill.exec.vector.complex.AbstractContainerVector} there are differences in
+   * how the MaterializedField and ValueVector objects are updated inside the container, where both the ValueVector
+   * and the MaterializedField objects are mutable.
+   * <p>
+   * For example: with a MapVector it can happen that only the type of a child field changed while
+   * the parent map type and name remained the same. In that case the child field's ValueVector inside the parent
+   * MapVector in the main batch container is replaced with a vector of the new type. The reference to the parent
+   * MapVector inside the batch container stays the same, but the reference to the child field's ValueVector stored
+   * inside the MapVector gets updated. During this update the MaterializedField for that child field, stored in the
+   * children list of the parent MapVector's MaterializedField, is replaced as well.
+   * Since the children list of the parent MaterializedField is updated, this class is mutable. Hence there should
+   * be no check for object reference equality here; instead a deep comparison should be done, which is what
+   * this method now performs. With an object reference check, the cases above would return true for
+   * two MaterializedField objects whose children field lists differ, which is not correct. The same holds for
+   * {@link MaterializedField#isEquivalent(MaterializedField)}.
+   * </p>
+   * </ul>
    *
    * @param other another field
    * @return <tt>true</tt> if the columns are identical according to the
@@ -237,9 +267,6 @@ public class MaterializedField {
    */
 
   public boolean isEquivalent(MaterializedField other) {
-    if (this == other) {
-      return true;
-    }
     if (! name.equalsIgnoreCase(other.name)) {
       return false;
     }
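A minimal sketch contrasting the two comparisons after this change; the fields below are illustrative and assume the
equals/isEquivalent semantics described in the javadoc above:

    // Sketch: equals() compares only name and type (children are intentionally ignored, see above),
    // while isEquivalent() also walks the children lists.
    MaterializedField oldChild = MaterializedField.create("c", Types.required(TypeProtos.MinorType.INT));
    MaterializedField newChild = MaterializedField.create("c", Types.required(TypeProtos.MinorType.VARCHAR));
    MaterializedField oldMap = MaterializedField.create("a", Types.required(TypeProtos.MinorType.MAP));
    MaterializedField newMap = MaterializedField.create("a", Types.required(TypeProtos.MinorType.MAP));
    oldMap.addChild(oldChild);
    newMap.addChild(newChild);
    boolean sameByEquals     = oldMap.equals(newMap);        // true: same name and MAP type
    boolean sameByEquivalent = oldMap.isEquivalent(newMap);  // false: child "c" changed type
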
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
index 3682397..1d0e03b 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
@@ -129,6 +129,9 @@ public abstract class AbstractMapVector extends AbstractContainerVector {
       return (T) existing;
     } else if (nullFilled(existing)) {
       existing.clear();
+      // Since the old vector is removed and a new one added based on the new type, the same must be done for the
+      // MaterializedField; otherwise there would be a duplicate field with the same name but a different type.
+      field.removeChild(existing.getField());
       create = true;
     }
     if (create) {

-- 
To stop receiving notification emails like this one, please contact
arina@apache.org.

[drill] 04/04: DRILL-6423: Export query result as a CSV file

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 82e1a1229203efc3f8899c620a7efc60dff6d388
Author: Kunal Khatua <ku...@apache.org>
AuthorDate: Wed May 16 10:35:51 2018 -0700

    DRILL-6423: Export query result as a CSV file
    
    Added option for user specified delimiter
    Also, the query Id is shown on the results page as a link that pops out a new window
    
    closes #1266
---
 .../drill/exec/server/rest/QueryResources.java     |  6 ++
 .../drill/exec/server/rest/QueryWrapper.java       | 12 +++-
 .../src/main/resources/rest/query/result.ftl       | 71 +++++++++++++++++++++-
 3 files changed, 85 insertions(+), 4 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryResources.java
index c46c4b5..e69b261 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryResources.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryResources.java
@@ -96,8 +96,10 @@ public class QueryResources {
   public static class TabularResult {
     private final List<String> columns;
     private final List<List<String>> rows;
+    private final String queryId;
 
     public TabularResult(QueryResult result) {
+      queryId = result.getQueryId();
       final List<List<String>> rows = Lists.newArrayList();
       for (Map<String, String> rowMap:result.rows) {
         final List<String> row = Lists.newArrayList();
@@ -115,6 +117,10 @@ public class QueryResources {
       return columns.isEmpty();
     }
 
+    public String getQueryId() {
+      return queryId;
+    }
+
     public List<String> getColumns() {
       return columns;
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java
index 6a92942..911ac0f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Maps;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryType;
 import org.apache.drill.exec.proto.UserProtos.RunQuery;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.proto.UserProtos.QueryResultsMode;
 import org.apache.drill.exec.work.WorkManager;
 
@@ -79,18 +80,23 @@ public class QueryWrapper {
     }
 
     // Return the QueryResult.
-    return new QueryResult(webUserConnection.columns, webUserConnection.results);
+    return new QueryResult(queryId, webUserConnection.columns, webUserConnection.results);
   }
 
   public static class QueryResult {
+    private final String queryId;
     public final Collection<String> columns;
-
     public final List<Map<String, String>> rows;
 
-    public QueryResult(Collection<String> columns, List<Map<String, String>> rows) {
+    public QueryResult(QueryId queryId, Collection<String> columns, List<Map<String, String>> rows) {
+      this.queryId = QueryIdHelper.getQueryId(queryId);
       this.columns = columns;
       this.rows = rows;
     }
+
+    public String getQueryId() {
+      return queryId;
+    }
   }
 
   @Override
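As a usage note, a minimal sketch of how the new text id travels with the result; it reuses only identifiers that
already appear in this patch:

    // Sketch: the wrapper stores the text form produced by QueryIdHelper.getQueryId(queryId); the
    // results page reads it back to build the profile link and the exported CSV file name.
    QueryResult result = new QueryResult(queryId, webUserConnection.columns, webUserConnection.results);
    String textId = result.getQueryId();    // later used by the UI, e.g. "<textId>.csv" for the export
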
diff --git a/exec/java-exec/src/main/resources/rest/query/result.ftl b/exec/java-exec/src/main/resources/rest/query/result.ftl
index 37eeca8..96e68b0 100644
--- a/exec/java-exec/src/main/resources/rest/query/result.ftl
+++ b/exec/java-exec/src/main/resources/rest/query/result.ftl
@@ -30,6 +30,22 @@
   <a href="/queries">back</a><br/>
   <div class="page-header">
   </div>
+  <div>
+  <table><tr>
+    <td align='left'>
+      <button type="button"  title="Open in new window" onclick="popOutProfile('${model.getQueryId()}');" class="btn btn-default btn-sm">
+      <b>Query Profile:</b> ${model.getQueryId()} <span class="glyphicon glyphicon-new-window"/></button>
+     </td><td align="right" width="100%">
+       <div class="input-group">
+         <span class="input-group-addon" style="font-size:95%">Delimiter </span>
+         <input id="delimitBy" type="text" class="form-control input-sm" name="delimitBy" title="Specify delimiter" placeholder="Required" maxlength="2" size="2" value=",">
+       </div></td><td>
+       <button type="button"  title="Export visible table as CSV. Show ALL rows to export entire resultSet" onclick="exportTableAsCsv('${model.getQueryId()}');" class="btn btn-default btn-sm">
+       <b>Export </b> <span class="glyphicon glyphicon-export"/></button>
+     </td>
+  </tr>
+  </table>
+  </div>
   <#if model.isEmpty()>
     <div class="jumbotron">
       <p class="lead">No result found.</p>
@@ -59,11 +75,64 @@
       $('#result').dataTable( {
         "aaSorting": [],
         "scrollX" : true,
+        "lengthMenu": [[10, 25, 50, 100, -1], [10, 25, 50, 100, "All"]],
+        "lengthChange": true,
         "dom": '<"H"lCfr>t<"F"ip>',
         "jQueryUI" : true
       } );
     } );
-  </script>
+
+    //Pop out profile (needed to avoid losing query results)
+    function popOutProfile(queryId) {
+      var profileUrl = location.protocol+'//'+ location.host+'/profiles/'+queryId;
+      var tgtWindow = '_blank';
+      window.open(profileUrl, tgtWindow);
+    }
+
+    //Ref: https://jsfiddle.net/gengns/j1jm2tjx/
+    function downloadCsv(csvRecords, filename) {
+      var csvFile;
+      var downloadElem;
+
+      //CSV File
+      csvFile = new Blob([csvRecords], {type: "text/csv"});
+      // Download link
+      downloadElem = document.createElement("a");
+      // File name
+      downloadElem.download = filename;
+
+      // We have to create a link to the file
+      downloadElem.href = window.URL.createObjectURL(csvFile);
+
+      // Make sure that the link is not displayed
+      downloadElem.style.display = "none";
+
+      // Add the link to your DOM
+      document.body.appendChild(downloadElem);
+
+      // Launch the download prompt
+      downloadElem.click();
+    }
+
+    function exportTableAsCsv(queryId) {
+      var filename = queryId + '.csv';
+      var csv = []; //Array of records
+      var rows = document.getElementById('result').querySelectorAll("tr");
+      var delimiter = document.getElementById('delimitBy').value;
+      if (delimiter == 'undefined' || delimiter.length==0) {
+        delimiter = ",";
+      }
+      for (var i = 0; i < rows.length; i++) {
+        var row = [], cols = rows[i].querySelectorAll("th, td");
+        for (var j = 0; j < cols.length; j++)
+          row.push(cols[j].textContent);
+        csv.push(row.join(delimiter));
+      }
+      // Download CSV
+      downloadCsv(csv.join("\n"), filename);
+    }
+
+    </script>
 </#macro>
 
 <@page_html/>

-- 
To stop receiving notification emails like this one, please contact
arina@apache.org.