You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2018/08/14 23:00:20 UTC

[GitHub] sohami closed pull request #1431: DRILL-6687: Improve RemovingRecordBatch to do transfer when all recor…

sohami closed pull request #1431: DRILL-6687: Improve RemovingRecordBatch to do transfer when all recor…
URL: https://github.com/apache/drill/pull/1431
 
 
   

This pull request was merged from a forked repository. Because GitHub
does not display the original diff of a fork-based pull request once it
has been merged, the diff is reproduced below for the sake of provenance:

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 4fc0d1596a5..2763f5903c3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -47,7 +47,7 @@
 import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
 import org.apache.drill.exec.physical.impl.sort.SortRecordBatchBuilder;
 import org.apache.drill.exec.physical.impl.svremover.Copier;
-import org.apache.drill.exec.physical.impl.svremover.GenericSV4Copier;
+import org.apache.drill.exec.physical.impl.svremover.GenericCopierFactory;
 import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -360,7 +360,7 @@ private void purge() throws SchemaChangeException {
     SelectionVector4 selectionVector4 = priorityQueue.getSv4();
     SimpleSV4RecordBatch batch = new SimpleSV4RecordBatch(c, selectionVector4, context);
     if (copier == null) {
-      copier = GenericSV4Copier.createCopier(batch, newContainer, null);
+      copier = GenericCopierFactory.createAndSetupCopier(batch, newContainer, null);
     } else {
       for (VectorWrapper<?> i : batch) {
 
@@ -468,7 +468,7 @@ public void purgeAndResetPriorityQueue() throws SchemaChangeException, ClassTran
     @SuppressWarnings("resource")
     final SelectionVector4 selectionVector4 = priorityQueue.getSv4();
     final SimpleSV4RecordBatch batch = new SimpleSV4RecordBatch(c, selectionVector4, context);
-    copier = GenericSV4Copier.createCopier(batch, newContainer, null);
+    copier = GenericCopierFactory.createAndSetupCopier(batch, newContainer, null);
     @SuppressWarnings("resource")
     SortRecordBatchBuilder builder = new SortRecordBatchBuilder(oContext.getAllocator());
     try {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
index 6d1f03462ab..7b0183bdab8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
@@ -67,11 +67,18 @@ public void filterBatch(int recordCount) throws SchemaChangeException{
     if (! outgoingSelectionVector.allocateNewSafe(recordCount)) {
       throw new OutOfMemoryException("Unable to allocate filter batch");
     }
+
     switch(svMode){
     case NONE:
+      // Set the actual recordCount in outgoing selection vector to help SVRemover copy the entire
+      // batch if possible at once rather than row-by-row
+      outgoingSelectionVector.setBatchActualRecordCount(recordCount);
       filterBatchNoSV(recordCount);
       break;
     case TWO_BYTE:
+      // Set the actual recordCount in outgoing selection vector to help SVRemover copy the entire
+      // batch if possible at once rather than row-by-row
+      outgoingSelectionVector.setBatchActualRecordCount(incomingSelectionVector.getBatchActualRecordCount());
       filterBatchSV2(recordCount);
       break;
     default:
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index 06f0fdbee0d..a86271483ca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -194,6 +194,7 @@ protected IterOutcome doWork() {
 
     // clear memory for incoming sv (if any)
     if (incomingSv != null) {
+      outgoingSv.setBatchActualRecordCount(incomingSv.getBatchActualRecordCount());
       incomingSv.clear();
     }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractCopier.java
index ddea4684f2a..47ec1cb14b9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractCopier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractCopier.java
@@ -19,7 +19,6 @@
 
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
@@ -31,7 +30,7 @@
   protected VectorContainer outgoing;
 
   @Override
-  public void setup(RecordBatch incoming, VectorContainer outgoing) throws SchemaChangeException {
+  public void setup(RecordBatch incoming, VectorContainer outgoing) {
     this.outgoing = outgoing;
 
     final int count = outgoing.getNumberOfColumns();
@@ -43,7 +42,7 @@ public void setup(RecordBatch incoming, VectorContainer outgoing) throws SchemaC
   }
 
   @Override
-  public int copyRecords(int index, int recordCount) throws SchemaChangeException {
+  public int copyRecords(int index, int recordCount) {
     for(VectorWrapper<?> out : outgoing){
       TypeProtos.MajorType type = out.getField().getType();
       if (!Types.isFixedWidthType(type) || Types.isRepeated(type)) {
@@ -57,7 +56,7 @@ public int copyRecords(int index, int recordCount) throws SchemaChangeException
   }
 
   @Override
-  public int appendRecord(int index) throws SchemaChangeException {
+  public int appendRecord(int index) {
     int outgoingPosition = outgoing.getRecordCount();
     copyEntryIndirect(index, outgoingPosition);
     outgoingPosition++;
@@ -66,11 +65,11 @@ public int appendRecord(int index) throws SchemaChangeException {
   }
 
   @Override
-  public int appendRecords(int index, int recordCount) throws SchemaChangeException {
+  public int appendRecords(int index, int recordCount) {
     return insertRecords(outgoing.getRecordCount(), index, recordCount);
   }
 
-  private int insertRecords(int outgoingPosition, int index, int recordCount) throws SchemaChangeException {
+  private int insertRecords(int outgoingPosition, int index, int recordCount) {
     final int endIndex = index + recordCount;
 
     for(int svIndex = index; svIndex < endIndex; svIndex++, outgoingPosition++){
@@ -81,7 +80,7 @@ private int insertRecords(int outgoingPosition, int index, int recordCount) thro
     return outgoingPosition;
   }
 
-  private void updateCounts(int numRecords) {
+  protected void updateCounts(int numRecords) {
     outgoing.setRecordCount(numRecords);
 
     for (int vectorIndex = 0; vectorIndex < vvOut.length; vectorIndex++) {
@@ -89,7 +88,7 @@ private void updateCounts(int numRecords) {
     }
   }
 
-  public abstract void copyEntryIndirect(int inIndex, int outIndex) throws SchemaChangeException;
+  public abstract void copyEntryIndirect(int inIndex, int outIndex);
 
-  public abstract void copyEntry(int inIndex, int outIndex) throws SchemaChangeException;
+  public abstract void copyEntry(int inIndex, int outIndex);
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java
index 321d9a87d69..68a088917b2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java
@@ -17,19 +17,23 @@
  */
 package org.apache.drill.exec.physical.impl.svremover;
 
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.vector.ValueVector;
 
+import java.util.ArrayList;
+import java.util.List;
+
 public abstract class AbstractSV2Copier extends AbstractCopier {
   protected ValueVector[] vvIn;
   private SelectionVector2 sv2;
+  protected List<TransferPair> transferPairs = new ArrayList<>();
 
   @Override
-  public void setup(RecordBatch incoming, VectorContainer outgoing) throws SchemaChangeException {
+  public void setup(RecordBatch incoming, VectorContainer outgoing) {
     super.setup(incoming, outgoing);
     this.sv2 = incoming.getSelectionVector2();
 
@@ -46,7 +50,20 @@ public void setup(RecordBatch incoming, VectorContainer outgoing) throws SchemaC
     }
   }
 
-  public void copyEntryIndirect(int inIndex, int outIndex) throws SchemaChangeException {
+  public void copyEntryIndirect(int inIndex, int outIndex) {
     copyEntry(sv2.getIndex(inIndex), outIndex);
   }
+
+  @Override
+  public int copyRecords(int index, int recordCount) {
+    if (sv2.canDoFullTransfer()) {
+      for (TransferPair pair : transferPairs) {
+        pair.transfer();
+      }
+      updateCounts(recordCount);
+      return recordCount;
+    }
+
+    return super.copyRecords(index, recordCount);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java
index cd6af07b378..56e258659e0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.physical.impl.svremover;
 
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
@@ -31,7 +30,7 @@
   private SelectionVector4 sv4;
 
   @Override
-  public void setup(RecordBatch incoming, VectorContainer outgoing) throws SchemaChangeException{
+  public void setup(RecordBatch incoming, VectorContainer outgoing) {
     super.setup(incoming, outgoing);
     this.sv4 = incoming.getSelectionVector4();
 
@@ -48,7 +47,7 @@ public void setup(RecordBatch incoming, VectorContainer outgoing) throws SchemaC
     }
   }
 
-  public void copyEntryIndirect(int inIndex, int outIndex) throws SchemaChangeException {
+  public void copyEntryIndirect(int inIndex, int outIndex) {
     copyEntry(sv4.get(inIndex), outIndex);
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
index bc31252827f..92dea7021d2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
@@ -17,13 +17,12 @@
  */
 package org.apache.drill.exec.physical.impl.svremover;
 
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
 
 public interface Copier {
-  void setup(RecordBatch incoming, VectorContainer outgoing) throws SchemaChangeException;
-  int copyRecords(int index, int recordCount) throws SchemaChangeException;
-  int appendRecord(int index) throws SchemaChangeException;
-  int appendRecords(int index, int recordCount) throws SchemaChangeException;
+  void setup(RecordBatch incoming, VectorContainer outgoing);
+  int copyRecords(int index, int recordCount);
+  int appendRecord(int index);
+  int appendRecords(int index, int recordCount);
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopier.java
index de048dcb7e5..72516e058ef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopier.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.physical.impl.svremover;
 
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
@@ -30,7 +29,7 @@
   private VectorContainer outgoing;
 
   @Override
-  public void setup(RecordBatch incoming, VectorContainer outgoing) throws SchemaChangeException {
+  public void setup(RecordBatch incoming, VectorContainer outgoing) {
     this.outgoing = outgoing;
 
     final int count = outgoing.getNumberOfColumns();
@@ -53,12 +52,12 @@ public void setup(RecordBatch incoming, VectorContainer outgoing) throws SchemaC
   }
 
   @Override
-  public int copyRecords(int index, int recordCount) throws SchemaChangeException {
+  public int copyRecords(int index, int recordCount) {
     return insertRecords(0, index, recordCount);
   }
 
   @Override
-  public int appendRecord(int index) throws SchemaChangeException {
+  public int appendRecord(int index) {
     int outgoingPosition = outgoing.getRecordCount();
     for (int vectorIndex = 0; vectorIndex < vvIn.length; vectorIndex++) {
       vvOut[vectorIndex].copyEntry(outgoingPosition, vvIn[vectorIndex], index);
@@ -69,11 +68,11 @@ public int appendRecord(int index) throws SchemaChangeException {
   }
 
   @Override
-  public int appendRecords(int index, int recordCount) throws SchemaChangeException {
+  public int appendRecords(int index, int recordCount) {
     return insertRecords(outgoing.getRecordCount(), index, recordCount);
   }
 
-  private int insertRecords(int outgoingPosition, int startIndex, int recordCount) throws SchemaChangeException {
+  private int insertRecords(int outgoingPosition, int startIndex, int recordCount) {
     final int endIndex = startIndex + recordCount;
 
     for (int index = startIndex; index < endIndex; index++, outgoingPosition++) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierFactory.java
new file mode 100644
index 00000000000..cd6dd02c968
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.svremover;
+
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.SchemaChangeCallBack;
+
+public class GenericCopierFactory {
+  //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(GenericCopierFactory.class);
+
+  public static Copier createAndSetupCopier(RecordBatch incoming, VectorContainer outputContainer,
+                                            SchemaChangeCallBack callBack) {
+    Copier copier;
+
+    switch(incoming.getSchema().getSelectionVectorMode()){
+      case NONE:
+        copier = new StraightCopier(incoming, outputContainer, callBack);
+        break;
+      case TWO_BYTE:
+        copier = new GenericSV2Copier(incoming, outputContainer, callBack);
+        break;
+      case FOUR_BYTE:
+        copier = new GenericSV4Copier(incoming, outputContainer, callBack);
+        break;
+      default:
+        throw new UnsupportedOperationException();
+    }
+
+    copier.setup(incoming, outputContainer);
+    return copier;
+  }
+
+  public static Copier createAndSetupNonSVGenericCopier(RecordBatch incoming, VectorContainer outputContainer) {
+    Copier copier = new GenericCopier();
+    copier.setup(incoming, outputContainer);
+    return copier;
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2Copier.java
index a375f451d02..f607e8c7d20 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2Copier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2Copier.java
@@ -17,11 +17,24 @@
  */
 package org.apache.drill.exec.physical.impl.svremover;
 
-import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.SchemaChangeCallBack;
 
 public class GenericSV2Copier extends AbstractSV2Copier {
+
+  public GenericSV2Copier(RecordBatch incomingBatch, VectorContainer outputContainer,
+                          SchemaChangeCallBack callBack) {
+    for(VectorWrapper<?> vv : incomingBatch){
+      TransferPair pair = vv.getValueVector().makeTransferPair(outputContainer.addOrGet(vv.getField(), callBack));
+      transferPairs.add(pair);
+    }
+  }
+
   @Override
-  public void copyEntry(int inIndex, int outIndex) throws SchemaChangeException {
+  public void copyEntry(int inIndex, int outIndex) {
     for ( int i = 0;  i < vvIn.length;  i++ ) {
       vvOut[i].copyEntry(outIndex, vvIn[i], inIndex);
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4Copier.java
index 1f3d28bdc4b..c676841f822 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4Copier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4Copier.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.physical.impl.svremover;
 
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
@@ -25,8 +24,18 @@
 import org.apache.drill.exec.vector.ValueVector;
 
 public class GenericSV4Copier extends AbstractSV4Copier {
+
+  public GenericSV4Copier(RecordBatch incomingBatch, VectorContainer outputContainer,
+                          SchemaChangeCallBack callBack) {
+    for(VectorWrapper<?> vv : incomingBatch){
+      @SuppressWarnings("resource")
+      ValueVector v = vv.getValueVectors()[0];
+      v.makeTransferPair(outputContainer.addOrGet(v.getField(), callBack));
+    }
+  }
+
   @Override
-  public void copyEntry(int inIndex, int outIndex) throws SchemaChangeException {
+  public void copyEntry(int inIndex, int outIndex) {
     int inOffset = inIndex & 0xFFFF;
     int inVector = inIndex >>> 16;
     for ( int i = 0;  i < vvIn.length;  i++ ) {
@@ -34,16 +43,4 @@ public void copyEntry(int inIndex, int outIndex) throws SchemaChangeException {
       vvOut[i].copyEntry(outIndex, vectorsFromIncoming[inVector], inOffset);
     }
   }
-
-  public static Copier createCopier(RecordBatch batch, VectorContainer container, SchemaChangeCallBack callBack) throws SchemaChangeException {
-    for(VectorWrapper<?> vv : batch){
-      @SuppressWarnings("resource")
-      ValueVector v = vv.getValueVectors()[0];
-      v.makeTransferPair(container.addOrGet(v.getField(), callBack));
-    }
-
-    Copier copier = new GenericSV4Copier();
-    copier.setup(batch, container);
-    return copier;
-  }
 }
\ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index acfdc878aa6..1471d5e558f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -17,8 +17,6 @@
  */
 package org.apache.drill.exec.physical.impl.svremover;
 
-import java.util.List;
-
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -26,14 +24,9 @@
 import org.apache.drill.exec.record.AbstractSingleRecordBatch;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.TransferPair;
-import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.WritableBatch;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
 public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVectorRemover>{
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemovingRecordBatch.class);
 
@@ -56,19 +49,7 @@ protected boolean setupNewSchema() throws SchemaChangeException {
     // send OK to downstream instead. Since the output of RemovingRecordBatch is always going to be a regular container
     // change in incoming container type is not actual schema change.
     container.zeroVectors();
-    switch(incoming.getSchema().getSelectionVectorMode()){
-    case NONE:
-      this.copier = getStraightCopier();
-      break;
-    case TWO_BYTE:
-      this.copier = create2Copier();
-      break;
-    case FOUR_BYTE:
-      this.copier = create4Copier();
-      break;
-    default:
-      throw new UnsupportedOperationException();
-    }
+    copier = GenericCopierFactory.createAndSetupCopier(incoming, container, callBack);
 
     // If there is an actual schema change then below condition will be true and it will send OK_NEW_SCHEMA
     // downstream too
@@ -84,7 +65,7 @@ protected boolean setupNewSchema() throws SchemaChangeException {
   protected IterOutcome doWork() {
     try {
       copier.copyRecords(0, incoming.getRecordCount());
-    } catch (SchemaChangeException e) {
+    } catch (Exception e) {
       throw new IllegalStateException(e);
     } finally {
       if (incoming.getSchema().getSelectionVectorMode() != SelectionVectorMode.FOUR_BYTE) {
@@ -107,63 +88,6 @@ public void close() {
     super.close();
   }
 
-  private class StraightCopier implements Copier{
-
-    private List<TransferPair> pairs = Lists.newArrayList();
-
-    @Override
-    public void setup(RecordBatch incoming, VectorContainer outgoing){
-      for(VectorWrapper<?> vv : incoming){
-        TransferPair tp = vv.getValueVector().makeTransferPair(container.addOrGet(vv.getField(), callBack));
-        pairs.add(tp);
-      }
-    }
-
-    @Override
-    public int copyRecords(int index, int recordCount) {
-      assert index == 0 && recordCount == incoming.getRecordCount() : "Straight copier cannot split batch";
-      for(TransferPair tp : pairs){
-        tp.transfer();
-      }
-
-      container.setRecordCount(incoming.getRecordCount());
-      return recordCount;
-    }
-
-    @Override
-    public int appendRecord(int index) throws SchemaChangeException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public int appendRecords(int index, int recordCount) throws SchemaChangeException {
-      throw new UnsupportedOperationException();
-    }
-  }
-
-  private Copier getStraightCopier(){
-    StraightCopier copier = new StraightCopier();
-    copier.setup(incoming, container);
-    return copier;
-  }
-
-  private Copier create2Copier() throws SchemaChangeException {
-    Preconditions.checkArgument(incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE);
-
-    for(VectorWrapper<?> vv : incoming){
-      vv.getValueVector().makeTransferPair(container.addOrGet(vv.getField(), callBack));
-    }
-
-    Copier copier = new GenericSV2Copier();
-    copier.setup(incoming, container);
-    return copier;
-  }
-
-  private Copier create4Copier() throws SchemaChangeException {
-    Preconditions.checkArgument(incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE);
-    return GenericSV4Copier.createCopier(incoming, container, callBack);
-  }
-
   @Override
   public WritableBatch getWritableBatch() {
     return WritableBatch.get(this);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/StraightCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/StraightCopier.java
new file mode 100644
index 00000000000..33f2a964b98
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/StraightCopier.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.svremover;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.SchemaChangeCallBack;
+
+import java.util.List;
+
+public class StraightCopier implements Copier {
+    private List<TransferPair> pairs = Lists.newArrayList();
+    private RecordBatch incoming;
+    private VectorContainer outputContainer;
+    private SchemaChangeCallBack callBack;
+
+    public StraightCopier(RecordBatch incomingBatch, VectorContainer outputContainer, SchemaChangeCallBack callBack) {
+      this.incoming = incomingBatch;
+      this.outputContainer = outputContainer;
+      this.callBack = callBack;
+    }
+
+    @Override
+    public void setup(RecordBatch incoming, VectorContainer outgoing) {
+      for(VectorWrapper<?> vv : incoming){
+        TransferPair tp = vv.getValueVector().makeTransferPair(outputContainer.addOrGet(vv.getField(), callBack));
+        pairs.add(tp);
+      }
+    }
+
+    @Override
+    public int copyRecords(int index, int recordCount) {
+      assert index == 0 && recordCount == incoming.getRecordCount() : "Straight copier cannot split batch";
+      for(TransferPair tp : pairs){
+        tp.transfer();
+      }
+
+      outputContainer.setRecordCount(incoming.getRecordCount());
+      return recordCount;
+    }
+
+    @Override
+    public int appendRecord(int index) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int appendRecords(int index, int recordCount) {
+      throw new UnsupportedOperationException();
+    }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
index 72441485874..8afc5fb0b37 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
@@ -36,7 +36,11 @@
 public class SelectionVector2 implements AutoCloseable {
 
   private final BufferAllocator allocator;
+  // Indicates number of indexes stored in the SV2 buffer which may be less than actual number of rows stored in
+  // RecordBatch container owning this SV2 instance
   private int recordCount;
+  // Indicates actual number of rows in the RecordBatch container which owns this SV2 instance
+  private int batchActualRecordCount = -1;
   private DrillBuf buffer = DeadBuf.DEAD_BUFFER;
 
   public static final int RECORD_SIZE = 2;
@@ -61,6 +65,11 @@ public SelectionVector2(BufferAllocator allocator, DrillBuf buf, int count) {
     recordCount = count;
   }
 
+  public SelectionVector2(BufferAllocator allocator, DrillBuf buf, int count, int actualRecordCount) {
+    this(allocator, buf, count);
+    this.batchActualRecordCount = actualRecordCount;
+  }
+
   public int getCount() {
     return recordCount;
   }
@@ -127,6 +136,7 @@ public void allocateNew(int size) {
   public SelectionVector2 clone() {
     SelectionVector2 newSV = new SelectionVector2(allocator);
     newSV.recordCount = recordCount;
+    newSV.batchActualRecordCount = batchActualRecordCount;
     newSV.buffer = buffer;
 
     /* Since buffer and newSV.buffer essentially point to the
@@ -143,6 +153,7 @@ public void clear() {
       buffer.release();
       buffer = DeadBuf.DEAD_BUFFER;
       recordCount = 0;
+      batchActualRecordCount = -1;
     }
   }
 
@@ -151,6 +162,18 @@ public void setRecordCount(int recordCount){
     this.recordCount = recordCount;
   }
 
+  public boolean canDoFullTransfer() {
+    return (recordCount == batchActualRecordCount);
+  }
+
+  public void setBatchActualRecordCount(int actualRecordCount) {
+    this.batchActualRecordCount = actualRecordCount;
+  }
+
+  public int getBatchActualRecordCount() {
+    return batchActualRecordCount;
+  }
+
   @Override
   public void close() {
     clear();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/AbstractGenericCopierTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/AbstractGenericCopierTest.java
index 01263b1d242..7d444b4ed50 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/AbstractGenericCopierTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/AbstractGenericCopierTest.java
@@ -17,34 +17,34 @@
  */
 package org.apache.drill.exec.physical.impl.svremover;
 
-import com.google.common.collect.Lists;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.RootAllocator;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.SchemaChangeCallBack;
 import org.apache.drill.test.rowSet.RowSet;
 import org.apache.drill.test.rowSet.RowSetBatch;
 import org.apache.drill.test.rowSet.RowSetBuilder;
 import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
 import org.junit.Test;
 
-import java.util.List;
-
 public abstract class AbstractGenericCopierTest {
   @Test
   public void testCopyRecords() throws SchemaChangeException {
     try (RootAllocator allocator = new RootAllocator(10_000_000)) {
-      final BatchSchema batchSchema = createTestSchema(BatchSchema.SelectionVectorMode.NONE);
+      final TupleMetadata batchSchema = createTestSchema(BatchSchema.SelectionVectorMode.NONE);
       final RowSet srcRowSet = createSrcRowSet(allocator);
       final RowSet destRowSet = new RowSetBuilder(allocator, batchSchema).build();
       final VectorContainer destContainer = destRowSet.container();
-      final Copier copier = createCopier();
+      final Copier copier = createCopier(new RowSetBatch(srcRowSet), destContainer, null);
       final RowSet expectedRowSet = createExpectedRowset(allocator);
 
-      copier.setup(new RowSetBatch(srcRowSet), destContainer);
       copier.copyRecords(0, 3);
 
       try {
@@ -65,14 +65,13 @@ public void testCopyRecords() throws SchemaChangeException {
   @Test
   public void testAppendRecords() throws SchemaChangeException {
     try (RootAllocator allocator = new RootAllocator(10_000_000)) {
-      final BatchSchema batchSchema = createTestSchema(BatchSchema.SelectionVectorMode.NONE);
+      final TupleMetadata batchSchema = createTestSchema(BatchSchema.SelectionVectorMode.NONE);
       final RowSet srcRowSet = createSrcRowSet(allocator);
       final RowSet destRowSet = new RowSetBuilder(allocator, batchSchema).build();
       final VectorContainer destContainer = destRowSet.container();
-      final Copier copier = createCopier();
+      final Copier copier = createCopier(new RowSetBatch(srcRowSet), destContainer, null);
       final RowSet expectedRowSet = createExpectedRowset(allocator);
 
-      copier.setup(new RowSetBatch(srcRowSet), destContainer);
       copier.appendRecord(0);
       copier.appendRecords(1, 2);
 
@@ -93,7 +92,10 @@ public void testAppendRecords() throws SchemaChangeException {
 
   public abstract RowSet createSrcRowSet(RootAllocator allocator) throws SchemaChangeException;
 
-  public abstract Copier createCopier();
+  public Copier createCopier(RecordBatch incoming, VectorContainer outputContainer,
+                                      SchemaChangeCallBack callback) {
+    return GenericCopierFactory.createAndSetupCopier(incoming, outputContainer, callback);
+  }
 
   public static Object[] row1() {
     return new Object[]{110, "green", new float[]{5.5f, 2.3f}, new String[]{"1a", "1b"}};
@@ -115,7 +117,7 @@ public void testAppendRecords() throws SchemaChangeException {
     return new Object[]{106, "black", new float[]{.75f}, new String[]{"4a"}};
   }
 
-  public static RowSet createExpectedRowset(RootAllocator allocator) {
+  private RowSet createExpectedRowset(RootAllocator allocator) {
     return new RowSetBuilder(allocator, createTestSchema(BatchSchema.SelectionVectorMode.NONE))
       .addRow(row1())
       .addRow(row2())
@@ -123,14 +125,17 @@ public static RowSet createExpectedRowset(RootAllocator allocator) {
       .build();
   }
 
-  public static BatchSchema createTestSchema(BatchSchema.SelectionVectorMode mode) {
+  protected TupleMetadata createTestSchema(BatchSchema.SelectionVectorMode mode) {
     MaterializedField colA = MaterializedField.create("colA", Types.required(TypeProtos.MinorType.INT));
     MaterializedField colB = MaterializedField.create("colB", Types.required(TypeProtos.MinorType.VARCHAR));
     MaterializedField colC = MaterializedField.create("colC", Types.repeated(TypeProtos.MinorType.FLOAT4));
     MaterializedField colD = MaterializedField.create("colD", Types.repeated(TypeProtos.MinorType.VARCHAR));
 
-    List<MaterializedField> cols = Lists.newArrayList(colA, colB, colC, colD);
-    BatchSchema batchSchema = new BatchSchema(mode, cols);
-    return batchSchema;
+    return new SchemaBuilder().add(colA)
+      .add(colB)
+      .add(colC)
+      .add(colD)
+      .withSVMode(mode)
+      .buildSchema();
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierTest.java
index f946f81f47a..d6c38e72482 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierTest.java
@@ -19,6 +19,9 @@
 
 import org.apache.drill.exec.memory.RootAllocator;
 import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.SchemaChangeCallBack;
 import org.apache.drill.test.rowSet.RowSet;
 import org.apache.drill.test.rowSet.RowSetBuilder;
 
@@ -35,7 +38,8 @@ public RowSet createSrcRowSet(RootAllocator allocator) {
   }
 
   @Override
-  public Copier createCopier() {
-    return new GenericCopier();
+  public Copier createCopier(RecordBatch incoming, VectorContainer outputContainer,
+                             SchemaChangeCallBack callback) {
+    return GenericCopierFactory.createAndSetupNonSVGenericCopier(incoming, outputContainer);
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2BatchCopierTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2BatchCopierTest.java
new file mode 100644
index 00000000000..748e0d06b61
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2BatchCopierTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.svremover;
+
+import org.apache.drill.exec.memory.RootAllocator;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetBuilder;
+
+/**
+ * Verifies the SV2 optimization: when the total number of records to copy is the same as the number of
+ * records in the underlying batch for the SV2, a transfer is done rather than a row-by-row copy.
+ */
+public class GenericSV2BatchCopierTest extends AbstractGenericCopierTest {
+
+  @Override
+  public RowSet createSrcRowSet(RootAllocator allocator) {
+    return new RowSetBuilder(allocator, createTestSchema(BatchSchema.SelectionVectorMode.TWO_BYTE))
+      .addSelection(true, row1())
+      .addRow(row2())
+      .addSelection(true, row3())
+      .withSv2()
+      .build();
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2CopierTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2CopierTest.java
index 428124d3299..b2f0e5105bc 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2CopierTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2CopierTest.java
@@ -21,7 +21,9 @@
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.test.rowSet.RowSet;
 import org.apache.drill.test.rowSet.RowSetBuilder;
+
 public class GenericSV2CopierTest extends AbstractGenericCopierTest {
+
   @Override
   public RowSet createSrcRowSet(RootAllocator allocator) {
     return new RowSetBuilder(allocator, createTestSchema(BatchSchema.SelectionVectorMode.TWO_BYTE))
@@ -33,9 +35,4 @@ public RowSet createSrcRowSet(RootAllocator allocator) {
       .withSv2()
       .build();
   }
-
-  @Override
-  public Copier createCopier() {
-    return new GenericSV2Copier();
-  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4CopierTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4CopierTest.java
index 447ad3ae9d0..a5f5bb7852c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4CopierTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4CopierTest.java
@@ -23,15 +23,17 @@
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.ExpandableHyperContainer;
 import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.test.rowSet.HyperRowSetImpl;
 import org.apache.drill.test.rowSet.RowSet;
 import org.apache.drill.test.rowSet.RowSetBuilder;
 
 public class GenericSV4CopierTest extends AbstractGenericCopierTest {
+
   @Override
   public RowSet createSrcRowSet(RootAllocator allocator) throws SchemaChangeException {
-    final BatchSchema batchSchema = createTestSchema(BatchSchema.SelectionVectorMode.NONE);
+    final TupleMetadata batchSchema = createTestSchema(BatchSchema.SelectionVectorMode.NONE);
     final DrillBuf drillBuf = allocator.buffer(4 * 3);
     final SelectionVector4 sv4 = new SelectionVector4(drillBuf, 3, Character.MAX_VALUE);
 
@@ -57,9 +59,4 @@ public RowSet createSrcRowSet(RootAllocator allocator) throws SchemaChangeExcept
 
     return new HyperRowSetImpl(hyperContainer, sv4);
   }
-
-  @Override
-  public Copier createCopier() {
-    return new GenericSV4Copier();
-  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java
index f0ebdc073be..878aa25e611 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java
@@ -96,6 +96,7 @@ private static SelectionVector2 makeSv2(BufferAllocator allocator, VectorContain
       destIndex++;
     }
     sv2.setRecordCount(rowCount);
+    sv2.setBatchActualRecordCount(container.getRecordCount());
     container.buildSchema(SelectionVectorMode.TWO_BYTE);
     return sv2;
   }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services