You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by so...@apache.org on 2018/08/14 23:00:21 UTC

[drill] branch master updated (6ad0f9f -> d58554f)

This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git.


    from 6ad0f9f  DRILL-6453: Fix deadlock caused by reading from left and right inputs in HashJoin simultaneously.
     new 9725d4d  DRILL-6687: Improve RemovingRecordBatch to do transfer when all records need to be copied. Add optimization in SelectionVector2 to enable RemovingRecordBatch to transfer ValueVectors from incoming to output container when all records need to be copied. Modified FilterRecordBatch and LimitRecordBatch to support this optimization.
     new d58554f  DRILL-6687: Updated with review comments

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../drill/exec/physical/impl/TopN/TopNBatch.java   |  6 +-
 .../exec/physical/impl/filter/FilterTemplate2.java |  7 ++
 .../exec/physical/impl/limit/LimitRecordBatch.java |  1 +
 .../physical/impl/svremover/AbstractCopier.java    | 17 +++--
 .../physical/impl/svremover/AbstractSV2Copier.java | 23 ++++++-
 .../physical/impl/svremover/AbstractSV4Copier.java |  5 +-
 .../drill/exec/physical/impl/svremover/Copier.java |  9 ++-
 .../physical/impl/svremover/GenericCopier.java     | 11 ++-
 .../impl/svremover/GenericCopierFactory.java       | 54 +++++++++++++++
 .../physical/impl/svremover/GenericSV2Copier.java  | 17 ++++-
 .../physical/impl/svremover/GenericSV4Copier.java  | 25 +++----
 .../impl/svremover/RemovingRecordBatch.java        | 80 +---------------------
 .../physical/impl/svremover/StraightCopier.java    | 69 +++++++++++++++++++
 .../exec/record/selection/SelectionVector2.java    | 23 +++++++
 .../impl/svremover/AbstractGenericCopierTest.java  | 35 ++++++----
 .../physical/impl/svremover/GenericCopierTest.java |  8 ++-
 ...ierTest.java => GenericSV2BatchCopierTest.java} | 19 +++--
 .../impl/svremover/GenericSV2CopierTest.java       |  7 +-
 .../impl/svremover/GenericSV4CopierTest.java       |  9 +--
 .../apache/drill/test/rowSet/IndirectRowSet.java   |  1 +
 20 files changed, 265 insertions(+), 161 deletions(-)
 create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierFactory.java
 create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/StraightCopier.java
 copy exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/{GenericSV2CopierTest.java => GenericSV2BatchCopierTest.java} (79%)


[drill] 02/02: DRILL-6687: Updated with review comments

Posted by so...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit d58554ff74c672312ae50e5ad26035f779c5a5d4
Author: Sorabh Hamirwasia <sh...@maprtech.com>
AuthorDate: Tue Aug 14 15:49:13 2018 -0700

    DRILL-6687: Updated with review comments
---
 .../apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java | 2 +-
 .../org/apache/drill/exec/record/selection/SelectionVector2.java     | 5 ++++-
 2 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java
