You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ab...@apache.org on 2021/08/05 07:56:25 UTC

[hive] branch master updated: HIVE-25061: PTF: Improve ValueBoundaryScanner (#2225) (Laszlo Bodor reviewed by Panagiotis Garefalakis)

This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 2db4fd5  HIVE-25061: PTF: Improve ValueBoundaryScanner (#2225) (Laszlo Bodor reviewed by Panagiotis Garefalakis)
2db4fd5 is described below

commit 2db4fd5cc1a7a03654d6713d605bc6c7bd4eb6f0
Author: Bodor Laszlo <bo...@gmail.com>
AuthorDate: Thu Aug 5 09:56:17 2021 +0200

    HIVE-25061: PTF: Improve ValueBoundaryScanner (#2225) (Laszlo Bodor reviewed by Panagiotis Garefalakis)
---
 .../hive/ql/udf/ptf/ValueBoundaryScanner.java      | 184 ++++++++++++--------
 .../hadoop/hive/ql/udf/ptf/TestBoundaryCache.java  | 192 ++++++++++++---------
 2 files changed, 220 insertions(+), 156 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/ValueBoundaryScanner.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/ValueBoundaryScanner.java
index b76af63..c6bb1c2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/ValueBoundaryScanner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/ValueBoundaryScanner.java
@@ -42,10 +42,24 @@ import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
 
+import com.google.common.annotations.VisibleForTesting;
+
+
 public abstract class ValueBoundaryScanner {
   BoundaryDef start, end;
   protected final boolean nullsLast;
 
+  /*
+   * The enableBinarySearch flag could be made configurable later: in edge cases (very small
+   * windows compared to the PTF partition size) a linear search can perform slightly, though not
+   * significantly, better. Propagating the option through the PTF codepath would currently
+   * require many function signature changes, so it isn't worth it. The flag can still be used
+   * for testing purposes (e.g. comparing the linear and binary search results to verify
+   * correctness).
+   */
+  @VisibleForTesting
+  boolean enableBinarySearch = true;
+
   public ValueBoundaryScanner(BoundaryDef start, BoundaryDef end, boolean nullsLast) {
     this.start = start;
     this.end = end;
@@ -238,6 +252,7 @@ public abstract class ValueBoundaryScanner {
         rowVal = computeValue(p.getAt(r));
       }
     }
+
     return new ImmutablePair<>(r, rowVal);
   }
 
@@ -403,26 +418,11 @@ abstract class SingleValueBoundaryScanner extends ValueBoundaryScanner {
       }
     }
 
-    Object rowVal = sortKey;
-    int r = rowIdx;
-
-    // Use Case 4.
-    if ( expressionDef.getOrder() == Order.DESC ) {
-      while (r >= 0 && !isDistanceGreater(rowVal, sortKey, amt) ) {
-        Pair<Integer, Object> stepResult = skipOrStepBack(r, p);
-        r = stepResult.getLeft();
-        rowVal = stepResult.getRight();
-      }
-      return r + 1;
-    }
-    else { // Use Case 5.
-      while (r >= 0 && !isDistanceGreater(sortKey, rowVal, amt) ) {
-        Pair<Integer, Object> stepResult = skipOrStepBack(r, p);
-        r = stepResult.getLeft();
-        rowVal = stepResult.getRight();
-      }
-
-      return r + 1;
+    // Use Case 4,5
+    if (enableBinarySearch) {
+      return binarySearchBack(rowIdx, p, sortKey, amt, expressionDef.getOrder());
+    } else {
+      return linearSearchBack(rowIdx, p, sortKey, amt, expressionDef.getOrder());
     }
   }
 
@@ -456,7 +456,6 @@ abstract class SingleValueBoundaryScanner extends ValueBoundaryScanner {
     Object sortKey = computeValueUseCache(rowIdx, p);
 
     Object rowVal = sortKey;
-    int r = rowIdx;
 
     if ( sortKey == null ) {
       // Use Case 9.
@@ -464,31 +463,20 @@ abstract class SingleValueBoundaryScanner extends ValueBoundaryScanner {
         return p.size();
       }
       else { // Use Case 10.
-        while (r < p.size() && rowVal == null ) {
-          Pair<Integer, Object> stepResult = skipOrStepForward(r, p);
-          r = stepResult.getLeft();
+        while (rowIdx < p.size() && rowVal == null ) {
+          Pair<Integer, Object> stepResult = skipOrStepForward(rowIdx, p);
+          rowIdx = stepResult.getLeft();
           rowVal = stepResult.getRight();
         }
-        return r;
+        return rowIdx;
       }
     }
 
-    // Use Case 11.
-    if ( expressionDef.getOrder() == Order.DESC) {
-      while (r < p.size() && !isDistanceGreater(sortKey, rowVal, amt) ) {
-        Pair<Integer, Object> stepResult = skipOrStepForward(r, p);
-        r = stepResult.getLeft();
-        rowVal = stepResult.getRight();
-      }
-      return r;
-    }
-    else { // Use Case 12.
-      while (r < p.size() && !isDistanceGreater(rowVal, sortKey, amt) ) {
-        Pair<Integer, Object> stepResult = skipOrStepForward(r, p);
-        r = stepResult.getLeft();
-        rowVal = stepResult.getRight();
-      }
-      return r;
+    // Use Case 11,12
+    if (enableBinarySearch) {
+      return binarySearchForward(rowIdx, p, sortKey, amt, expressionDef.getOrder());
+    } else {
+      return linearSearchForward(rowIdx, p, sortKey, amt, expressionDef.getOrder());
     }
   }
 
@@ -554,25 +542,13 @@ abstract class SingleValueBoundaryScanner extends ValueBoundaryScanner {
       }
     }
 
-    Object rowVal = sortKey;
     int r = rowIdx;
 
-    // Use Case 4.
-    if ( expressionDef.getOrder() == Order.DESC ) {
-      while (r >= 0 && !isDistanceGreater(rowVal, sortKey, amt) ) {
-        Pair<Integer, Object> stepResult = skipOrStepBack(r, p);
-        r = stepResult.getLeft();
-        rowVal = stepResult.getRight();
-      }
-      return r + 1;
-    }
-    else { // Use Case 5.
-      while (r >= 0 && !isDistanceGreater(sortKey, rowVal, amt) ) {
-        Pair<Integer, Object> stepResult = skipOrStepBack(r, p);
-        r = stepResult.getLeft();
-        rowVal = stepResult.getRight();
-      }
-      return r + 1;
+    // Use Case 4,5
+    if (enableBinarySearch) {
+      return binarySearchBack(r, p, sortKey, amt, expressionDef.getOrder());
+    } else {
+      return linearSearchBack(r, p, sortKey, amt, expressionDef.getOrder());
     }
   }
 
@@ -628,22 +604,11 @@ abstract class SingleValueBoundaryScanner extends ValueBoundaryScanner {
       }
     }
 
-    // Use Case 11.
-    if ( expressionDef.getOrder() == Order.DESC) {
-      while (r < p.size() && !isDistanceGreater(sortKey, rowVal, amt) ) {
-        Pair<Integer, Object> stepResult = skipOrStepForward(r, p);
-        r = stepResult.getLeft();
-        rowVal = stepResult.getRight();
-      }
-      return r;
-    }
-    else { // Use Case 12.
-      while (r < p.size() && !isDistanceGreater(rowVal, sortKey, amt) ) {
-        Pair<Integer, Object> stepResult = skipOrStepForward(r, p);
-        r = stepResult.getLeft();
-        rowVal = stepResult.getRight();
-      }
-      return r;
+    // Use Case 11,12
+    if (enableBinarySearch) {
+      return binarySearchForward(r, p, sortKey, amt, expressionDef.getOrder());
+    } else {
+      return linearSearchForward(r, p, sortKey, amt, expressionDef.getOrder());
     }
   }
 
@@ -664,6 +629,77 @@ abstract class SingleValueBoundaryScanner extends ValueBoundaryScanner {
    */
   public abstract boolean isEqual(Object v1, Object v2);
 
+  protected int binarySearchBack(int rowId, PTFPartition p, Object sortKey, int amt,
+      Order order) throws HiveException {
+    boolean isOrderDesc = order.equals(Order.DESC);
+    Object rowVal = null;
+
+    int rMin = 0;  // tracks lowest possible number fulfilling the range requirement
+    int rMax = rowId; // tracks highest possible number fulfilling the range requirement
+
+    boolean isDistanceGreater = true;
+    while (rMin < rMax) {
+      rowVal = computeValueUseCache(rowId, p);
+      isDistanceGreater = isDistanceGreater(isOrderDesc ? rowVal : sortKey, isOrderDesc ? sortKey : rowVal, amt);
+      if (isDistanceGreater) {
+        rMin = rowId + 1;
+      } else {
+        rMax = rowId;
+      }
+      rowId = rMin + (rMax - rMin) / 2;
+    }
+    return rMin;
+  }
+
+  private int linearSearchBack(int r, PTFPartition p, Object sortKey, int amt,
+      Order order) throws HiveException {
+    boolean isOrderDesc = order.equals(Order.DESC);
+    Object rowVal = sortKey;
+    while (r >= 0 && !isDistanceGreater(isOrderDesc ? rowVal : sortKey,
+        isOrderDesc ? sortKey : rowVal, amt)) {
+      Pair<Integer, Object> stepResult = skipOrStepBack(r, p);
+      r = stepResult.getLeft();
+      rowVal = stepResult.getRight();
+    }
+    return r + 1;
+  }
+
+  protected int binarySearchForward(int rowId, PTFPartition p, Object sortKey, int amt,
+      Order order) throws HiveException {
+    boolean isOrderDesc = order.equals(Order.DESC);
+    Object rowVal = null;
+
+    int rMin = rowId;  // tracks lowest possible number fulfilling the range requirement
+    int rMax = p.size(); // tracks highest possible number fulfilling the range requirement
+
+    boolean isDistanceGreater = true;
+    while (rMin < rMax) {
+      rowVal = computeValueUseCache(rowId, p);
+      isDistanceGreater =
+          isDistanceGreater(isOrderDesc ? sortKey : rowVal, isOrderDesc ? rowVal : sortKey, amt);
+      if (isDistanceGreater) {
+        rMax = rowId;
+      } else {
+        rMin = rowId + 1;
+      }
+      rowId = rMin + (rMax - rMin) / 2;
+    }
+    return rMin;
+  }
+
+  private int linearSearchForward(int r, PTFPartition p, Object sortKey, int amt,
+      Order order) throws HiveException {
+    boolean isOrderDesc = order.equals(Order.DESC);
+    Object rowVal = sortKey;
+    while (r < p.size() && !isDistanceGreater(isOrderDesc ? sortKey : rowVal,
+        isOrderDesc ? rowVal : sortKey, amt)) {
+      Pair<Integer, Object> stepResult = skipOrStepForward(r, p);
+      r = stepResult.getLeft();
+      rowVal = stepResult.getRight();
+    }
+    return r;
+  }
+
   public static SingleValueBoundaryScanner getScanner(BoundaryDef start, BoundaryDef end,
       OrderDef orderDef, boolean nullsLast) throws HiveException {
     if (orderDef.getExpressions().size() != 1) {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/udf/ptf/TestBoundaryCache.java b/ql/src/test/org/apache/hadoop/hive/ql/udf/ptf/TestBoundaryCache.java
index 5bd1ca4..bbf60d3 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/udf/ptf/TestBoundaryCache.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/udf/ptf/TestBoundaryCache.java
@@ -64,37 +64,41 @@ public class TestBoundaryCache {
 
   private static final Logger LOG = LoggerFactory.getLogger(TestBoundaryCache.class);
   private static final LinkedList<List<IntWritable>> TEST_PARTITION = new LinkedList<>();
+  private static final LinkedList<List<IntWritable>> TEST_PARTITION_NO_NULLS = new LinkedList<>();
+  @SuppressWarnings("unchecked")
+  private static final List<LinkedList<List<IntWritable>>> TEST_PARTITIONS =
+      Lists.newArrayList(TEST_PARTITION, TEST_PARTITION_NO_NULLS);
   //Null for using no cache at all, 2 is minimum cache length, 5-9-15 for checking with smaller,
   // exactly equal and larger cache than needed.
   private static final List<Integer> CACHE_SIZES = Lists.newArrayList(null, 2, 5, 9, 15);
   private static final List<PTFInvocationSpec.Order> ORDERS = Lists.newArrayList(ASC, DESC);
   private static final int ORDER_BY_COL = 2;
 
+  @SuppressWarnings("unchecked")
   @BeforeClass
   public static void setupTests() throws Exception {
     //8 ranges, max cache content is 8+1=9 entries
-    addRow(TEST_PARTITION, 1, 1, -7);
-    addRow(TEST_PARTITION, 2, 1, -1);
-    addRow(TEST_PARTITION, 3, 1, -1);
-    addRow(TEST_PARTITION, 4, 1, 1);
-    addRow(TEST_PARTITION, 5, 1, 1);
-    addRow(TEST_PARTITION, 6, 1, 1);
-    addRow(TEST_PARTITION, 7, 1, 1);
-    addRow(TEST_PARTITION, 8, 1, 2);
-    addRow(TEST_PARTITION, 9, 1, 2);
-    addRow(TEST_PARTITION, 10, 1, 2);
-    addRow(TEST_PARTITION, 11, 1, 2);
-    addRow(TEST_PARTITION, 12, 1, 3);
-    addRow(TEST_PARTITION, 13, 1, 5);
-    addRow(TEST_PARTITION, 14, 1, 5);
-    addRow(TEST_PARTITION, 15, 1, 5);
-    addRow(TEST_PARTITION, 16, 1, 5);
-    addRow(TEST_PARTITION, 17, 1, 6);
-    addRow(TEST_PARTITION, 18, 1, 6);
-    addRow(TEST_PARTITION, 19, 1, 9);
-    addRow(TEST_PARTITION, 20, 1, null);
-    addRow(TEST_PARTITION, 21, 1, null);
-
+    addRow(1, 1, -7, TEST_PARTITION, TEST_PARTITION_NO_NULLS);
+    addRow(2, 1, -1, TEST_PARTITION, TEST_PARTITION_NO_NULLS);
+    addRow(3, 1, -1, TEST_PARTITION, TEST_PARTITION_NO_NULLS);
+    addRow(4, 1, 1, TEST_PARTITION, TEST_PARTITION_NO_NULLS);
+    addRow(5, 1, 1, TEST_PARTITION, TEST_PARTITION_NO_NULLS);
+    addRow(6, 1, 1, TEST_PARTITION, TEST_PARTITION_NO_NULLS);
+    addRow(7, 1, 1, TEST_PARTITION, TEST_PARTITION_NO_NULLS);
+    addRow(8, 1, 2, TEST_PARTITION, TEST_PARTITION_NO_NULLS);
+    addRow(9, 1, 2, TEST_PARTITION, TEST_PARTITION_NO_NULLS);
+    addRow(10, 1, 2, TEST_PARTITION, TEST_PARTITION_NO_NULLS);
+    addRow(11, 1, 2, TEST_PARTITION, TEST_PARTITION_NO_NULLS);
+    addRow(12, 1, 3, TEST_PARTITION, TEST_PARTITION_NO_NULLS);
+    addRow(13, 1, 5, TEST_PARTITION, TEST_PARTITION_NO_NULLS);
+    addRow(14, 1, 5, TEST_PARTITION, TEST_PARTITION_NO_NULLS);
+    addRow(15, 1, 5, TEST_PARTITION, TEST_PARTITION_NO_NULLS);
+    addRow(16, 1, 5, TEST_PARTITION, TEST_PARTITION_NO_NULLS);
+    addRow(17, 1, 6, TEST_PARTITION, TEST_PARTITION_NO_NULLS);
+    addRow(18, 1, 6, TEST_PARTITION, TEST_PARTITION_NO_NULLS);
+    addRow(19, 1, 9, TEST_PARTITION, TEST_PARTITION_NO_NULLS);
+    addRow(20, 1, null, TEST_PARTITION);
+    addRow(21, 1, null, TEST_PARTITION);
   }
 
   @Test
@@ -160,51 +164,72 @@ public class TestBoundaryCache {
   private void runTest(WindowingSpec.Direction startDirection, int startAmount,
                        WindowingSpec.Direction endDirection, int endAmount) throws Exception {
 
-    BoundaryDef startBoundary = new BoundaryDef(startDirection, startAmount);
-    BoundaryDef endBoundary = new BoundaryDef(endDirection, endAmount);
-    AtomicInteger readCounter = new AtomicInteger(0);
-
-    int[] expectedBoundaryStarts = new int[TEST_PARTITION.size()];
-    int[] expectedBoundaryEnds = new int[TEST_PARTITION.size()];
-    int expectedReadCountWithoutCache = -1;
-
-    for (PTFInvocationSpec.Order order : ORDERS) {
-      for (Integer cacheSize : CACHE_SIZES) {
-        LOG.info(Thread.currentThread().getStackTrace()[2].getMethodName());
-        LOG.info("Cache: " + cacheSize + " order: " + order);
-        BoundaryCache cache = cacheSize == null ? null : new BoundaryCache(cacheSize);
-        Pair<PTFPartition, ValueBoundaryScanner> mocks = setupMocks(TEST_PARTITION,
-                ORDER_BY_COL, startBoundary, endBoundary, order, cache, readCounter);
-        PTFPartition ptfPartition = mocks.getLeft();
-        ValueBoundaryScanner scanner = mocks.getRight();
-        for (int i = 0; i < TEST_PARTITION.size(); ++i) {
-          scanner.handleCache(i, ptfPartition);
-          int start = scanner.computeStart(i, ptfPartition);
-          int end = scanner.computeEnd(i, ptfPartition) - 1;
+    for (LinkedList<List<IntWritable>> partition : TEST_PARTITIONS){
+      BoundaryDef startBoundary = new BoundaryDef(startDirection, startAmount);
+      BoundaryDef endBoundary = new BoundaryDef(endDirection, endAmount);
+      AtomicInteger readCounter = new AtomicInteger(0);
+
+      int[] expectedBoundaryStarts = new int[partition.size()];
+      int[] expectedBoundaryEnds = new int[partition.size()];
+      int expectedReadCountWithoutCache = -1;
+
+      for (PTFInvocationSpec.Order order : ORDERS) {
+        for (Integer cacheSize : CACHE_SIZES) {
+          LOG.info(Thread.currentThread().getStackTrace()[2].getMethodName());
+          LOG.info("Cache: " + cacheSize + " order: " + order);
+          BoundaryCache cache = cacheSize == null ? null : new BoundaryCache(cacheSize);
+          Pair<PTFPartition, ValueBoundaryScanner> mocks = setupMocks(partition,
+                  ORDER_BY_COL, startBoundary, endBoundary, order, cache, readCounter);
+          PTFPartition ptfPartition = mocks.getLeft();
+          ValueBoundaryScanner scanner = mocks.getRight();
+          for (int i = 0; i < partition.size(); ++i) {
+            scanner.handleCache(i, ptfPartition);
+
+            scanner.enableBinarySearch = false;
+            int startWithoutBinaryPreSearch = scanner.computeStart(i, ptfPartition);
+            int endWithoutBinaryPreSearch = scanner.computeEnd(i, ptfPartition) - 1;
+
+            scanner.enableBinarySearch = true;
+            int start = scanner.computeStart(i, ptfPartition);
+            int end = scanner.computeEnd(i, ptfPartition) - 1;
+
+            Integer col0 = ofNullable(partition.get(i).get(0)).map(v -> v.get()).orElse(null);
+            Integer col1 = ofNullable(partition.get(i).get(1)).map(v -> v.get()).orElse(null);
+            Integer col2 = ofNullable(partition.get(i).get(2)).map(v -> v.get()).orElse(null);
+            LOG.info(String.format("%d|\t%d\t%d\t%d\t|%d-%d", i, col0, col1, col2, start, end));
+
+            assertEquals("start ranges are not equal with/without binary pre-search",
+                startWithoutBinaryPreSearch, start);
+            assertEquals("end ranges are not equal with/without binary pre-search",
+                endWithoutBinaryPreSearch, end);
+
+            if (cache == null) {
+              //Cache-less version should be baseline
+              expectedBoundaryStarts[i] = start;
+              expectedBoundaryEnds[i] = end;
+            } else {
+              assertEquals(
+                  String.format("expected boundary start doesn't match, order: %s, cache size: %d, i: %d",
+                      order, cacheSize, i),
+                  expectedBoundaryStarts[i], start);
+              assertEquals(
+                  String.format("expected boundary end doesn't match, order: %s, cache size: %d, i: %d",
+                      order, cacheSize, i),
+                  expectedBoundaryEnds[i], end);
+            }
+          }
           if (cache == null) {
-            //Cache-less version should be baseline
-            expectedBoundaryStarts[i] = start;
-            expectedBoundaryEnds[i] = end;
+            expectedReadCountWithoutCache = readCounter.get();
           } else {
-            assertEquals(expectedBoundaryStarts[i], start);
-            assertEquals(expectedBoundaryEnds[i], end);
-          }
-          Integer col0 = ofNullable(TEST_PARTITION.get(i).get(0)).map(v -> v.get()).orElse(null);
-          Integer col1 = ofNullable(TEST_PARTITION.get(i).get(1)).map(v -> v.get()).orElse(null);
-          Integer col2 = ofNullable(TEST_PARTITION.get(i).get(2)).map(v -> v.get()).orElse(null);
-          LOG.info(String.format("%d|\t%d\t%d\t%d\t|%d-%d", i, col0, col1, col2, start, end));
-        }
-        if (cache == null) {
-          expectedReadCountWithoutCache = readCounter.get();
-        } else {
-          //Read count should be smaller with cache being used, but larger than the minimum of
-          // reading every row once.
-          assertTrue(expectedReadCountWithoutCache >= readCounter.get());
-          if (startAmount != UNBOUNDED_AMOUNT || endAmount != UNBOUNDED_AMOUNT) {
-            assertTrue(TEST_PARTITION.size() <= readCounter.get());
+            //Read count should be smaller with cache being used, but larger than the minimum of
+            // reading every row once.
+            assertTrue(expectedReadCountWithoutCache >= readCounter.get());
+            if (startAmount != UNBOUNDED_AMOUNT || endAmount != UNBOUNDED_AMOUNT) {
+              assertTrue(partition.size() <= readCounter.get());
+            }
           }
+          readCounter.set(0);
         }
-        readCounter.set(0);
       }
     }
   }
@@ -222,7 +247,7 @@ public class TestBoundaryCache {
    * @throws Exception
    */
   private static Pair<PTFPartition, ValueBoundaryScanner> setupMocks(
-          List<List<IntWritable>> partition, int orderByCol, BoundaryDef start, BoundaryDef end,
+          LinkedList<List<IntWritable>> partition, int orderByCol, BoundaryDef start, BoundaryDef end,
           PTFInvocationSpec.Order order, BoundaryCache cache,
           AtomicInteger readCounter) throws Exception {
     PTFPartition partitionMock = mock(PTFPartition.class);
@@ -257,38 +282,41 @@ public class TestBoundaryCache {
       return (v1 != null && v2 != null) ? (v1.get() - v2.get()) > amt :  v1 != null || v2 != null;
     }).when(scannerSpy).isDistanceGreater(any(Object.class), any(Object.class), any(Integer.class));
 
-    setOrderOnTestPartitions(order);
+    setOrderOnTestPartitions(partition, order);
     return new ImmutablePair<>(partitionMock, scannerSpy);
 
   }
 
-  private static void addRow(List<List<IntWritable>> partition, Integer col0, Integer col1,
-                             Integer col2) {
-    partition.add(Lists.newArrayList(
-            col0 != null ? new IntWritable(col0) : null,
-            col1 != null ? new IntWritable(col1) : null,
-            col2 != null ? new IntWritable(col2) : null
-    ));
+  /**
+   * Appends one three-column row to each of the given partitions. A null column value is stored
+   * as a null cell; every partition receives its own freshly created row list.
+   *
+   * @param col0 value of the first column, may be null
+   * @param col1 value of the second column, may be null
+   * @param col2 value of the third column, may be null
+   * @param partitions partitions the row is appended to
+   */
+  @SuppressWarnings("unchecked")
+  private static void addRow(Integer col0, Integer col1, Integer col2,
+      List<List<IntWritable>>... partitions) {
+    for (int i = 0; i < partitions.length; i++) {
+      IntWritable first = col0 == null ? null : new IntWritable(col0);
+      IntWritable second = col1 == null ? null : new IntWritable(col1);
+      IntWritable third = col2 == null ? null : new IntWritable(col2);
+      List<IntWritable> row = Lists.newArrayList(first, second, third);
+      partitions[i].add(row);
+    }
   }
 
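   /**
    * Reverses the non-null rows of the given partition in place if their current order does not
    * match the requested one; rows with a null order-by column are then re-appended at the end.
    * @param partition the test partition to reorder
    * @param order the requested order (ASC or DESC)
    */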
-  private static void setOrderOnTestPartitions(PTFInvocationSpec.Order order) {
-    LinkedList<List<IntWritable>> notNulls = TEST_PARTITION.stream().filter(
-        r -> r.get(ORDER_BY_COL) != null).collect(toCollection(LinkedList::new));
-    List<List<IntWritable>> nulls = TEST_PARTITION.stream().filter(
-        r -> r.get(ORDER_BY_COL) == null).collect(toList());
+  private static void setOrderOnTestPartitions(LinkedList<List<IntWritable>> partition,
+      PTFInvocationSpec.Order order) {
+    LinkedList<List<IntWritable>> notNulls = partition.stream()
+        .filter(r -> r.get(ORDER_BY_COL) != null).collect(toCollection(LinkedList::new));
+    List<List<IntWritable>> nulls =
+        partition.stream().filter(r -> r.get(ORDER_BY_COL) == null).collect(toList());
 
-    boolean isAscCurrently = notNulls.getFirst().get(ORDER_BY_COL).get() <
-            notNulls.getLast().get(ORDER_BY_COL).get();
+    boolean isAscCurrently =
+        notNulls.getFirst().get(ORDER_BY_COL).get() < notNulls.getLast().get(ORDER_BY_COL).get();
 
     if ((ASC.equals(order) && !isAscCurrently) || (DESC.equals(order) && isAscCurrently)) {
       Collections.reverse(notNulls);
-      TEST_PARTITION.clear();
-      TEST_PARTITION.addAll(notNulls);
-      TEST_PARTITION.addAll(nulls);
+      partition.clear();
+      partition.addAll(notNulls);
+      partition.addAll(nulls);
     }
   }