You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by so...@apache.org on 2018/06/27 06:19:28 UTC
[drill] branch master updated: DRILL-6503: Performance improvements
in lateral
This is an automated email from the ASF dual-hosted git repository.
sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 779edf8 DRILL-6503: Performance improvements in lateral
779edf8 is described below
commit 779edf880a1e92608b68108f18e79eff6eb4afa5
Author: Sorabh Hamirwasia <so...@apache.org>
AuthorDate: Tue Jun 26 23:19:25 2018 -0700
DRILL-6503: Performance improvements in lateral
closes #1328
---
.../drill/exec/physical/config/LateralJoinPOP.java | 4 +
.../exec/physical/impl/join/LateralJoinBatch.java | 162 ++++++++++++++-------
.../impl/join/TestLateralJoinCorrectness.java | 16 +-
.../unnest/TestUnnestWithLateralCorrectness.java | 6 +-
4 files changed, 121 insertions(+), 67 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java
index fab89a2..a12fed1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java
@@ -43,6 +43,10 @@ public class LateralJoinPOP extends AbstractJoinPop {
@JsonProperty("right") PhysicalOperator right,
@JsonProperty("joinType") JoinRelType joinType) {
super(left, right, joinType, null, null);
+ Preconditions.checkArgument(joinType != JoinRelType.FULL,
+ "Full outer join is currently not supported with Lateral Join");
+ Preconditions.checkArgument(joinType != JoinRelType.RIGHT,
+ "Right join is currently not supported with Lateral Join");
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
index a09913f..578cbc8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
@@ -52,11 +52,6 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;
public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> implements LateralContract {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LateralJoinBatch.class);
- // Input indexes to correctly update the stats
- private static final int LEFT_INPUT = 0;
-
- private static final int RIGHT_INPUT = 1;
-
// Maximum number records in the outgoing batch
private int maxOutputRowCount;
@@ -81,8 +76,12 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
// Keep track if any matching right record was found for current left index record
private boolean matchedRecordFound;
+ // Used only for testing
private boolean useMemoryManager = true;
+ // Flag to keep track of new left batch so that update on memory manager is called only once per left batch
+ private boolean isNewLeftBatch = false;
+
/* ****************************************************************************************************************
* Public Methods
* ****************************************************************************************************************/
@@ -147,9 +146,16 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
// Setup the references of left, right and outgoing container in generated operator
state = BatchState.NOT_FIRST;
- // Update the memory manager
- updateMemoryManager(LEFT_INPUT);
- updateMemoryManager(RIGHT_INPUT);
+ // Update the memory manager only if its a brand new incoming i.e. leftJoinIndex and rightJoinIndex is 0
+ // Otherwise there will be a case where while filling last output batch, some records from previous left or
+ // right batch are still left to be sent in output for which we will count this batch twice. The actual checks
+ // are done in updateMemoryManager
+ updateMemoryManager(LEFT_INDEX);
+
+ // We have to call update on memory manager for empty batches (rightJoinIndex = -1) as well since other wise while
+ // allocating memory for vectors below it can fail. Since in that case colSize will not have any info on right side
+ // vectors and throws NPE. The actual checks are done in updateMemoryManager
+ updateMemoryManager(RIGHT_INDEX);
// allocate space for the outgoing batch
allocateVectors();
@@ -161,21 +167,25 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
public void close() {
updateBatchMemoryManagerStats();
- logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
- batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
- batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
- batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
- batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
-
- logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
- batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
- batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
- batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
- batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
-
- logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
- batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
- batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
+ if (logger.isDebugEnabled()) {
+ logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {}, avg row bytes : {}, " +
+ "record count : {}", batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
+ batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
+ batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
+ batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
+
+ logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {}, avg row bytes : {}, " +
+ "record count : {}", batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
+ batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
+ batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
+ batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
+
+ logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {}, avg row bytes : {}, " +
+ "record count : {}", batchMemoryManager.getNumOutgoingBatches(),
+ batchMemoryManager.getAvgOutputBatchSize(),
+ batchMemoryManager.getAvgOutputRowWidth(),
+ batchMemoryManager.getTotalOutputRecords());
+ }
super.close();
}
@@ -238,6 +248,7 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
boolean validBatch = setBatchState(leftUpstream);
if (validBatch) {
+ isNewLeftBatch = true;
rightUpstream = next(1, right);
validBatch = setBatchState(rightUpstream);
}
@@ -266,10 +277,6 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
}
Preconditions.checkState(right.getRecordCount() == 0, "Unexpected non-empty first right batch received");
- // Update the record memory manager
- updateMemoryManager(LEFT_INPUT);
- updateMemoryManager(RIGHT_INPUT);
-
// Setup output container schema based on known left and right schema
setupNewSchema();
@@ -337,7 +344,12 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
// If left batch is empty
while (needLeftBatch) {
- leftUpstream = !processLeftBatchInFuture ? next(LEFT_INPUT, left) : leftUpstream;
+
+ if (!processLeftBatchInFuture) {
+ leftUpstream = next(LEFT_INDEX, left);
+ isNewLeftBatch = true;
+ }
+
final boolean emptyLeftBatch = left.getRecordCount() <=0;
logger.trace("Received a left batch and isEmpty: {}", emptyLeftBatch);
@@ -418,7 +430,7 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
// will be a valid index. When all records are consumed it will be set to -1.
boolean needNewRightBatch = (leftJoinIndex >= 0) && (rightJoinIndex == -1);
while (needNewRightBatch) {
- rightUpstream = next(RIGHT_INPUT, right);
+ rightUpstream = next(RIGHT_INDEX, right);
switch (rightUpstream) {
case OK_NEW_SCHEMA:
// We should not get OK_NEW_SCHEMA multiple times for the same left incoming batch. So there won't be a
@@ -503,7 +515,8 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
if (rightUpstream == EMIT) {
if (!matchedRecordFound && JoinRelType.LEFT == popConfig.getJoinType()) {
// copy left side in case of LEFT join
- emitLeft(leftJoinIndex, outputIndex++);
+ emitLeft(leftJoinIndex, outputIndex, 1);
+ ++outputIndex;
}
++leftJoinIndex;
// Reset matchedRecord for next left index record
@@ -557,7 +570,7 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
}
// Update the batch memory manager to use new left incoming batch
- updateMemoryManager(LEFT_INPUT);
+ updateMemoryManager(LEFT_INDEX);
}
}
@@ -577,7 +590,7 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
}
// Update the batch memory manager to use new right incoming batch
- updateMemoryManager(RIGHT_INPUT);
+ updateMemoryManager(RIGHT_INDEX);
}
} // output batch is full to its max capacity
@@ -615,9 +628,11 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
batchMemoryManager.updateOutgoingStats(outputIndex);
- logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this));
- logger.debug("Number of records emitted: {} and Allocator Stats: [AllocatedMem: {}, PeakMem: {}]", outputIndex,
- container.getAllocator().getAllocatedMemory(), container.getAllocator().getPeakMemoryAllocation());
+ if (logger.isDebugEnabled()) {
+ logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this));
+ logger.debug("Number of records emitted: {} and Allocator Stats: [AllocatedMem: {}, PeakMem: {}]",
+ outputIndex, container.getAllocator().getAllocatedMemory(), container.getAllocator().getPeakMemoryAllocation());
+ }
// Update the output index for next output batch to zero
outputIndex = 0;
@@ -745,8 +760,6 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
* this left index. The right container is copied starting from rightIndex until number of records in the container.
*/
private void crossJoinAndOutputRecords() {
- logger.trace("Producing output for leftIndex: {}, rightIndex: {}, rightRecordCount: {} and outputIndex: {}",
- leftJoinIndex, rightJoinIndex, right.getRecordCount(), outputIndex);
final int rightRecordCount = right.getRecordCount();
// If there is no record in right batch just return current index in output batch
@@ -756,16 +769,30 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
// Check if right batch is empty since we have to handle left join case
Preconditions.checkState(rightJoinIndex != -1, "Right batch record count is >0 but index is -1");
- // For every record in right side just emit left and right records in output container
- for (int i = rightJoinIndex; i < rightRecordCount; ++i) {
- emitLeft(leftJoinIndex, outputIndex);
- emitRight(i, outputIndex);
- ++outputIndex;
-
- if (isOutgoingBatchFull()) {
- break;
- }
+
+ int currentOutIndex = outputIndex;
+ // Number of rows that can be copied in output batch
+ final int maxAvailableRowSlot = maxOutputRowCount - currentOutIndex;
+ // Number of rows that can be copied inside output batch is minimum of available slot in
+ // output batch and available data to copy from right side. It can be half consumed right batch
+ // which has few more rows to be copied to output but output batch has more to fill.
+ final int rowsToCopy = Math.min(maxAvailableRowSlot, (rightRecordCount - rightJoinIndex));
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Producing output for leftIndex: {}, rightIndex: {}, rightRecordCount: {}, outputIndex: {} and " +
+ "availableSlotInOutput: {}", leftJoinIndex, rightJoinIndex, rightRecordCount, outputIndex, maxAvailableRowSlot);
+ logger.debug("Output Batch stats before copying new data: {}", new RecordBatchSizer(this));
}
+
+ // First copy all the left vectors data. Doing it in this way since it's the same data being copied over may be
+ // we will have performance gain from JVM
+ emitLeft(leftJoinIndex, currentOutIndex, rowsToCopy);
+
+ // Copy all the right side vectors data
+ emitRight(rightJoinIndex, currentOutIndex, rowsToCopy);
+
+ // Update outputIndex
+ outputIndex += rowsToCopy;
}
/**
@@ -779,9 +806,14 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
* @param startVectorIndex - start index of vector inside source record batch
* @param endVectorIndex - end index of vector inside source record batch
* @param baseVectorIndex - base index to be added to startVectorIndex to get corresponding vector in outgoing batch
+ * @param numRowsToCopy - Number of rows to copy into output batch
+ * @param moveFromIndex - boolean to indicate if the fromIndex should also be increased or not. Since in case of
+ * copying data from left vector fromIndex is constant whereas in case of copying data from right
+ * vector fromIndex will move along with output index.
*/
private void copyDataToOutputVectors(int fromRowIndex, int toRowIndex, RecordBatch batch,
- int startVectorIndex, int endVectorIndex, int baseVectorIndex) {
+ int startVectorIndex, int endVectorIndex, int baseVectorIndex,
+ int numRowsToCopy, boolean moveFromIndex) {
// Get the vectors using field index rather than Materialized field since input batch field can be different from
// output container field in case of Left Join. As we rebuild the right Schema field to be optional for output
// container.
@@ -796,10 +828,14 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
final ValueVector outputVector = this.getValueAccessorById(outputValueClass, outputVectorIndex).getValueVector();
logger.trace("Copying data from incoming batch vector to outgoing batch vector. [IncomingBatch: " +
- "(RowIndex: {}, VectorType: {}), OutputBatch: (RowIndex: {}, VectorType: {}) and BaseIndex: {}]",
- fromRowIndex, inputValueClass, toRowIndex, outputValueClass, baseVectorIndex);
- // Copy data from input vector to output vector
- outputVector.copyEntry(toRowIndex, inputVector, fromRowIndex);
+ "(RowIndex: {}, VectorType: {}), OutputBatch: (RowIndex: {}, VectorType: {}) and Other: (TimeEachValue: {}," +
+ " NumBaseIndex: {}) ]",
+ fromRowIndex, inputValueClass, toRowIndex, outputValueClass, numRowsToCopy, baseVectorIndex);
+
+ // Copy data from input vector to output vector for numRowsToCopy times.
+ for (int j = 0; j < numRowsToCopy; ++j) {
+ outputVector.copyEntry(toRowIndex + j, inputVector, (moveFromIndex) ? fromRowIndex + j : fromRowIndex);
+ }
}
}
@@ -809,8 +845,9 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
* @param leftIndex - index to copy data from left incoming batch vectors
* @param outIndex - index to copy data to in outgoing batch vectors
*/
- private void emitLeft(int leftIndex, int outIndex) {
- copyDataToOutputVectors(leftIndex, outIndex, left, 0, leftSchema.getFieldCount(), 0);
+ private void emitLeft(int leftIndex, int outIndex, int numRowsToCopy) {
+ copyDataToOutputVectors(leftIndex, outIndex, left, 0,
+ leftSchema.getFieldCount(), 0, numRowsToCopy, false);
}
/**
@@ -819,8 +856,9 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
* @param rightIndex - index to copy data from right incoming batch vectors
* @param outIndex - index to copy data to in outgoing batch vectors
*/
- private void emitRight(int rightIndex, int outIndex) {
- copyDataToOutputVectors(rightIndex, outIndex, right, 0, rightSchema.getFieldCount(), leftSchema.getFieldCount());
+ private void emitRight(int rightIndex, int outIndex, int numRowsToCopy) {
+ copyDataToOutputVectors(rightIndex, outIndex, right, 0,
+ rightSchema.getFieldCount(), leftSchema.getFieldCount(), numRowsToCopy, true);
}
/**
@@ -847,12 +885,24 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
}
private void updateMemoryManager(int inputIndex) {
+
+ if (inputIndex == LEFT_INDEX && isNewLeftBatch) {
+ // reset state and continue to update
+ isNewLeftBatch = false;
+ } else if (inputIndex == RIGHT_INDEX && (rightJoinIndex == 0 || rightJoinIndex == -1)) {
+ // continue to update
+ } else {
+ return;
+ }
+
// For cases where all the previous input were consumed and send with previous output batch. But now we are building
// a new output batch with new incoming then it will not cause any problem since outputIndex will be 0
final int newOutputRowCount = batchMemoryManager.update(inputIndex, outputIndex);
if (logger.isDebugEnabled()) {
- logger.debug("BATCH_STATS, incoming {}:\n {}", inputIndex == 0 ? "left" : "right", batchMemoryManager.getRecordBatchSizer(inputIndex));
+ logger.debug("BATCH_STATS, incoming {}:\n {}", inputIndex == LEFT_INDEX ? "left" : "right",
+ batchMemoryManager.getRecordBatchSizer(inputIndex));
+ logger.debug("Previous OutputRowCount: {}, New OutputRowCount: {}", maxOutputRowCount, newOutputRowCount);
}
if (useMemoryManager) {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
index e9e9aac..79a7bd4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
@@ -107,7 +107,7 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
.buildSchema();
emptyRightRowSet = fixture.rowSetBuilder(rightSchema).build();
- ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.FULL);
+ ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.INNER);
}
@AfterClass
@@ -1754,7 +1754,7 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
- final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.FULL);
+ final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER);
final LateralJoinBatch ljBatch_1 = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
leftMockBatch_1, rightMockBatch_1);
@@ -1863,7 +1863,7 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
- final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.FULL);
+ final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER);
final LateralJoinBatch lowerLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
leftMockBatch_1, rightMockBatch_1);
@@ -1964,7 +1964,7 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
- final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.FULL);
+ final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER);
final LateralJoinBatch lowerLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
leftMockBatch_1, rightMockBatch_1);
@@ -2091,7 +2091,7 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
- final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.FULL);
+ final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER);
final LateralJoinBatch lowerLevelLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
leftMockBatch_1, rightMockBatch_1);
@@ -2225,7 +2225,7 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
- final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.FULL);
+ final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER);
final LateralJoinBatch lowerLevelLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
leftMockBatch_1, rightMockBatch_1);
@@ -2369,7 +2369,7 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
- final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.FULL);
+ final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER);
final LateralJoinBatch lowerLevelLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
leftMockBatch_1, rightMockBatch_1);
@@ -2723,7 +2723,7 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
- final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.FULL);
+ final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER);
final LateralJoinBatch lowerLevelLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
leftMockBatch_1, rightMockBatch_1);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
index f281964..03fd1c1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
@@ -69,7 +69,7 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
@BeforeClass public static void setUpBeforeClass() throws Exception {
mockPopConfig = new MockStorePOP(null);
- ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.FULL);
+ ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.INNER);
operatorContext = fixture.newOperatorContext(mockPopConfig);
}
@@ -875,8 +875,8 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
final ProjectRecordBatch projectBatch2 =
new ProjectRecordBatch(projectPopConfig2, unnestBatch2, fixture.getFragmentContext());
- final LateralJoinPOP ljPopConfig2 = new LateralJoinPOP(projectPopConfig1, projectPopConfig2, JoinRelType.FULL);
- final LateralJoinPOP ljPopConfig1 = new LateralJoinPOP(mockPopConfig, ljPopConfig2, JoinRelType.FULL);
+ final LateralJoinPOP ljPopConfig2 = new LateralJoinPOP(projectPopConfig1, projectPopConfig2, JoinRelType.INNER);
+ final LateralJoinPOP ljPopConfig1 = new LateralJoinPOP(mockPopConfig, ljPopConfig2, JoinRelType.INNER);
final LateralJoinBatch lateralJoinBatch2 =
new LateralJoinBatch(ljPopConfig2, fixture.getFragmentContext(), projectBatch1, projectBatch2);