index ec712e5..68a0889 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java
@@ -56,7 +56,7 @@ public abstract class AbstractSV2Copier extends AbstractCopier {
 
   @Override
   public int copyRecords(int index, int recordCount) {
-    if (sv2.doFullTransfer()) {
+    if (sv2.canDoFullTransfer()) {
       for (TransferPair pair : transferPairs) {
         pair.transfer();
       }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
index 1d9c5da..8afc5fb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
@@ -36,7 +36,10 @@ import org.apache.drill.exec.record.DeadBuf;
 public class SelectionVector2 implements AutoCloseable {
 
   private final BufferAllocator allocator;
+  // Indicates number of indexes stored in the SV2 buffer which may be less than actual number of rows stored in
+  // RecordBatch container owning this SV2 instance
   private int recordCount;
+  // Indicates actual number of rows in the RecordBatch container which owns this SV2 instance
   private int batchActualRecordCount = -1;
   private DrillBuf buffer = DeadBuf.DEAD_BUFFER;
 
@@ -159,7 +162,7 @@ public class SelectionVector2 implements AutoCloseable {
     this.recordCount = recordCount;
   }
 
-  public boolean doFullTransfer() {
+  public boolean canDoFullTransfer() {
     return (recordCount == batchActualRecordCount);
   }
 


[drill] 01/02: DRILL-6687: Improve RemovingRecordBatch to do transfer when all records need to be copied. Add optimization in SelectionVector2 to enable RemovingRecordBatch to transfer ValueVectors from incoming to output container when all records need to be copied. Modified FilterRecordBatch and LimitRecordBatch to support this optimization.

Posted by so...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 9725d4dfd25e9ac6302122463832f3334dd615fa
Author: Sorabh Hamirwasia <sh...@maprtech.com>
AuthorDate: Thu Aug 9 19:21:42 2018 -0700

    DRILL-6687: Improve RemovingRecordBatch to do transfer when all records need to be copied
    Add optimization in SelectionVector2 to enable RemovingRecordBatch to transfer ValueVectors from incoming to output container when all records need to be copied.
    Modified FilterRecordBatch and LimitRecordBatch to support this optimization.
---
 .../drill/exec/physical/impl/TopN/TopNBatch.java   |  6 +-
 .../exec/physical/impl/filter/FilterTemplate2.java |  7 ++
 .../exec/physical/impl/limit/LimitRecordBatch.java |  1 +
 .../physical/impl/svremover/AbstractCopier.java    | 17 +++--
 .../physical/impl/svremover/AbstractSV2Copier.java | 23 ++++++-
 .../physical/impl/svremover/AbstractSV4Copier.java |  5 +-
 .../drill/exec/physical/impl/svremover/Copier.java |  9 ++-
 .../physical/impl/svremover/GenericCopier.java     | 11 ++-
 .../impl/svremover/GenericCopierFactory.java       | 54 +++++++++++++++
 .../physical/impl/svremover/GenericSV2Copier.java  | 17 ++++-
 .../physical/impl/svremover/GenericSV4Copier.java  | 25 +++----
 .../impl/svremover/RemovingRecordBatch.java        | 80 +---------------------
 .../physical/impl/svremover/StraightCopier.java    | 69 +++++++++++++++++++
 .../exec/record/selection/SelectionVector2.java    | 20 ++++++
 .../impl/svremover/AbstractGenericCopierTest.java  | 35 ++++++----
 .../physical/impl/svremover/GenericCopierTest.java |  8 ++-
 ...ierTest.java => GenericSV2BatchCopierTest.java} | 19 +++--
 .../impl/svremover/GenericSV2CopierTest.java       |  7 +-
 .../impl/svremover/GenericSV4CopierTest.java       |  9 +--
 .../apache/drill/test/rowSet/IndirectRowSet.java   |  1 +
 20 files changed, 262 insertions(+), 161 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 4fc0d15..2763f59 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -47,7 +47,7 @@ import org.apache.drill.exec.physical.config.TopN;
 import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
 import org.apache.drill.exec.physical.impl.sort.SortRecordBatchBuilder;
 import org.apache.drill.exec.physical.impl.svremover.Copier;
-import org.apache.drill.exec.physical.impl.svremover.GenericSV4Copier;
+import org.apache.drill.exec.physical.impl.svremover.GenericCopierFactory;
 import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -360,7 +360,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
     SelectionVector4 selectionVector4 = priorityQueue.getSv4();
     SimpleSV4RecordBatch batch = new SimpleSV4RecordBatch(c, selectionVector4, context);
     if (copier == null) {
-      copier = GenericSV4Copier.createCopier(batch, newContainer, null);
+      copier = GenericCopierFactory.createAndSetupCopier(batch, newContainer, null);
     } else {
       for (VectorWrapper<?> i : batch) {
 
@@ -468,7 +468,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
     @SuppressWarnings("resource")
     final SelectionVector4 selectionVector4 = priorityQueue.getSv4();
     final SimpleSV4RecordBatch batch = new SimpleSV4RecordBatch(c, selectionVector4, context);
-    copier = GenericSV4Copier.createCopier(batch, newContainer, null);
+    copier = GenericCopierFactory.createAndSetupCopier(batch, newContainer, null);
     @SuppressWarnings("resource")
     SortRecordBatchBuilder builder = new SortRecordBatchBuilder(oContext.getAllocator());
     try {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
index 6d1f034..7b0183b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
@@ -67,11 +67,18 @@ public abstract class FilterTemplate2 implements Filterer {
     if (! outgoingSelectionVector.allocateNewSafe(recordCount)) {
       throw new OutOfMemoryException("Unable to allocate filter batch");
     }
+
     switch(svMode){
     case NONE:
+      // Set the actual recordCount in outgoing selection vector to help SVRemover copy the entire
+      // batch if possible at once rather than row-by-row
+      outgoingSelectionVector.setBatchActualRecordCount(recordCount);
       filterBatchNoSV(recordCount);
       break;
     case TWO_BYTE:
+      // Set the actual recordCount in outgoing selection vector to help SVRemover copy the entire
+      // batch if possible at once rather than row-by-row
+      outgoingSelectionVector.setBatchActualRecordCount(incomingSelectionVector.getBatchActualRecordCount());
       filterBatchSV2(recordCount);
       break;
     default:
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index 06f0fdb..a862714 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -194,6 +194,7 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
 
     // clear memory for incoming sv (if any)
     if (incomingSv != null) {
+      outgoingSv.setBatchActualRecordCount(incomingSv.getBatchActualRecordCount());
       incomingSv.clear();
     }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractCopier.java
index ddea468..47ec1cb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractCopier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractCopier.java
@@ -19,7 +19,6 @@ package org.apache.drill.exec.physical.impl.svremover;
 
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
@@ -31,7 +30,7 @@ public abstract class AbstractCopier implements Copier {
   protected VectorContainer outgoing;
 
   @Override
-  public void setup(RecordBatch incoming, VectorContainer outgoing) throws SchemaChangeException {
+  public void setup(RecordBatch incoming, VectorContainer outgoing) {
     this.outgoing = outgoing;
 
     final int count = outgoing.getNumberOfColumns();
@@ -43,7 +42,7 @@ public abstract class AbstractCopier implements Copier {
   }
 
   @Override
-  public int copyRecords(int index, int recordCount) throws SchemaChangeException {
+  public int copyRecords(int index, int recordCount) {
     for(VectorWrapper<?> out : outgoing){
       TypeProtos.MajorType type = out.getField().getType();
       if (!Types.isFixedWidthType(type) || Types.isRepeated(type)) {
@@ -57,7 +56,7 @@ public abstract class AbstractCopier implements Copier {
   }
 
   @Override
-  public int appendRecord(int index) throws SchemaChangeException {
+  public int appendRecord(int index) {
     int outgoingPosition = outgoing.getRecordCount();
     copyEntryIndirect(index, outgoingPosition);
     outgoingPosition++;
@@ -66,11 +65,11 @@ public abstract class AbstractCopier implements Copier {
   }
 
   @Override
-  public int appendRecords(int index, int recordCount) throws SchemaChangeException {
+  public int appendRecords(int index, int recordCount) {
     return insertRecords(outgoing.getRecordCount(), index, recordCount);
   }
 
-  private int insertRecords(int outgoingPosition, int index, int recordCount) throws SchemaChangeException {
+  private int insertRecords(int outgoingPosition, int index, int recordCount) {
     final int endIndex = index + recordCount;
 
     for(int svIndex = index; svIndex < endIndex; svIndex++, outgoingPosition++){
@@ -81,7 +80,7 @@ public abstract class AbstractCopier implements Copier {
     return outgoingPosition;
   }
 
-  private void updateCounts(int numRecords) {
+  protected void updateCounts(int numRecords) {
     outgoing.setRecordCount(numRecords);
 
     for (int vectorIndex = 0; vectorIndex < vvOut.length; vectorIndex++) {
@@ -89,7 +88,7 @@ public abstract class AbstractCopier implements Copier {
     }
   }
 
-  public abstract void copyEntryIndirect(int inIndex, int outIndex) throws SchemaChangeException;
+  public abstract void copyEntryIndirect(int inIndex, int outIndex);
 
-  public abstract void copyEntry(int inIndex, int outIndex) throws SchemaChangeException;
+  public abstract void copyEntry(int inIndex, int outIndex);
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java
index 321d9a8..ec712e5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV2Copier.java
@@ -17,19 +17,23 @@
  */
 package org.apache.drill.exec.physical.impl.svremover;
 
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.vector.ValueVector;
 
+import java.util.ArrayList;
+import java.util.List;
+
 public abstract class AbstractSV2Copier extends AbstractCopier {
   protected ValueVector[] vvIn;
   private SelectionVector2 sv2;
+  protected List<TransferPair> transferPairs = new ArrayList<>();
 
   @Override
-  public void setup(RecordBatch incoming, VectorContainer outgoing) throws SchemaChangeException {
+  public void setup(RecordBatch incoming, VectorContainer outgoing) {
     super.setup(incoming, outgoing);
     this.sv2 = incoming.getSelectionVector2();
 
@@ -46,7 +50,20 @@ public abstract class AbstractSV2Copier extends AbstractCopier {
     }
   }
 
-  public void copyEntryIndirect(int inIndex, int outIndex) throws SchemaChangeException {
+  public void copyEntryIndirect(int inIndex, int outIndex) {
     copyEntry(sv2.getIndex(inIndex), outIndex);
   }
+
+  @Override
+  public int copyRecords(int index, int recordCount) {
+    if (sv2.doFullTransfer()) {
+      for (TransferPair pair : transferPairs) {
+        pair.transfer();
+      }
+      updateCounts(recordCount);
+      return recordCount;
+    }
+
+    return super.copyRecords(index, recordCount);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java
index cd6af07..56e2586 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/AbstractSV4Copier.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.physical.impl.svremover;
 
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
@@ -31,7 +30,7 @@ public abstract class AbstractSV4Copier extends AbstractCopier {
   private SelectionVector4 sv4;
 
   @Override
-  public void setup(RecordBatch incoming, VectorContainer outgoing) throws SchemaChangeException{
+  public void setup(RecordBatch incoming, VectorContainer outgoing) {
     super.setup(incoming, outgoing);
     this.sv4 = incoming.getSelectionVector4();
 
@@ -48,7 +47,7 @@ public abstract class AbstractSV4Copier extends AbstractCopier {
     }
   }
 
-  public void copyEntryIndirect(int inIndex, int outIndex) throws SchemaChangeException {
+  public void copyEntryIndirect(int inIndex, int outIndex) {
     copyEntry(sv4.get(inIndex), outIndex);
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
index bc31252..92dea70 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
@@ -17,13 +17,12 @@
  */
 package org.apache.drill.exec.physical.impl.svremover;
 
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
 
 public interface Copier {
-  void setup(RecordBatch incoming, VectorContainer outgoing) throws SchemaChangeException;
-  int copyRecords(int index, int recordCount) throws SchemaChangeException;
-  int appendRecord(int index) throws SchemaChangeException;
-  int appendRecords(int index, int recordCount) throws SchemaChangeException;
+  void setup(RecordBatch incoming, VectorContainer outgoing);
+  int copyRecords(int index, int recordCount);
+  int appendRecord(int index);
+  int appendRecords(int index, int recordCount);
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopier.java
index de048dc..72516e0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopier.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.physical.impl.svremover;
 
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
@@ -30,7 +29,7 @@ public class GenericCopier implements Copier {
   private VectorContainer outgoing;
 
   @Override
-  public void setup(RecordBatch incoming, VectorContainer outgoing) throws SchemaChangeException {
+  public void setup(RecordBatch incoming, VectorContainer outgoing) {
     this.outgoing = outgoing;
 
     final int count = outgoing.getNumberOfColumns();
@@ -53,12 +52,12 @@ public class GenericCopier implements Copier {
   }
 
   @Override
-  public int copyRecords(int index, int recordCount) throws SchemaChangeException {
+  public int copyRecords(int index, int recordCount) {
     return insertRecords(0, index, recordCount);
   }
 
   @Override
-  public int appendRecord(int index) throws SchemaChangeException {
+  public int appendRecord(int index) {
     int outgoingPosition = outgoing.getRecordCount();
     for (int vectorIndex = 0; vectorIndex < vvIn.length; vectorIndex++) {
       vvOut[vectorIndex].copyEntry(outgoingPosition, vvIn[vectorIndex], index);
@@ -69,11 +68,11 @@ public class GenericCopier implements Copier {
   }
 
   @Override
-  public int appendRecords(int index, int recordCount) throws SchemaChangeException {
+  public int appendRecords(int index, int recordCount) {
     return insertRecords(outgoing.getRecordCount(), index, recordCount);
   }
 
-  private int insertRecords(int outgoingPosition, int startIndex, int recordCount) throws SchemaChangeException {
+  private int insertRecords(int outgoingPosition, int startIndex, int recordCount) {
     final int endIndex = startIndex + recordCount;
 
     for (int index = startIndex; index < endIndex; index++, outgoingPosition++) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierFactory.java
new file mode 100644
index 0000000..cd6dd02
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.svremover;
+
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.SchemaChangeCallBack;
+
+public class GenericCopierFactory {
+  //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(GenericCopierFactory.class);
+
+  public static Copier createAndSetupCopier(RecordBatch incoming, VectorContainer outputContainer,
+                                            SchemaChangeCallBack callBack) {
+    Copier copier;
+
+    switch(incoming.getSchema().getSelectionVectorMode()){
+      case NONE:
+        copier = new StraightCopier(incoming, outputContainer, callBack);
+        break;
+      case TWO_BYTE:
+        copier = new GenericSV2Copier(incoming, outputContainer, callBack);
+        break;
+      case FOUR_BYTE:
+        copier = new GenericSV4Copier(incoming, outputContainer, callBack);
+        break;
+      default:
+        throw new UnsupportedOperationException();
+    }
+
+    copier.setup(incoming, outputContainer);
+    return copier;
+  }
+
+  public static Copier createAndSetupNonSVGenericCopier(RecordBatch incoming, VectorContainer outputContainer) {
+    Copier copier = new GenericCopier();
+    copier.setup(incoming, outputContainer);
+    return copier;
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2Copier.java
index a375f45..f607e8c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2Copier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2Copier.java
@@ -17,11 +17,24 @@
  */
 package org.apache.drill.exec.physical.impl.svremover;
 
-import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.SchemaChangeCallBack;
 
 public class GenericSV2Copier extends AbstractSV2Copier {
+
+  public GenericSV2Copier(RecordBatch incomingBatch, VectorContainer outputContainer,
+                          SchemaChangeCallBack callBack) {
+    for(VectorWrapper<?> vv : incomingBatch){
+      TransferPair pair = vv.getValueVector().makeTransferPair(outputContainer.addOrGet(vv.getField(), callBack));
+      transferPairs.add(pair);
+    }
+  }
+
   @Override
-  public void copyEntry(int inIndex, int outIndex) throws SchemaChangeException {
+  public void copyEntry(int inIndex, int outIndex) {
     for ( int i = 0;  i < vvIn.length;  i++ ) {
       vvOut[i].copyEntry(outIndex, vvIn[i], inIndex);
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4Copier.java
index 1f3d28b..c676841 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4Copier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4Copier.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.physical.impl.svremover;
 
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
@@ -25,8 +24,18 @@ import org.apache.drill.exec.vector.SchemaChangeCallBack;
 import org.apache.drill.exec.vector.ValueVector;
 
 public class GenericSV4Copier extends AbstractSV4Copier {
+
+  public GenericSV4Copier(RecordBatch incomingBatch, VectorContainer outputContainer,
+                          SchemaChangeCallBack callBack) {
+    for(VectorWrapper<?> vv : incomingBatch){
+      @SuppressWarnings("resource")
+      ValueVector v = vv.getValueVectors()[0];
+      v.makeTransferPair(outputContainer.addOrGet(v.getField(), callBack));
+    }
+  }
+
   @Override
-  public void copyEntry(int inIndex, int outIndex) throws SchemaChangeException {
+  public void copyEntry(int inIndex, int outIndex) {
     int inOffset = inIndex & 0xFFFF;
     int inVector = inIndex >>> 16;
     for ( int i = 0;  i < vvIn.length;  i++ ) {
@@ -34,16 +43,4 @@ public class GenericSV4Copier extends AbstractSV4Copier {
       vvOut[i].copyEntry(outIndex, vectorsFromIncoming[inVector], inOffset);
     }
   }
-
-  public static Copier createCopier(RecordBatch batch, VectorContainer container, SchemaChangeCallBack callBack) throws SchemaChangeException {
-    for(VectorWrapper<?> vv : batch){
-      @SuppressWarnings("resource")
-      ValueVector v = vv.getValueVectors()[0];
-      v.makeTransferPair(container.addOrGet(v.getField(), callBack));
-    }
-
-    Copier copier = new GenericSV4Copier();
-    copier.setup(batch, container);
-    return copier;
-  }
 }
\ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index acfdc87..1471d5e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -17,8 +17,6 @@
  */
 package org.apache.drill.exec.physical.impl.svremover;
 
-import java.util.List;
-
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -26,14 +24,9 @@ import org.apache.drill.exec.physical.config.SelectionVectorRemover;
 import org.apache.drill.exec.record.AbstractSingleRecordBatch;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.TransferPair;
-import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.WritableBatch;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
 public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVectorRemover>{
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemovingRecordBatch.class);
 
@@ -56,19 +49,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
     // send OK to downstream instead. Since the output of RemovingRecordBatch is always going to be a regular container
     // change in incoming container type is not actual schema change.
     container.zeroVectors();
-    switch(incoming.getSchema().getSelectionVectorMode()){
-    case NONE:
-      this.copier = getStraightCopier();
-      break;
-    case TWO_BYTE:
-      this.copier = create2Copier();
-      break;
-    case FOUR_BYTE:
-      this.copier = create4Copier();
-      break;
-    default:
-      throw new UnsupportedOperationException();
-    }
+    copier = GenericCopierFactory.createAndSetupCopier(incoming, container, callBack);
 
     // If there is an actual schema change then below condition will be true and it will send OK_NEW_SCHEMA
     // downstream too
@@ -84,7 +65,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
   protected IterOutcome doWork() {
     try {
       copier.copyRecords(0, incoming.getRecordCount());
-    } catch (SchemaChangeException e) {
+    } catch (Exception e) {
       throw new IllegalStateException(e);
     } finally {
       if (incoming.getSchema().getSelectionVectorMode() != SelectionVectorMode.FOUR_BYTE) {
@@ -107,63 +88,6 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
     super.close();
   }
 
-  private class StraightCopier implements Copier{
-
-    private List<TransferPair> pairs = Lists.newArrayList();
-
-    @Override
-    public void setup(RecordBatch incoming, VectorContainer outgoing){
-      for(VectorWrapper<?> vv : incoming){
-        TransferPair tp = vv.getValueVector().makeTransferPair(container.addOrGet(vv.getField(), callBack));
-        pairs.add(tp);
-      }
-    }
-
-    @Override
-    public int copyRecords(int index, int recordCount) {
-      assert index == 0 && recordCount == incoming.getRecordCount() : "Straight copier cannot split batch";
-      for(TransferPair tp : pairs){
-        tp.transfer();
-      }
-
-      container.setRecordCount(incoming.getRecordCount());
-      return recordCount;
-    }
-
-    @Override
-    public int appendRecord(int index) throws SchemaChangeException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public int appendRecords(int index, int recordCount) throws SchemaChangeException {
-      throw new UnsupportedOperationException();
-    }
-  }
-
-  private Copier getStraightCopier(){
-    StraightCopier copier = new StraightCopier();
-    copier.setup(incoming, container);
-    return copier;
-  }
-
-  private Copier create2Copier() throws SchemaChangeException {
-    Preconditions.checkArgument(incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE);
-
-    for(VectorWrapper<?> vv : incoming){
-      vv.getValueVector().makeTransferPair(container.addOrGet(vv.getField(), callBack));
-    }
-
-    Copier copier = new GenericSV2Copier();
-    copier.setup(incoming, container);
-    return copier;
-  }
-
-  private Copier create4Copier() throws SchemaChangeException {
-    Preconditions.checkArgument(incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE);
-    return GenericSV4Copier.createCopier(incoming, container, callBack);
-  }
-
   @Override
   public WritableBatch getWritableBatch() {
     return WritableBatch.get(this);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/StraightCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/StraightCopier.java
new file mode 100644
index 0000000..33f2a96
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/StraightCopier.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.svremover;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.SchemaChangeCallBack;
+
+import java.util.List;
+
+/** Copier that transfers whole vectors from the incoming batch to the output container; it cannot split or append. */
+public class StraightCopier implements Copier {
+    private final List<TransferPair> pairs = Lists.newArrayList();
+    private final RecordBatch incoming;
+    private final VectorContainer outputContainer;
+    private final SchemaChangeCallBack callBack;
+
+    public StraightCopier(RecordBatch incomingBatch, VectorContainer outputContainer, SchemaChangeCallBack callBack) {
+      this.incoming = incomingBatch;
+      this.outputContainer = outputContainer;
+      this.callBack = callBack;
+    }
+
+    @Override
+    public void setup(RecordBatch incoming, VectorContainer outgoing) {
+      // 'outgoing' is not consulted: every pair targets the container handed to the constructor.
+      for (VectorWrapper<?> vv : incoming) {
+        pairs.add(vv.getValueVector().makeTransferPair(outputContainer.addOrGet(vv.getField(), callBack)));
+      }
+    }
+
+    @Override
+    public int copyRecords(int index, int recordCount) {
+      assert index == 0 && recordCount == incoming.getRecordCount() : "Straight copier cannot split batch";
+      for (TransferPair tp : pairs) {
+        tp.transfer();
+      }
+      outputContainer.setRecordCount(incoming.getRecordCount());
+      return recordCount;
+    }
+
+    @Override
+    public int appendRecord(int index) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int appendRecords(int index, int recordCount) {
+      throw new UnsupportedOperationException();
+    }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
index 7244148..1d9c5da 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
@@ -37,6 +37,7 @@ public class SelectionVector2 implements AutoCloseable {
 
   private final BufferAllocator allocator;
   private int recordCount;
+  private int batchActualRecordCount = -1;
   private DrillBuf buffer = DeadBuf.DEAD_BUFFER;
 
   public static final int RECORD_SIZE = 2;
@@ -61,6 +62,11 @@ public class SelectionVector2 implements AutoCloseable {
     recordCount = count;
   }
 
+  public SelectionVector2(BufferAllocator allocator, DrillBuf buf, int count, int actualRecordCount) {
+    this(allocator, buf, count);
+    this.batchActualRecordCount = actualRecordCount;
+  }
+
   public int getCount() {
     return recordCount;
   }
@@ -127,6 +133,7 @@ public class SelectionVector2 implements AutoCloseable {
   public SelectionVector2 clone() {
     SelectionVector2 newSV = new SelectionVector2(allocator);
     newSV.recordCount = recordCount;
+    newSV.batchActualRecordCount = batchActualRecordCount;
     newSV.buffer = buffer;
 
     /* Since buffer and newSV.buffer essentially point to the
@@ -143,6 +150,7 @@ public class SelectionVector2 implements AutoCloseable {
       buffer.release();
       buffer = DeadBuf.DEAD_BUFFER;
       recordCount = 0;
+      batchActualRecordCount = -1;
     }
   }
 
@@ -151,6 +159,18 @@ public class SelectionVector2 implements AutoCloseable {
     this.recordCount = recordCount;
   }
 
+  public boolean doFullTransfer() {
+    return (recordCount == batchActualRecordCount);
+  }
+
+  public void setBatchActualRecordCount(int actualRecordCount) {
+    this.batchActualRecordCount = actualRecordCount;
+  }
+
+  public int getBatchActualRecordCount() {
+    return batchActualRecordCount;
+  }
+
   @Override
   public void close() {
     clear();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/AbstractGenericCopierTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/AbstractGenericCopierTest.java
index 01263b1..7d444b4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/AbstractGenericCopierTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/AbstractGenericCopierTest.java
@@ -17,34 +17,34 @@
  */
 package org.apache.drill.exec.physical.impl.svremover;
 
-import com.google.common.collect.Lists;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.RootAllocator;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.SchemaChangeCallBack;
 import org.apache.drill.test.rowSet.RowSet;
 import org.apache.drill.test.rowSet.RowSetBatch;
 import org.apache.drill.test.rowSet.RowSetBuilder;
 import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
 import org.junit.Test;
 
-import java.util.List;
-
 public abstract class AbstractGenericCopierTest {
   @Test
   public void testCopyRecords() throws SchemaChangeException {
     try (RootAllocator allocator = new RootAllocator(10_000_000)) {
-      final BatchSchema batchSchema = createTestSchema(BatchSchema.SelectionVectorMode.NONE);
+      final TupleMetadata batchSchema = createTestSchema(BatchSchema.SelectionVectorMode.NONE);
       final RowSet srcRowSet = createSrcRowSet(allocator);
       final RowSet destRowSet = new RowSetBuilder(allocator, batchSchema).build();
       final VectorContainer destContainer = destRowSet.container();
-      final Copier copier = createCopier();
+      final Copier copier = createCopier(new RowSetBatch(srcRowSet), destContainer, null);
       final RowSet expectedRowSet = createExpectedRowset(allocator);
 
-      copier.setup(new RowSetBatch(srcRowSet), destContainer);
       copier.copyRecords(0, 3);
 
       try {
@@ -65,14 +65,13 @@ public abstract class AbstractGenericCopierTest {
   @Test
   public void testAppendRecords() throws SchemaChangeException {
     try (RootAllocator allocator = new RootAllocator(10_000_000)) {
-      final BatchSchema batchSchema = createTestSchema(BatchSchema.SelectionVectorMode.NONE);
+      final TupleMetadata batchSchema = createTestSchema(BatchSchema.SelectionVectorMode.NONE);
       final RowSet srcRowSet = createSrcRowSet(allocator);
       final RowSet destRowSet = new RowSetBuilder(allocator, batchSchema).build();
       final VectorContainer destContainer = destRowSet.container();
-      final Copier copier = createCopier();
+      final Copier copier = createCopier(new RowSetBatch(srcRowSet), destContainer, null);
       final RowSet expectedRowSet = createExpectedRowset(allocator);
 
-      copier.setup(new RowSetBatch(srcRowSet), destContainer);
       copier.appendRecord(0);
       copier.appendRecords(1, 2);
 
@@ -93,7 +92,10 @@ public abstract class AbstractGenericCopierTest {
 
   public abstract RowSet createSrcRowSet(RootAllocator allocator) throws SchemaChangeException;
 
-  public abstract Copier createCopier();
+  public Copier createCopier(RecordBatch incoming, VectorContainer outputContainer,
+                                      SchemaChangeCallBack callback) {
+    return GenericCopierFactory.createAndSetupCopier(incoming, outputContainer, callback);
+  }
 
   public static Object[] row1() {
     return new Object[]{110, "green", new float[]{5.5f, 2.3f}, new String[]{"1a", "1b"}};
@@ -115,7 +117,7 @@ public abstract class AbstractGenericCopierTest {
     return new Object[]{106, "black", new float[]{.75f}, new String[]{"4a"}};
   }
 
-  public static RowSet createExpectedRowset(RootAllocator allocator) {
+  private RowSet createExpectedRowset(RootAllocator allocator) {
     return new RowSetBuilder(allocator, createTestSchema(BatchSchema.SelectionVectorMode.NONE))
       .addRow(row1())
       .addRow(row2())
@@ -123,14 +125,17 @@ public abstract class AbstractGenericCopierTest {
       .build();
   }
 
-  public static BatchSchema createTestSchema(BatchSchema.SelectionVectorMode mode) {
+  protected TupleMetadata createTestSchema(BatchSchema.SelectionVectorMode mode) {
     MaterializedField colA = MaterializedField.create("colA", Types.required(TypeProtos.MinorType.INT));
     MaterializedField colB = MaterializedField.create("colB", Types.required(TypeProtos.MinorType.VARCHAR));
     MaterializedField colC = MaterializedField.create("colC", Types.repeated(TypeProtos.MinorType.FLOAT4));
     MaterializedField colD = MaterializedField.create("colD", Types.repeated(TypeProtos.MinorType.VARCHAR));
 
-    List<MaterializedField> cols = Lists.newArrayList(colA, colB, colC, colD);
-    BatchSchema batchSchema = new BatchSchema(mode, cols);
-    return batchSchema;
+    return new SchemaBuilder().add(colA)
+      .add(colB)
+      .add(colC)
+      .add(colD)
+      .withSVMode(mode)
+      .buildSchema();
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierTest.java
index f946f81..d6c38e7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericCopierTest.java
@@ -19,6 +19,9 @@ package org.apache.drill.exec.physical.impl.svremover;
 
 import org.apache.drill.exec.memory.RootAllocator;
 import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.SchemaChangeCallBack;
 import org.apache.drill.test.rowSet.RowSet;
 import org.apache.drill.test.rowSet.RowSetBuilder;
 
@@ -35,7 +38,8 @@ public class GenericCopierTest extends AbstractGenericCopierTest {
   }
 
   @Override
-  public Copier createCopier() {
-    return new GenericCopier();
+  public Copier createCopier(RecordBatch incoming, VectorContainer outputContainer,
+                             SchemaChangeCallBack callback) {
+    return GenericCopierFactory.createAndSetupNonSVGenericCopier(incoming, outputContainer);
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2CopierTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2BatchCopierTest.java
similarity index 79%
copy from exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2CopierTest.java
copy to exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2BatchCopierTest.java
index 428124d..748e0d0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2CopierTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2BatchCopierTest.java
@@ -21,21 +21,20 @@ import org.apache.drill.exec.memory.RootAllocator;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.test.rowSet.RowSet;
 import org.apache.drill.test.rowSet.RowSetBuilder;
-public class GenericSV2CopierTest extends AbstractGenericCopierTest {
+
+/**
+ * Verifies the SV2 optimization: when the number of records to copy equals the number of records in the
+ * underlying batch, a transfer is done rather than a row-by-row copy.
+ */
+public class GenericSV2BatchCopierTest extends AbstractGenericCopierTest {
+
   @Override
   public RowSet createSrcRowSet(RootAllocator allocator) {
     return new RowSetBuilder(allocator, createTestSchema(BatchSchema.SelectionVectorMode.TWO_BYTE))
-      .addRow(row1())
-      .addSelection(false, row4())
+      .addSelection(true, row1())
       .addRow(row2())
-      .addSelection(false, row5())
-      .addRow(row3())
+      .addSelection(true, row3())
       .withSv2()
       .build();
   }
-
-  @Override
-  public Copier createCopier() {
-    return new GenericSV2Copier();
-  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2CopierTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2CopierTest.java
index 428124d..b2f0e51 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2CopierTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV2CopierTest.java
@@ -21,7 +21,9 @@ import org.apache.drill.exec.memory.RootAllocator;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.test.rowSet.RowSet;
 import org.apache.drill.test.rowSet.RowSetBuilder;
+
 public class GenericSV2CopierTest extends AbstractGenericCopierTest {
+
   @Override
   public RowSet createSrcRowSet(RootAllocator allocator) {
     return new RowSetBuilder(allocator, createTestSchema(BatchSchema.SelectionVectorMode.TWO_BYTE))
@@ -33,9 +35,4 @@ public class GenericSV2CopierTest extends AbstractGenericCopierTest {
       .withSv2()
       .build();
   }
-
-  @Override
-  public Copier createCopier() {
-    return new GenericSV2Copier();
-  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4CopierTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4CopierTest.java
index 447ad3a..a5f5bb7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4CopierTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/GenericSV4CopierTest.java
@@ -23,15 +23,17 @@ import org.apache.drill.exec.memory.RootAllocator;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.ExpandableHyperContainer;
 import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.test.rowSet.HyperRowSetImpl;
 import org.apache.drill.test.rowSet.RowSet;
 import org.apache.drill.test.rowSet.RowSetBuilder;
 
 public class GenericSV4CopierTest extends AbstractGenericCopierTest {
+
   @Override
   public RowSet createSrcRowSet(RootAllocator allocator) throws SchemaChangeException {
-    final BatchSchema batchSchema = createTestSchema(BatchSchema.SelectionVectorMode.NONE);
+    final TupleMetadata batchSchema = createTestSchema(BatchSchema.SelectionVectorMode.NONE);
     final DrillBuf drillBuf = allocator.buffer(4 * 3);
     final SelectionVector4 sv4 = new SelectionVector4(drillBuf, 3, Character.MAX_VALUE);
 
@@ -57,9 +59,4 @@ public class GenericSV4CopierTest extends AbstractGenericCopierTest {
 
     return new HyperRowSetImpl(hyperContainer, sv4);
   }
-
-  @Override
-  public Copier createCopier() {
-    return new GenericSV4Copier();
-  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java
index f0ebdc0..878aa25 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/IndirectRowSet.java
@@ -96,6 +96,7 @@ public class IndirectRowSet extends AbstractSingleRowSet {
       destIndex++;
     }
     sv2.setRecordCount(rowCount);
+    sv2.setBatchActualRecordCount(container.getRecordCount());
     container.buildSchema(SelectionVectorMode.TWO_BYTE);
     return sv2;
   }