You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ab...@apache.org on 2021/08/05 07:56:25 UTC
[hive] branch master updated: HIVE-25061: PTF: Improve
ValueBoundaryScanner (#2225) (Laszlo Bodor reviewed by Panagiotis
Garefalakis)
This is an automated email from the ASF dual-hosted git repository.
abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 2db4fd5 HIVE-25061: PTF: Improve ValueBoundaryScanner (#2225) (Laszlo Bodor reviewed by Panagiotis Garefalakis)
2db4fd5 is described below
commit 2db4fd5cc1a7a03654d6713d605bc6c7bd4eb6f0
Author: Bodor Laszlo <bo...@gmail.com>
AuthorDate: Thu Aug 5 09:56:17 2021 +0200
HIVE-25061: PTF: Improve ValueBoundaryScanner (#2225) (Laszlo Bodor reviewed by Panagiotis Garefalakis)
---
.../hive/ql/udf/ptf/ValueBoundaryScanner.java | 184 ++++++++++++--------
.../hadoop/hive/ql/udf/ptf/TestBoundaryCache.java | 192 ++++++++++++---------
2 files changed, 220 insertions(+), 156 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/ValueBoundaryScanner.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/ValueBoundaryScanner.java
index b76af63..c6bb1c2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/ValueBoundaryScanner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/ValueBoundaryScanner.java
@@ -42,10 +42,24 @@ import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+import com.google.common.annotations.VisibleForTesting;
+
+
public abstract class ValueBoundaryScanner {
BoundaryDef start, end;
protected final boolean nullsLast;
+ /*
+ * The enableBinarySearch flag could be made configurable later, because in edge cases
+ * (very small windows compared to the PTF partition size) a linear search can perform slightly,
+ * though not significantly, better. Currently, propagating the option from the PTF codepath
+ * would involve lots of function signature changes, so it isn't worth it. This boolean can still
+ * be used for testing purposes (e.g. comparing the linear and binary search results to verify
+ * correctness).
+ */
+ @VisibleForTesting
+ boolean enableBinarySearch = true;
+
public ValueBoundaryScanner(BoundaryDef start, BoundaryDef end, boolean nullsLast) {
this.start = start;
this.end = end;
@@ -238,6 +252,7 @@ public abstract class ValueBoundaryScanner {
rowVal = computeValue(p.getAt(r));
}
}
+
return new ImmutablePair<>(r, rowVal);
}
@@ -403,26 +418,11 @@ abstract class SingleValueBoundaryScanner extends ValueBoundaryScanner {
}
}
- Object rowVal = sortKey;
- int r = rowIdx;
-
- // Use Case 4.
- if ( expressionDef.getOrder() == Order.DESC ) {
- while (r >= 0 && !isDistanceGreater(rowVal, sortKey, amt) ) {
- Pair<Integer, Object> stepResult = skipOrStepBack(r, p);
- r = stepResult.getLeft();
- rowVal = stepResult.getRight();
- }
- return r + 1;
- }
- else { // Use Case 5.
- while (r >= 0 && !isDistanceGreater(sortKey, rowVal, amt) ) {
- Pair<Integer, Object> stepResult = skipOrStepBack(r, p);
- r = stepResult.getLeft();
- rowVal = stepResult.getRight();
- }
-
- return r + 1;
+ // Use Case 4,5
+ if (enableBinarySearch) {
+ return binarySearchBack(rowIdx, p, sortKey, amt, expressionDef.getOrder());
+ } else {
+ return linearSearchBack(rowIdx, p, sortKey, amt, expressionDef.getOrder());
}
}
@@ -456,7 +456,6 @@ abstract class SingleValueBoundaryScanner extends ValueBoundaryScanner {
Object sortKey = computeValueUseCache(rowIdx, p);
Object rowVal = sortKey;
- int r = rowIdx;
if ( sortKey == null ) {
// Use Case 9.
@@ -464,31 +463,20 @@ abstract class SingleValueBoundaryScanner extends ValueBoundaryScanner {
return p.size();
}
else { // Use Case 10.
- while (r < p.size() && rowVal == null ) {
- Pair<Integer, Object> stepResult = skipOrStepForward(r, p);
- r = stepResult.getLeft();
+ while (rowIdx < p.size() && rowVal == null ) {
+ Pair<Integer, Object> stepResult = skipOrStepForward(rowIdx, p);
+ rowIdx = stepResult.getLeft();
rowVal = stepResult.getRight();
}
- return r;
+ return rowIdx;
}
}
- // Use Case 11.
- if ( expressionDef.getOrder() == Order.DESC) {
- while (r < p.size() && !isDistanceGreater(sortKey, rowVal, amt) ) {
- Pair<Integer, Object> stepResult = skipOrStepForward(r, p);
- r = stepResult.getLeft();
- rowVal = stepResult.getRight();
- }
- return r;
- }
- else { // Use Case 12.
- while (r < p.size() && !isDistanceGreater(rowVal, sortKey, amt) ) {
- Pair<Integer, Object> stepResult = skipOrStepForward(r, p);
- r = stepResult.getLeft();
- rowVal = stepResult.getRight();
- }
- return r;
+ // Use Case 11,12
+ if (enableBinarySearch) {
+ return binarySearchForward(rowIdx, p, sortKey, amt, expressionDef.getOrder());
+ } else {
+ return linearSearchForward(rowIdx, p, sortKey, amt, expressionDef.getOrder());
}
}
@@ -554,25 +542,13 @@ abstract class SingleValueBoundaryScanner extends ValueBoundaryScanner {
}
}
- Object rowVal = sortKey;
int r = rowIdx;
- // Use Case 4.
- if ( expressionDef.getOrder() == Order.DESC ) {
- while (r >= 0 && !isDistanceGreater(rowVal, sortKey, amt) ) {
- Pair<Integer, Object> stepResult = skipOrStepBack(r, p);
- r = stepResult.getLeft();
- rowVal = stepResult.getRight();
- }
- return r + 1;
- }
- else { // Use Case 5.
- while (r >= 0 && !isDistanceGreater(sortKey, rowVal, amt) ) {
- Pair<Integer, Object> stepResult = skipOrStepBack(r, p);
- r = stepResult.getLeft();
- rowVal = stepResult.getRight();
- }
- return r + 1;
+ // Use Case 4,5
+ if (enableBinarySearch) {
+ return binarySearchBack(r, p, sortKey, amt, expressionDef.getOrder());
+ } else {
+ return linearSearchBack(r, p, sortKey, amt, expressionDef.getOrder());
}
}
@@ -628,22 +604,11 @@ abstract class SingleValueBoundaryScanner extends ValueBoundaryScanner {
}
}
- // Use Case 11.
- if ( expressionDef.getOrder() == Order.DESC) {
- while (r < p.size() && !isDistanceGreater(sortKey, rowVal, amt) ) {
- Pair<Integer, Object> stepResult = skipOrStepForward(r, p);
- r = stepResult.getLeft();
- rowVal = stepResult.getRight();
- }
- return r;
- }
- else { // Use Case 12.
- while (r < p.size() && !isDistanceGreater(rowVal, sortKey, amt) ) {
- Pair<Integer, Object> stepResult = skipOrStepForward(r, p);
- r = stepResult.getLeft();
- rowVal = stepResult.getRight();
- }
- return r;
+ // Use Case 11,12
+ if (enableBinarySearch) {
+ return binarySearchForward(r, p, sortKey, amt, expressionDef.getOrder());
+ } else {
+ return linearSearchForward(r, p, sortKey, amt, expressionDef.getOrder());
}
}
@@ -664,6 +629,77 @@ abstract class SingleValueBoundaryScanner extends ValueBoundaryScanner {
*/
public abstract boolean isEqual(Object v1, Object v2);
+ protected int binarySearchBack(int rowId, PTFPartition p, Object sortKey, int amt,
+ Order order) throws HiveException { // binary-search alternative to linearSearchBack; returns a row index in [0, rowId]
+ boolean isOrderDesc = order.equals(Order.DESC); // DESC swaps the operand order handed to isDistanceGreater(...)
+ Object rowVal = null;
+
+ int rMin = 0; // lowest row index that can still be the returned start position
+ int rMax = rowId; // highest row index that can still be the returned start position (the initial rowId)
+
+ boolean isDistanceGreater = true; // local flag holding the latest result of the isDistanceGreater(...) method
+ while (rMin < rMax) {
+ rowVal = computeValueUseCache(rowId, p); // first probe is the rowId passed in, later probes are the midpoints computed below
+ isDistanceGreater = isDistanceGreater(isOrderDesc ? rowVal : sortKey, isOrderDesc ? sortKey : rowVal, amt);
+ if (isDistanceGreater) {
+ rMin = rowId + 1; // probed row is farther than amt from sortKey: continue in the upper half
+ } else {
+ rMax = rowId; // probed row is within amt: it stays a candidate, continue in the lower half
+ }
+ rowId = rMin + (rMax - rMin) / 2; // midpoint of [rMin, rMax); this form cannot overflow the way rMin + rMax could
+ }
+ return rMin; // NOTE(review): equals the first in-range row only if isDistanceGreater is monotonic over [0, rowId] (sorted partition) - confirm
+ }
+
+ private int linearSearchBack(int r, PTFPartition p, Object sortKey, int amt,
+ Order order) throws HiveException { // linear alternative to binarySearchBack, chosen when enableBinarySearch is false
+ boolean isOrderDesc = order.equals(Order.DESC); // DESC swaps the operand order handed to isDistanceGreater(...)
+ Object rowVal = sortKey; // seeded with sortKey, so the first distance check compares the key with itself
+ while (r >= 0 && !isDistanceGreater(isOrderDesc ? rowVal : sortKey,
+ isOrderDesc ? sortKey : rowVal, amt)) {
+ Pair<Integer, Object> stepResult = skipOrStepBack(r, p); // left = next row index to test, right = value used for the next check
+ r = stepResult.getLeft();
+ rowVal = stepResult.getRight();
+ }
+ return r + 1; // loop ended at r == -1 or on the first row farther than amt; the start is the row after it
+ }
+
+ protected int binarySearchForward(int rowId, PTFPartition p, Object sortKey, int amt,
+ Order order) throws HiveException { // binary-search alternative to linearSearchForward; returns a row index in [rowId, p.size()]
+ boolean isOrderDesc = order.equals(Order.DESC); // DESC swaps the operand order handed to isDistanceGreater(...)
+ Object rowVal = null;
+
+ int rMin = rowId; // lowest row index that can still be the returned end position
+ int rMax = p.size(); // highest possible return value; reached when no probed row is farther than amt
+
+ boolean isDistanceGreater = true; // local flag holding the latest result of the isDistanceGreater(...) method
+ while (rMin < rMax) {
+ rowVal = computeValueUseCache(rowId, p); // first probe is the rowId passed in, later probes are the midpoints computed below
+ isDistanceGreater =
+ isDistanceGreater(isOrderDesc ? sortKey : rowVal, isOrderDesc ? rowVal : sortKey, amt);
+ if (isDistanceGreater) {
+ rMax = rowId; // probed row is farther than amt from sortKey: it stays a candidate, continue in the lower half
+ } else {
+ rMin = rowId + 1; // probed row is within amt: continue in the upper half
+ }
+ rowId = rMin + (rMax - rMin) / 2; // midpoint of [rMin, rMax); this form cannot overflow the way rMin + rMax could
+ }
+ return rMin; // NOTE(review): equals the first out-of-range row only if isDistanceGreater is monotonic over [rowId, p.size()) - confirm
+ }
+
+ private int linearSearchForward(int r, PTFPartition p, Object sortKey, int amt,
+ Order order) throws HiveException { // linear alternative to binarySearchForward, chosen when enableBinarySearch is false
+ boolean isOrderDesc = order.equals(Order.DESC); // DESC swaps the operand order handed to isDistanceGreater(...)
+ Object rowVal = sortKey; // seeded with sortKey, so the first distance check compares the key with itself
+ while (r < p.size() && !isDistanceGreater(isOrderDesc ? sortKey : rowVal,
+ isOrderDesc ? rowVal : sortKey, amt)) {
+ Pair<Integer, Object> stepResult = skipOrStepForward(r, p); // left = next row index to test, right = value used for the next check
+ r = stepResult.getLeft();
+ rowVal = stepResult.getRight();
+ }
+ return r; // loop ended at r == p.size() or on the first row farther than amt; that index is returned as-is
+ }
+
public static SingleValueBoundaryScanner getScanner(BoundaryDef start, BoundaryDef end,
OrderDef orderDef, boolean nullsLast) throws HiveException {
if (orderDef.getExpressions().size() != 1) {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/udf/ptf/TestBoundaryCache.java b/ql/src/test/org/apache/hadoop/hive/ql/udf/ptf/TestBoundaryCache.java
index 5bd1ca4..bbf60d3 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/udf/ptf/TestBoundaryCache.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/udf/ptf/TestBoundaryCache.java
@@ -64,37 +64,41 @@ public class TestBoundaryCache {
private static final Logger LOG = LoggerFactory.getLogger(TestBoundaryCache.class);
private static final LinkedList<List<IntWritable>> TEST_PARTITION = new LinkedList<>();
+ private static final LinkedList<List<IntWritable>> TEST_PARTITION_NO_NULLS = new LinkedList<>();
+ @SuppressWarnings("unchecked")
+ private static final List<LinkedList<List<IntWritable>>> TEST_PARTITIONS =
+ Lists.newArrayList(TEST_PARTITION, TEST_PARTITION_NO_NULLS);
//Null for using no cache at all, 2 is minimum cache length, 5-9-15 for checking with smaller,
// exactly equal and larger cache than needed.
private static final List<Integer> CACHE_SIZES = Lists.newArrayList(null, 2, 5, 9, 15);
private static final List<PTFInvocationSpec.Order> ORDERS = Lists.newArrayList(ASC, DESC);
private static final int ORDER_BY_COL = 2;
+ @SuppressWarnings("unchecked")
@BeforeClass
public static void setupTests() throws Exception {
//8 ranges, max cache content is 8+1=9 entries
- addRow(TEST_PARTITION, 1, 1, -7);
- addRow(TEST_PARTITION, 2, 1, -1);
- addRow(TEST_PARTITION, 3, 1, -1);
- addRow(TEST_PARTITION, 4, 1, 1);
- addRow(TEST_PARTITION, 5, 1, 1);
- addRow(TEST_PARTITION, 6, 1, 1);
- addRow(TEST_PARTITION, 7, 1, 1);
- addRow(TEST_PARTITION, 8, 1, 2);
- addRow(TEST_PARTITION, 9, 1, 2);
- addRow(TEST_PARTITION, 10, 1, 2);
- addRow(TEST_PARTITION, 11, 1, 2);
- addRow(TEST_PARTITION, 12, 1, 3);
- addRow(TEST_PARTITION, 13, 1, 5);
- addRow(TEST_PARTITION, 14, 1, 5);
- addRow(TEST_PARTITION, 15, 1, 5);
- addRow(TEST_PARTITION, 16, 1, 5);
- addRow(TEST_PARTITION, 17, 1, 6);
- addRow(TEST_PARTITION, 18, 1, 6);
- addRow(TEST_PARTITION, 19, 1, 9);
- addRow(TEST_PARTITION, 20, 1, null);
- addRow(TEST_PARTITION, 21, 1, null);
-
+ addRow(1, 1, -7, TEST_PARTITION, TEST_PARTITION_NO_NULLS);
+ addRow(2, 1, -1, TEST_PARTITION, TEST_PARTITION_NO_NULLS);
+ addRow(3, 1, -1, TEST_PARTITION, TEST_PARTITION_NO_NULLS);
+ addRow(4, 1, 1, TEST_PARTITION, TEST_PARTITION_NO_NULLS);
+ addRow(5, 1, 1, TEST_PARTITION, TEST_PARTITION_NO_NULLS);
+ addRow(6, 1, 1, TEST_PARTITION, TEST_PARTITION_NO_NULLS);
+ addRow(7, 1, 1, TEST_PARTITION, TEST_PARTITION_NO_NULLS);
+ addRow(8, 1, 2, TEST_PARTITION, TEST_PARTITION_NO_NULLS);
+ addRow(9, 1, 2, TEST_PARTITION, TEST_PARTITION_NO_NULLS);
+ addRow(10, 1, 2, TEST_PARTITION, TEST_PARTITION_NO_NULLS);
+ addRow(11, 1, 2, TEST_PARTITION, TEST_PARTITION_NO_NULLS);
+ addRow(12, 1, 3, TEST_PARTITION, TEST_PARTITION_NO_NULLS);
+ addRow(13, 1, 5, TEST_PARTITION, TEST_PARTITION_NO_NULLS);
+ addRow(14, 1, 5, TEST_PARTITION, TEST_PARTITION_NO_NULLS);
+ addRow(15, 1, 5, TEST_PARTITION, TEST_PARTITION_NO_NULLS);
+ addRow(16, 1, 5, TEST_PARTITION, TEST_PARTITION_NO_NULLS);
+ addRow(17, 1, 6, TEST_PARTITION, TEST_PARTITION_NO_NULLS);
+ addRow(18, 1, 6, TEST_PARTITION, TEST_PARTITION_NO_NULLS);
+ addRow(19, 1, 9, TEST_PARTITION, TEST_PARTITION_NO_NULLS);
+ addRow(20, 1, null, TEST_PARTITION);
+ addRow(21, 1, null, TEST_PARTITION);
}
@Test
@@ -160,51 +164,72 @@ public class TestBoundaryCache {
private void runTest(WindowingSpec.Direction startDirection, int startAmount,
WindowingSpec.Direction endDirection, int endAmount) throws Exception {
- BoundaryDef startBoundary = new BoundaryDef(startDirection, startAmount);
- BoundaryDef endBoundary = new BoundaryDef(endDirection, endAmount);
- AtomicInteger readCounter = new AtomicInteger(0);
-
- int[] expectedBoundaryStarts = new int[TEST_PARTITION.size()];
- int[] expectedBoundaryEnds = new int[TEST_PARTITION.size()];
- int expectedReadCountWithoutCache = -1;
-
- for (PTFInvocationSpec.Order order : ORDERS) {
- for (Integer cacheSize : CACHE_SIZES) {
- LOG.info(Thread.currentThread().getStackTrace()[2].getMethodName());
- LOG.info("Cache: " + cacheSize + " order: " + order);
- BoundaryCache cache = cacheSize == null ? null : new BoundaryCache(cacheSize);
- Pair<PTFPartition, ValueBoundaryScanner> mocks = setupMocks(TEST_PARTITION,
- ORDER_BY_COL, startBoundary, endBoundary, order, cache, readCounter);
- PTFPartition ptfPartition = mocks.getLeft();
- ValueBoundaryScanner scanner = mocks.getRight();
- for (int i = 0; i < TEST_PARTITION.size(); ++i) {
- scanner.handleCache(i, ptfPartition);
- int start = scanner.computeStart(i, ptfPartition);
- int end = scanner.computeEnd(i, ptfPartition) - 1;
+ for (LinkedList<List<IntWritable>> partition : TEST_PARTITIONS){
+ BoundaryDef startBoundary = new BoundaryDef(startDirection, startAmount);
+ BoundaryDef endBoundary = new BoundaryDef(endDirection, endAmount);
+ AtomicInteger readCounter = new AtomicInteger(0);
+
+ int[] expectedBoundaryStarts = new int[partition.size()];
+ int[] expectedBoundaryEnds = new int[partition.size()];
+ int expectedReadCountWithoutCache = -1;
+
+ for (PTFInvocationSpec.Order order : ORDERS) {
+ for (Integer cacheSize : CACHE_SIZES) {
+ LOG.info(Thread.currentThread().getStackTrace()[2].getMethodName());
+ LOG.info("Cache: " + cacheSize + " order: " + order);
+ BoundaryCache cache = cacheSize == null ? null : new BoundaryCache(cacheSize);
+ Pair<PTFPartition, ValueBoundaryScanner> mocks = setupMocks(partition,
+ ORDER_BY_COL, startBoundary, endBoundary, order, cache, readCounter);
+ PTFPartition ptfPartition = mocks.getLeft();
+ ValueBoundaryScanner scanner = mocks.getRight();
+ for (int i = 0; i < partition.size(); ++i) {
+ scanner.handleCache(i, ptfPartition);
+
+ scanner.enableBinarySearch = false;
+ int startWithoutBinaryPreSearch = scanner.computeStart(i, ptfPartition);
+ int endWithoutBinaryPreSearch = scanner.computeEnd(i, ptfPartition) - 1;
+
+ scanner.enableBinarySearch = true;
+ int start = scanner.computeStart(i, ptfPartition);
+ int end = scanner.computeEnd(i, ptfPartition) - 1;
+
+ Integer col0 = ofNullable(partition.get(i).get(0)).map(v -> v.get()).orElse(null);
+ Integer col1 = ofNullable(partition.get(i).get(1)).map(v -> v.get()).orElse(null);
+ Integer col2 = ofNullable(partition.get(i).get(2)).map(v -> v.get()).orElse(null);
+ LOG.info(String.format("%d|\t%d\t%d\t%d\t|%d-%d", i, col0, col1, col2, start, end));
+
+ assertEquals("start ranges are not equal with/without binary pre-search",
+ startWithoutBinaryPreSearch, start);
+ assertEquals("end ranges are not equal with/without binary pre-search",
+ endWithoutBinaryPreSearch, end);
+
+ if (cache == null) {
+ //Cache-less version should be baseline
+ expectedBoundaryStarts[i] = start;
+ expectedBoundaryEnds[i] = end;
+ } else {
+ assertEquals(
+ String.format("expected boundary start doesn't match, order: %s, cache size: %d, i: %d",
+ order, cacheSize, i),
+ expectedBoundaryStarts[i], start);
+ assertEquals(
+ String.format("expected boundary end doesn't match, order: %s, cache size: %d, i: %d",
+ order, cacheSize, i),
+ expectedBoundaryEnds[i], end);
+ }
+ }
if (cache == null) {
- //Cache-less version should be baseline
- expectedBoundaryStarts[i] = start;
- expectedBoundaryEnds[i] = end;
+ expectedReadCountWithoutCache = readCounter.get();
} else {
- assertEquals(expectedBoundaryStarts[i], start);
- assertEquals(expectedBoundaryEnds[i], end);
- }
- Integer col0 = ofNullable(TEST_PARTITION.get(i).get(0)).map(v -> v.get()).orElse(null);
- Integer col1 = ofNullable(TEST_PARTITION.get(i).get(1)).map(v -> v.get()).orElse(null);
- Integer col2 = ofNullable(TEST_PARTITION.get(i).get(2)).map(v -> v.get()).orElse(null);
- LOG.info(String.format("%d|\t%d\t%d\t%d\t|%d-%d", i, col0, col1, col2, start, end));
- }
- if (cache == null) {
- expectedReadCountWithoutCache = readCounter.get();
- } else {
- //Read count should be smaller with cache being used, but larger than the minimum of
- // reading every row once.
- assertTrue(expectedReadCountWithoutCache >= readCounter.get());
- if (startAmount != UNBOUNDED_AMOUNT || endAmount != UNBOUNDED_AMOUNT) {
- assertTrue(TEST_PARTITION.size() <= readCounter.get());
+ //Read count should be smaller with cache being used, but larger than the minimum of
+ // reading every row once.
+ assertTrue(expectedReadCountWithoutCache >= readCounter.get());
+ if (startAmount != UNBOUNDED_AMOUNT || endAmount != UNBOUNDED_AMOUNT) {
+ assertTrue(partition.size() <= readCounter.get());
+ }
}
+ readCounter.set(0);
}
- readCounter.set(0);
}
}
}
@@ -222,7 +247,7 @@ public class TestBoundaryCache {
* @throws Exception
*/
private static Pair<PTFPartition, ValueBoundaryScanner> setupMocks(
- List<List<IntWritable>> partition, int orderByCol, BoundaryDef start, BoundaryDef end,
+ LinkedList<List<IntWritable>> partition, int orderByCol, BoundaryDef start, BoundaryDef end,
PTFInvocationSpec.Order order, BoundaryCache cache,
AtomicInteger readCounter) throws Exception {
PTFPartition partitionMock = mock(PTFPartition.class);
@@ -257,38 +282,41 @@ public class TestBoundaryCache {
return (v1 != null && v2 != null) ? (v1.get() - v2.get()) > amt : v1 != null || v2 != null;
}).when(scannerSpy).isDistanceGreater(any(Object.class), any(Object.class), any(Integer.class));
- setOrderOnTestPartitions(order);
+ setOrderOnTestPartitions(partition, order);
return new ImmutablePair<>(partitionMock, scannerSpy);
}
- private static void addRow(List<List<IntWritable>> partition, Integer col0, Integer col1,
- Integer col2) {
- partition.add(Lists.newArrayList(
- col0 != null ? new IntWritable(col0) : null,
- col1 != null ? new IntWritable(col1) : null,
- col2 != null ? new IntWritable(col2) : null
- ));
+ @SuppressWarnings("unchecked")
+ private static void addRow(Integer col0, Integer col1, Integer col2,
+ List<List<IntWritable>>... partitions) {
+ for (List<List<IntWritable>> partition : partitions) {
+ partition.add(Lists.newArrayList(col0 != null ? new IntWritable(col0) : null,
+ col1 != null ? new IntWritable(col1) : null,
+ col2 != null ? new IntWritable(col2) : null));
+ }
+
}
/**
* Reverses order on actual data if needed, based on order parameter.
* @param order
*/
- private static void setOrderOnTestPartitions(PTFInvocationSpec.Order order) {
- LinkedList<List<IntWritable>> notNulls = TEST_PARTITION.stream().filter(
- r -> r.get(ORDER_BY_COL) != null).collect(toCollection(LinkedList::new));
- List<List<IntWritable>> nulls = TEST_PARTITION.stream().filter(
- r -> r.get(ORDER_BY_COL) == null).collect(toList());
+ private static void setOrderOnTestPartitions(LinkedList<List<IntWritable>> partition,
+ PTFInvocationSpec.Order order) {
+ LinkedList<List<IntWritable>> notNulls = partition.stream()
+ .filter(r -> r.get(ORDER_BY_COL) != null).collect(toCollection(LinkedList::new));
+ List<List<IntWritable>> nulls =
+ partition.stream().filter(r -> r.get(ORDER_BY_COL) == null).collect(toList());
- boolean isAscCurrently = notNulls.getFirst().get(ORDER_BY_COL).get() <
- notNulls.getLast().get(ORDER_BY_COL).get();
+ boolean isAscCurrently =
+ notNulls.getFirst().get(ORDER_BY_COL).get() < notNulls.getLast().get(ORDER_BY_COL).get();
if ((ASC.equals(order) && !isAscCurrently) || (DESC.equals(order) && isAscCurrently)) {
Collections.reverse(notNulls);
- TEST_PARTITION.clear();
- TEST_PARTITION.addAll(notNulls);
- TEST_PARTITION.addAll(nulls);
+ partition.clear();
+ partition.addAll(notNulls);
+ partition.addAll(nulls);
}
}