You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by so...@apache.org on 2018/08/02 20:26:02 UTC
[drill] 02/05: DRILL-6635: PartitionLimit for Lateral/Unnest
PartitionLimitBatch initial implementation Add unit tests for
PartitionLimitBatch
This is an automated email from the ASF dual-hosted git repository.
sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit f8593997ec2ab5da96906e0e26df04e6ef73776e
Author: Sorabh Hamirwasia <sh...@maprtech.com>
AuthorDate: Thu Jul 26 11:47:33 2018 -0700
DRILL-6635: PartitionLimit for Lateral/Unnest
PartitionLimitBatch initial implementation
Add unit tests for PartitionLimitBatch
---
.../drill/exec/physical/config/PartitionLimit.java | 62 ++
.../exec/physical/impl/limit/LimitRecordBatch.java | 18 +-
.../impl/limit/PartitionLimitBatchCreator.java | 36 +
...rdBatch.java => PartitionLimitRecordBatch.java} | 193 ++--
.../physical/impl/BaseTestOpBatchEmitOutcome.java | 5 +
.../drill/exec/physical/impl/MockRecordBatch.java | 36 +-
.../PartitionLimit/TestPartitionLimitBatch.java | 1022 ++++++++++++++++++++
7 files changed, 1272 insertions(+), 100 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionLimit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionLimit.java
new file mode 100644
index 0000000..29f8bb2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionLimit.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.config;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+
+@JsonTypeName("partition-limit")
+public class PartitionLimit extends Limit {
+ private final String partitionColumn;
+
+ @JsonCreator
+ public PartitionLimit(@JsonProperty("child") PhysicalOperator child, @JsonProperty("first") Integer first,
+ @JsonProperty("last") Integer last, @JsonProperty("partitionColumn") String partitionColumn) {
+ super(child, first, last);
+ this.partitionColumn = partitionColumn;
+ }
+
+ public String getPartitionColumn() {
+ return partitionColumn;
+ }
+
+ @Override
+ protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+ return new PartitionLimit(child, getFirst(), getLast(), getPartitionColumn());
+ }
+
+ @Override
+ public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+ return physicalVisitor.visitLimit(this, value);
+ }
+
+ @Override
+ public SelectionVectorMode getSVMode() {
+ return SelectionVectorMode.TWO_BYTE;
+ }
+
+ @Override
+ public int getOperatorType() {
+ return CoreOperatorType.PARTITION_LIMIT_VALUE;
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index d28fd47..06f0fdb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -71,6 +71,10 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
for (VectorWrapper<?> wrapper : incoming) {
wrapper.getValueVector().clear();
}
+ // clear memory for incoming sv (if any)
+ if (incomingSv != null) {
+ incomingSv.clear();
+ }
upStream = next(incoming);
if (upStream == IterOutcome.OUT_OF_MEMORY) {
return upStream;
@@ -82,6 +86,12 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
for (VectorWrapper<?> wrapper : incoming) {
wrapper.getValueVector().clear();
}
+
+ // clear memory for incoming sv (if any)
+ if (incomingSv != null) {
+ incomingSv.clear();
+ }
+
refreshLimitState();
return upStream;
}
@@ -109,7 +119,7 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
@Override
protected boolean setupNewSchema() throws SchemaChangeException {
- container.zeroVectors();
+ container.clear();
transfers.clear();
for(final VectorWrapper<?> v : incoming) {
@@ -181,6 +191,12 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
outgoingSv.allocateNew(inputRecordCount);
limit(inputRecordCount);
}
+
+ // clear memory for incoming sv (if any)
+ if (incomingSv != null) {
+ incomingSv.clear();
+ }
+
return getFinalOutcome(false);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitBatchCreator.java
new file mode 100644
index 0000000..9c7ebd2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitBatchCreator.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.limit;
+
+import com.google.common.collect.Iterables;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.physical.config.PartitionLimit;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.RecordBatch;
+
+import java.util.List;
+
+public class PartitionLimitBatchCreator implements BatchCreator<PartitionLimit> {
+ @Override
+ public PartitionLimitRecordBatch getBatch(ExecutorFragmentContext context, PartitionLimit config,
+ List<RecordBatch> children)
+ throws ExecutionSetupException {
+ return new PartitionLimitRecordBatch(config, context, Iterables.getOnlyElement(children));
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java
similarity index 55%
copy from exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
copy to exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java
index d28fd47..0409980 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java
@@ -17,26 +17,29 @@
*/
package org.apache.drill.exec.physical.impl.limit;
-import java.util.List;
-
-import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.config.Limit;
+import org.apache.drill.exec.physical.config.PartitionLimit;
import org.apache.drill.exec.record.AbstractSingleRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.vector.IntVector;
-import com.google.common.collect.Lists;
+import java.util.List;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
-public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
+/**
+ * Helps to perform limit in a partition within a record batch. Currently only integer type of partition column is
+ * supported. This is mainly used for Lateral/Unnest subquery where each output batch from Unnest will contain an
+ * implicit column for rowId for each row.
+ */
+public class PartitionLimitRecordBatch extends AbstractSingleRecordBatch<PartitionLimit> {
// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LimitRecordBatch.class);
private SelectionVector2 outgoingSv;
@@ -45,10 +48,14 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
// Start offset of the records
private int recordStartOffset;
private int numberOfRecords;
- private boolean first = true;
private final List<TransferPair> transfers = Lists.newArrayList();
- public LimitRecordBatch(Limit popConfig, FragmentContext context, RecordBatch incoming)
+ // Partition RowId which is currently being processed, this will help to handle cases when rows for a partition id
+ // flows across 2 batches
+ private int partitionId;
+ private IntVector partitionColumn;
+
+ public PartitionLimitRecordBatch(PartitionLimit popConfig, FragmentContext context, RecordBatch incoming)
throws OutOfMemoryException {
super(popConfig, context, incoming);
outgoingSv = new SelectionVector2(oContext.getAllocator());
@@ -56,42 +63,6 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
}
@Override
- public IterOutcome innerNext() {
- if (!first && !needMoreRecords(numberOfRecords)) {
- outgoingSv.setRecordCount(0);
- incoming.kill(true);
-
- IterOutcome upStream = next(incoming);
- if (upStream == IterOutcome.OUT_OF_MEMORY) {
- return upStream;
- }
-
- while (upStream == IterOutcome.OK || upStream == IterOutcome.OK_NEW_SCHEMA) {
- // Clear the memory for the incoming batch
- for (VectorWrapper<?> wrapper : incoming) {
- wrapper.getValueVector().clear();
- }
- upStream = next(incoming);
- if (upStream == IterOutcome.OUT_OF_MEMORY) {
- return upStream;
- }
- }
- // If EMIT that means leaf operator is UNNEST, in this case refresh the limit states and return EMIT.
- if (upStream == EMIT) {
- // Clear the memory for the incoming batch
- for (VectorWrapper<?> wrapper : incoming) {
- wrapper.getValueVector().clear();
- }
- refreshLimitState();
- return upStream;
- }
- // other leaf operator behave as before.
- return NONE;
- }
- return super.innerNext();
- }
-
- @Override
public SelectionVector2 getSelectionVector2() {
return outgoingSv;
}
@@ -104,18 +75,26 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
@Override
public void close() {
outgoingSv.clear();
+ transfers.clear();
super.close();
}
@Override
protected boolean setupNewSchema() throws SchemaChangeException {
- container.zeroVectors();
+ container.clear();
transfers.clear();
for(final VectorWrapper<?> v : incoming) {
final TransferPair pair = v.getValueVector().makeTransferPair(
container.addOrGet(v.getField(), callBack));
transfers.add(pair);
+
+ // Hold the transfer pair target vector for partitionColumn, since before applying limit it transfer all rows
+ // from incoming to outgoing batch
+ String fieldName = v.getField().getName();
+ if (fieldName.equals(popConfig.getPartitionColumn())) {
+ partitionColumn = (IntVector) pair.getTo();
+ }
}
final BatchSchema.SelectionVectorMode svMode = incoming.getSchema().getSelectionVectorMode();
@@ -158,28 +137,31 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
@Override
protected IterOutcome doWork() {
- if (first) {
- first = false;
- }
final int inputRecordCount = incoming.getRecordCount();
if (inputRecordCount == 0) {
setOutgoingRecordCount(0);
+ for (VectorWrapper vw : incoming) {
+ vw.clear();
+ }
+ // Release buffer for sv2 (if any)
+ if (incomingSv != null) {
+ incomingSv.clear();
+ }
return getFinalOutcome(false);
}
- for(final TransferPair tp : transfers) {
+ for (final TransferPair tp : transfers) {
tp.transfer();
}
- // Check if current input record count is less than start offset. If yes then adjust the start offset since we
- // have to ignore all these records and return empty batch.
- if (inputRecordCount <= recordStartOffset) {
- recordStartOffset -= inputRecordCount;
- setOutgoingRecordCount(0);
- } else {
- // Allocate SV2 vectors for the record count size since we transfer all the vectors buffer from input record
- // batch to output record batch and later an SV2Remover copies the needed records.
- outgoingSv.allocateNew(inputRecordCount);
- limit(inputRecordCount);
+
+ // Allocate SV2 vectors for the record count size since we transfer all the vectors buffer from input record
+ // batch to output record batch and later an SV2Remover copies the needed records.
+ outgoingSv.allocateNew(inputRecordCount);
+ limit(inputRecordCount);
+
+ // Release memory for incoming sv (if any)
+ if (incomingSv != null) {
+ incomingSv.clear();
}
return getFinalOutcome(false);
}
@@ -191,62 +173,81 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
* @param inputRecordCount - number of records in incoming batch
*/
private void limit(int inputRecordCount) {
- int endRecordIndex;
+ boolean outputAllRecords = (numberOfRecords == Integer.MIN_VALUE);
+
+ int svIndex = 0;
+ // If partitionId is not -1 that means it's set to previous batch last partitionId
+ partitionId = (partitionId == -1) ? getCurrentRowId(0) : partitionId;
+
+ for (int i=0; i < inputRecordCount;) {
+ // Get rowId from current right row
+ int currentRowId = getCurrentRowId(i);
+
+ if (partitionId == currentRowId) {
+ // Check if there is any start offset set for each partition and skip those records
+ if (recordStartOffset > 0) {
+ --recordStartOffset;
+ ++i;
+ continue;
+ }
+
+ // Once the start offset records are skipped then consider rows until numberOfRecords is reached for that
+ // partition
+ if (outputAllRecords) {
+ updateOutputSV2(svIndex++, i);
+ } else if (numberOfRecords > 0) {
+ updateOutputSV2(svIndex++, i);
+ --numberOfRecords;
+ }
+ ++i;
+ } else { // now a new row with different partition id is found
+ refreshConfigParameter();
+ partitionId = currentRowId;
+ }
+ }
+
+ setOutgoingRecordCount(svIndex);
+ }
- if (numberOfRecords == Integer.MIN_VALUE) {
- endRecordIndex = inputRecordCount;
+ private void updateOutputSV2(int svIndex, int incomingIndex) {
+ if (incomingSv != null) {
+ outgoingSv.setIndex(svIndex, incomingSv.getIndex(incomingIndex));
} else {
- endRecordIndex = Math.min(inputRecordCount, recordStartOffset + numberOfRecords);
- numberOfRecords -= Math.max(0, endRecordIndex - recordStartOffset);
+ outgoingSv.setIndex(svIndex, (char) incomingIndex);
}
+ }
- int svIndex = 0;
- for(int i = recordStartOffset; i < endRecordIndex; svIndex++, i++) {
- if (incomingSv != null) {
- outgoingSv.setIndex(svIndex, incomingSv.getIndex(i));
- } else {
- outgoingSv.setIndex(svIndex, (char) i);
- }
+ private int getCurrentRowId(int incomingIndex) {
+ if (incomingSv != null) {
+ return partitionColumn.getAccessor().get(incomingSv.getIndex(incomingIndex));
+ } else {
+ return partitionColumn.getAccessor().get(incomingIndex);
}
- outgoingSv.setRecordCount(svIndex);
- // Update the start offset
- recordStartOffset = 0;
}
private void setOutgoingRecordCount(int outputCount) {
outgoingSv.setRecordCount(outputCount);
+ container.setRecordCount(outputCount);
}
/**
- * Method which returns if more output records are needed from LIMIT operator. When numberOfRecords is set to
- * {@link Integer#MIN_VALUE} that means there is no end bound on LIMIT, so get all the records past start offset.
- * @return - true - more output records is expected.
- * false - limit bound is reached and no more record is expected
+ * Reset the states for recordStartOffset, numberOfRecords and based on the {@link PartitionLimit} passed to the
+ * operator. It also resets the partitionId since after EMIT outcome there will be new partitionId to consider.
+ * This method is called for the each EMIT outcome received no matter if limit is reached or not.
*/
- private boolean needMoreRecords(int recordsToRead) {
- boolean readMore = true;
-
- Preconditions.checkState(recordsToRead == Integer.MIN_VALUE || recordsToRead >= 0,
- String.format("Invalid value of numberOfRecords %d inside LimitRecordBatch", recordsToRead));
-
- // Above check makes sure that either numberOfRecords has no bound or if it has bounds then either we have read
- // all the records or still left to read some.
- // Below check just verifies if there is bound on numberOfRecords and we have read all of it.
- if (recordsToRead == 0) {
- readMore = false;
- }
- return readMore;
+ private void refreshLimitState() {
+ refreshConfigParameter();
+ partitionId = -1;
}
/**
- * Reset the states for recordStartOffset and numberOfRecords based on the popConfig passed to the operator.
- * This method is called for the outcome EMIT no matter if limit is reached or not.
+ * Only resets the recordStartOffset and numberOfRecord based on {@link PartitionLimit} passed to the operator. It
+ * is explicitly called after the limit for each partitionId is met or partitionId changes within an EMIT boundary.
*/
- private void refreshLimitState() {
+ private void refreshConfigParameter() {
// Make sure startOffset is non-negative
recordStartOffset = Math.max(0, popConfig.getFirst());
numberOfRecords = (popConfig.getLast() == null) ?
Integer.MIN_VALUE : Math.max(0, popConfig.getLast()) - recordStartOffset;
- first = true;
}
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/BaseTestOpBatchEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/BaseTestOpBatchEmitOutcome.java
index cd24640..4eaca2b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/BaseTestOpBatchEmitOutcome.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/BaseTestOpBatchEmitOutcome.java
@@ -23,6 +23,7 @@ import org.apache.drill.exec.physical.unit.PhysicalOpUnitTestBase;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.store.mock.MockStorePOP;
import org.apache.drill.test.rowSet.RowSet;
import org.apache.drill.test.rowSet.schema.SchemaBuilder;
@@ -48,6 +49,9 @@ public class BaseTestOpBatchEmitOutcome extends PhysicalOpUnitTestBase {
// List of incoming containers
protected final List<VectorContainer> inputContainer = new ArrayList<>(5);
+ // List of SV2's
+ protected final List<SelectionVector2> inputContainerSv2 = new ArrayList<>(5);
+
// List of incoming IterOutcomes
protected final List<RecordBatch.IterOutcome> inputOutcomes = new ArrayList<>(5);
@@ -79,6 +83,7 @@ public class BaseTestOpBatchEmitOutcome extends PhysicalOpUnitTestBase {
nonEmptyInputRowSet.clear();
inputContainer.clear();
inputOutcomes.clear();
+ inputContainerSv2.clear();
outputRecordCount = 0;
}
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
index 0c43ab2..ed7af4c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
@@ -38,6 +38,7 @@ public class MockRecordBatch implements CloseableRecordBatch {
// These resources are owned by this RecordBatch
protected VectorContainer container;
+ protected SelectionVector2 sv2;
private int currentContainerIndex;
private int currentOutcomeIndex;
private boolean isDone;
@@ -45,6 +46,7 @@ public class MockRecordBatch implements CloseableRecordBatch {
// All the below resources are owned by caller
private final List<VectorContainer> allTestContainers;
+ private List<SelectionVector2> allTestContainersSv2;
private final List<IterOutcome> allOutcomes;
private final FragmentContext context;
protected final OperatorContext oContext;
@@ -62,19 +64,32 @@ public class MockRecordBatch implements CloseableRecordBatch {
this.currentContainerIndex = 0;
this.currentOutcomeIndex = 0;
this.isDone = false;
+ this.allTestContainersSv2 = null;
+ this.sv2 = null;
+ }
+
+ public MockRecordBatch(FragmentContext context, OperatorContext oContext,
+ List<VectorContainer> testContainers, List<IterOutcome> iterOutcomes,
+ List<SelectionVector2> testContainersSv2, BatchSchema schema) {
+ this(context, oContext, testContainers, iterOutcomes, schema);
+ allTestContainersSv2 = testContainersSv2;
+ sv2 = (allTestContainersSv2 != null && allTestContainersSv2.size() > 0) ? new SelectionVector2(allocator) : null;
}
@Override
- public void close() throws Exception {
+ public void close() {
container.clear();
container.setRecordCount(0);
currentContainerIndex = 0;
currentOutcomeIndex = 0;
+ if (sv2 != null) {
+ sv2.clear();
+ }
}
@Override
public SelectionVector2 getSelectionVector2() {
- return null;
+ return sv2;
}
@Override
@@ -94,7 +109,7 @@ public class MockRecordBatch implements CloseableRecordBatch {
@Override
public int getRecordCount() {
- return container.getRecordCount();
+ return (sv2 == null) ? container.getRecordCount() : sv2.getCount();
}
@Override
@@ -103,6 +118,9 @@ public class MockRecordBatch implements CloseableRecordBatch {
isDone = true;
container.clear();
container.setRecordCount(0);
+ if (sv2 != null) {
+ sv2.clear();
+ }
}
}
@@ -142,6 +160,18 @@ public class MockRecordBatch implements CloseableRecordBatch {
}
container.transferIn(input);
container.setRecordCount(recordCount);
+
+ // Transfer the sv2 as well
+ final SelectionVector2 inputSv2 =
+ (allTestContainersSv2 != null && allTestContainersSv2.size() > 0)
+ ? allTestContainersSv2.get(currentContainerIndex) : null;
+ if (inputSv2 != null) {
+ sv2.allocateNewSafe(inputSv2.getCount());
+ for (int i=0; i<inputSv2.getCount(); ++i) {
+ sv2.setIndex(i, inputSv2.getIndex(i));
+ }
+ sv2.setRecordCount(inputSv2.getCount());
+ }
}
if (currentOutcomeIndex < allOutcomes.size()) {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/PartitionLimit/TestPartitionLimitBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/PartitionLimit/TestPartitionLimitBatch.java
new file mode 100644
index 0000000..574ff76
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/PartitionLimit/TestPartitionLimitBatch.java
@@ -0,0 +1,1022 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.PartitionLimit;
+
+import org.apache.drill.categories.OperatorTest;
+import org.apache.drill.exec.physical.config.PartitionLimit;
+import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
+import org.apache.drill.exec.physical.impl.MockRecordBatch;
+import org.apache.drill.exec.physical.impl.limit.PartitionLimitRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.test.rowSet.IndirectRowSet;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+@Category(OperatorTest.class)
+public class TestPartitionLimitBatch extends BaseTestOpBatchEmitOutcome {
+
+ private static String PARTITION_COLUMN;
+
+ // Holds reference to actual operator instance created for each tests
+ private static PartitionLimitRecordBatch limitBatch;
+
+ // Lits of expected outcomes populated by each tests. Used to verify actual IterOutcome returned with next call on
+ // operator to expected outcome
+ private final List<RecordBatch.IterOutcome> expectedOutcomes = new ArrayList<>();
+
+ // List of expected row counts populated by each tests. Used to verify actual output row count to expected row count
+ private final List<Integer> expectedRecordCounts = new ArrayList<>();
+
+ // List of expected row sets populated by each tests. Used to verify actual output from operator to expected output
+ private final List<RowSet> expectedRowSets = new ArrayList<>();
+
+ @BeforeClass
+ public static void partitionLimitSetup() {
+ PARTITION_COLUMN = inputSchema.column(0).getName();
+ }
+
+ /**
+ * Cleanup method executed post each test
+ */
+ @After
+ public void afterTestCleanup() {
+ // close limitBatch
+ limitBatch.close();
+
+ // Release memory from expectedRowSets
+ for (RowSet expectedRowSet : expectedRowSets) {
+ expectedRowSet.clear();
+ }
+ expectedOutcomes.clear();
+ expectedRecordCounts.clear();
+ expectedRowSets.clear();
+ }
+
+ /**
+ * Common method used by all the tests for {@link PartitionLimitRecordBatch} below. It creates the MockRecordBatch
+ * and {@link PartitionLimitRecordBatch} with the populated containers and outcomes list in the test. It also
+ * verifies the expected outcomes list and record count populated by each test against each next() call to
+ * {@link PartitionLimitRecordBatch}. For cases when the expected record count is >0 it verifies the actual output
+ * returned by {@link PartitionLimitRecordBatch} with expected output rows.
+ * @param start - Start offset for {@link PartitionLimit} PopConfig
+ * @param end - End offset for {@link PartitionLimit} PopConfig
+ */
+ private void testPartitionLimitCommon(Integer start, Integer end) {
+ final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+ inputContainer, inputOutcomes, inputContainerSv2, inputContainer.get(0).getSchema());
+
+ final PartitionLimit limitConf = new PartitionLimit(null, start, end, PARTITION_COLUMN);
+ limitBatch = new PartitionLimitRecordBatch(limitConf, operatorFixture.getFragmentContext(), mockInputBatch);
+
+ int i=0;
+ int expectedRowSetIndex = 0;
+ while (i < expectedOutcomes.size()) {
+ try {
+ assertTrue(expectedOutcomes.get(i) == limitBatch.next());
+ assertTrue(expectedRecordCounts.get(i++) == limitBatch.getRecordCount());
+
+ if (limitBatch.getRecordCount() > 0) {
+ final RowSet actualRowSet = IndirectRowSet.fromSv2(limitBatch.getContainer(),
+ limitBatch.getSelectionVector2());
+ new RowSetComparison(expectedRowSets.get(expectedRowSetIndex++)).verify(actualRowSet);
+ }
+ } finally {
+ limitBatch.getSelectionVector2().clear();
+ limitBatch.getContainer().zeroVectors();
+ }
+ }
+ }
+
+ /**
+ * Verifies that empty batch with both OK_NEW_SCHEMA and EMIT outcome is not ignored by
+ * {@link PartitionLimitRecordBatch} and is passed to the downstream operator.
+ */
+ @Test
+ public void testPartitionLimit_EmptyBatchEmitOutcome() {
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedRecordCounts.add(0);
+ expectedRecordCounts.add(0);
+
+ testPartitionLimitCommon(0, 1);
+ }
+
+ /**
+ * Verifies {@link PartitionLimitRecordBatch} considers all the batch until it sees EMIT outcome and return output
+ * batch with data that meets the {@link PartitionLimitRecordBatch} criteria.
+ */
+ @Test
+ public void testPartitionLimit_NonEmptyBatchEmitOutcome() {
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedRecordCounts.add(0);
+ expectedRecordCounts.add(1);
+
+ RowSet expectedBatch = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 10, "item1")
+ .build();
+ expectedRowSets.add(expectedBatch);
+
+ testPartitionLimitCommon(0, 1);
+ }
+
+ /**
+ * Verifies that {@link PartitionLimitRecordBatch} batch operates on batches across EMIT boundary with fresh
+ * configuration. That is it considers partition column data separately for batches across EMIT boundary.
+ */
+ @Test
+ public void testPartitionLimit_ResetsAfterFirstEmitOutcome() {
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 100, "item100")
+ .addRow(1, 101, "item101")
+ .addRow(1, 102, "item102")
+ .addRow(2, 200, "item200")
+ .build();
+
+ final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 10, "item1")
+ .build();
+ final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 100, "item100")
+ .addRow(2, 200, "item200")
+ .build();
+
+ inputContainer.add(nonEmptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+
+
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+ expectedOutcomes.add(RecordBatch.IterOutcome.NONE);
+
+ expectedRecordCounts.add(1);
+ expectedRecordCounts.add(0);
+ // Since in this input batch there is 2 different partitionId
+ expectedRecordCounts.add(2);
+ expectedRecordCounts.add(0);
+
+ expectedRowSets.add(expectedRowSet1);
+ expectedRowSets.add(expectedRowSet2);
+
+ testPartitionLimitCommon(0, 1);
+ }
+
+ /**
+ * Verifies that when the {@link PartitionLimitRecordBatch} number of records is found with first incoming batch,
+ * then next empty incoming batch with OK outcome is ignored, but the empty EMIT outcome batch is not ignored.
+ * Empty incoming batch with EMIT outcome produces empty output batch with EMIT outcome.
+ */
+ @Test
+ public void testPartitionLimit_NonEmptyFirst_EmptyOKEmitOutcome() {
+ inputContainer.add(nonEmptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ inputOutcomes.add(RecordBatch.IterOutcome.NONE);
+
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ expectedOutcomes.add(RecordBatch.IterOutcome.NONE);
+
+ expectedRecordCounts.add(1);
+ expectedRecordCounts.add(0);
+ expectedRecordCounts.add(0);
+
+ final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 10, "item1")
+ .build();
+ expectedRowSets.add(expectedRowSet1);
+
+ testPartitionLimitCommon(0, 1);
+ }
+
+ /**
+ * Verifies that {@link PartitionLimitRecordBatch} refreshes it's state after seeing first EMIT outcome and works on
+ * data batches following it as new set's of incoming batch and apply the partition limit rule from fresh on those.
+ * So for first set of batches with OK_NEW_SCHEMA and EMIT outcome the total number of records received being less
+ * than limit condition, it still produces an output with that many records for each partition key (in this case 1
+ * even though limit number of records is 2).
+ *
+ * After seeing EMIT, it refreshes it's state and operate on next input batches to again return limit number of
+ * records per partition id. So for 3rd batch with 6 records and 3 partition id and with EMIT outcome it produces an
+ * output batch with <=2 records for each partition id.
+ */
+ @Test
+ public void testPartitionLimit_AcrossEmitOutcome() {
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 100, "item100")
+ .addRow(1, 101, "item101")
+ .addRow(1, 102, "item102")
+ .addRow(2, 200, "item200")
+ .addRow(3, 300, "item300")
+ .addRow(3, 301, "item301")
+ .build();
+
+ final RowSet expectedRows1 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 10, "item1")
+ .build();
+
+ final RowSet.SingleRowSet expectedRows2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 100, "item100")
+ .addRow(1, 101, "item101")
+ .addRow(2, 200, "item200")
+ .addRow(3, 300, "item300")
+ .addRow(3, 301, "item301")
+ .build();
+
+ inputContainer.add(nonEmptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+ inputContainer.add(emptyInputRowSet.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ expectedOutcomes.add(RecordBatch.IterOutcome.NONE);
+
+ expectedRecordCounts.add(expectedRows1.rowCount());
+ expectedRecordCounts.add(0);
+ expectedRecordCounts.add(expectedRows2.rowCount());
+ expectedRecordCounts.add(0);
+
+ expectedRowSets.add(expectedRows1);
+ expectedRowSets.add(expectedRows2);
+
+ testPartitionLimitCommon(0, 2);
+ }
+
+ /**
+ * Verifies that {@link PartitionLimitRecordBatch} considers same partition id across batches but within EMIT
+ * boundary to impose limit condition.
+ */
+ @Test
+ public void testPartitionLimit_PartitionIdSpanningAcrossBatches() {
+
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 100, "item100")
+ .addRow(1, 101, "item101")
+ .addRow(1, 102, "item102")
+ .addRow(1, 103, "item103")
+ .addRow(2, 200, "item200")
+ .addRow(2, 201, "item201")
+ .addRow(2, 202, "item202")
+ .build();
+
+ final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 10, "item1")
+ .build();
+
+ final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(2, 200, "item200")
+ .build();
+
+ inputContainer.add(nonEmptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+ inputContainer.add(emptyInputRowSet.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ // second OK batch is consumed by abstractRecordBatch since it's empty
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedRecordCounts.add(1);
+ expectedRecordCounts.add(1);
+ expectedRecordCounts.add(0);
+
+ expectedRowSets.add(expectedRowSet1);
+ expectedRowSets.add(expectedRowSet2);
+
+ testPartitionLimitCommon(0 ,1);
+ }
+
+ @Test
+ public void testPartitionLimit_PartitionIdSpanningAcrossBatches_WithOffset() {
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 100, "item100")
+ .addRow(1, 101, "item101")
+ .addRow(1, 102, "item102")
+ .addRow(1, 103, "item103")
+ .addRow(2, 200, "item200")
+ .addRow(2, 201, "item201")
+ .addRow(2, 202, "item202")
+ .build();
+
+ final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 101, "item101")
+ .addRow(2, 202, "item202")
+ .build();
+
+ inputContainer.add(nonEmptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+ inputContainer.add(emptyInputRowSet.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ // second OK batch is consumed by abstractRecordBatch since it's empty
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedRecordCounts.add(0);
+ expectedRecordCounts.add(2);
+ expectedRecordCounts.add(0);
+
+ expectedRowSets.add(expectedRowSet1);
+
+ testPartitionLimitCommon(2 ,3);
+ }
+
+ /**
+ * Verifies {@link PartitionLimitRecordBatch} works correctly in cases a partition id spans across batches and
+ * limit condition is met by picking records from multiple batch for same partition id.
+ */
+ @Test
+ public void testPartitionLimit_PartitionIdSelectedAcrossBatches() {
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 100, "item100")
+ .addRow(1, 101, "item101")
+ .addRow(1, 102, "item102")
+ .addRow(1, 103, "item103")
+ .addRow(2, 200, "item200")
+ .addRow(2, 201, "item201")
+ .addRow(2, 202, "item202")
+ .build();
+
+ final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 10, "item1")
+ .build();
+
+ final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 100, "item100")
+ .addRow(1, 101, "item101")
+ .addRow(1, 102, "item102")
+ .addRow(1, 103, "item103")
+ .addRow(2, 200, "item200")
+ .addRow(2, 201, "item201")
+ .addRow(2, 202, "item202")
+ .build();
+
+ inputContainer.add(nonEmptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+ inputContainer.add(emptyInputRowSet.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ // second OK batch is consumed by abstractRecordBatch since it's empty
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedRecordCounts.add(expectedRowSet1.rowCount());
+ expectedRecordCounts.add(expectedRowSet2.rowCount());
+ expectedRecordCounts.add(0);
+
+ expectedRowSets.add(expectedRowSet1);
+ expectedRowSets.add(expectedRowSet2);
+
+ testPartitionLimitCommon(0 ,5);
+ }
+
+ /**
+ * Verifies {@link PartitionLimitRecordBatch} works correctly in cases where start offset is such that all the
+ * records of a partition id is ignored but records in other partition id is selected.
+ */
+ @Test
+ public void testPartitionLimit_IgnoreOnePartitionIdWithOffset() {
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 100, "item100")
+ .addRow(1, 101, "item101")
+ .addRow(1, 102, "item102")
+ .addRow(1, 103, "item103")
+ .addRow(2, 200, "item200")
+ .addRow(2, 201, "item201")
+ .addRow(2, 202, "item202")
+ .build();
+
+ final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 102, "item102")
+ .addRow(1, 103, "item103")
+ .build();
+
+ inputContainer.add(nonEmptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+ inputContainer.add(emptyInputRowSet.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ // second OK batch is consumed by abstractRecordBatch since it's empty
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedRecordCounts.add(0);
+ expectedRecordCounts.add(expectedRowSet1.rowCount());
+ expectedRecordCounts.add(0);
+
+ expectedRowSets.add(expectedRowSet1);
+
+ testPartitionLimitCommon(3, 5);
+ }
+
+ @Test
+ public void testPartitionLimit_LargeOffsetIgnoreAllRecords() {
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 100, "item100")
+ .addRow(1, 101, "item101")
+ .addRow(1, 102, "item102")
+ .addRow(1, 103, "item103")
+ .addRow(2, 200, "item200")
+ .addRow(2, 201, "item201")
+ .addRow(2, 202, "item202")
+ .build();
+
+ inputContainer.add(nonEmptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+ inputContainer.add(emptyInputRowSet.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ // second OK batch is consumed by abstractRecordBatch since it's empty
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedRecordCounts.add(0);
+ expectedRecordCounts.add(0);
+ expectedRecordCounts.add(0);
+
+ testPartitionLimitCommon(5, 6);
+ }
+
+ /**
+ * Verifies {@link PartitionLimitRecordBatch} works correctly when start and end offset is same. In this case it
+ * works as Limit 0 scenario where it will not output any rows for any partition id across batches.
+ */
+ @Test
+ public void testPartitionLimit_Limit0() {
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 100, "item100")
+ .addRow(1, 101, "item101")
+ .addRow(1, 102, "item102")
+ .addRow(1, 103, "item103")
+ .addRow(2, 200, "item200")
+ .addRow(2, 201, "item201")
+ .addRow(2, 202, "item202")
+ .build();
+
+ inputContainer.add(nonEmptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+ inputContainer.add(emptyInputRowSet.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ // second OK batch is consumed by abstractRecordBatch since it's empty
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedRecordCounts.add(0);
+ expectedRecordCounts.add(0);
+ expectedRecordCounts.add(0);
+
+ testPartitionLimitCommon(0, 0);
+ }
+
+ /**
+ * Verifies {@link PartitionLimitRecordBatch} works correctly for cases where no end offset is mentioned. This
+ * necessary means selecting all the records in a partition.
+ */
+ @Test
+ public void testPartitionLimit_NoLimit() {
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 100, "item100")
+ .addRow(1, 101, "item101")
+ .addRow(1, 102, "item102")
+ .addRow(1, 103, "item103")
+ .addRow(2, 200, "item200")
+ .addRow(2, 201, "item201")
+ .addRow(2, 202, "item202")
+ .build();
+
+ final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 10, "item1")
+ .build();
+
+ final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 100, "item100")
+ .addRow(1, 101, "item101")
+ .addRow(1, 102, "item102")
+ .addRow(1, 103, "item103")
+ .addRow(2, 200, "item200")
+ .addRow(2, 201, "item201")
+ .addRow(2, 202, "item202")
+ .build();
+
+ inputContainer.add(nonEmptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+ inputContainer.add(emptyInputRowSet.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ // second OK batch is consumed by abstractRecordBatch since it's empty
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedRecordCounts.add(expectedRowSet1.rowCount());
+ expectedRecordCounts.add(expectedRowSet2.rowCount());
+ expectedRecordCounts.add(0);
+
+ expectedRowSets.add(expectedRowSet1);
+ expectedRowSets.add(expectedRowSet2);
+
+ testPartitionLimitCommon(0, null);
+ }
+
+ /**
+ * Verifies {@link PartitionLimitRecordBatch} takes care of provided negative start offset correctly
+ */
+ @Test
+ public void testPartitionLimit_NegativeOffset() {
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 100, "item100")
+ .addRow(1, 101, "item101")
+ .addRow(1, 102, "item102")
+ .addRow(1, 103, "item103")
+ .addRow(2, 200, "item200")
+ .addRow(2, 201, "item201")
+ .addRow(2, 202, "item202")
+ .build();
+
+ final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 10, "item1")
+ .build();
+
+ final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 100, "item100")
+ .addRow(2, 200, "item200")
+ .addRow(2, 201, "item201")
+ .build();
+
+ inputContainer.add(nonEmptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+ inputContainer.add(emptyInputRowSet.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ // second OK batch is consumed by abstractRecordBatch since it's empty
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedRecordCounts.add(expectedRowSet1.rowCount());
+ expectedRecordCounts.add(expectedRowSet2.rowCount());
+ expectedRecordCounts.add(0);
+
+ expectedRowSets.add(expectedRowSet1);
+ expectedRowSets.add(expectedRowSet2);
+
+ testPartitionLimitCommon(-5, 2);
+ }
+
+ /**
+ * Verifies {@link PartitionLimitRecordBatch} behaves correctly across EMIT boundary with single or multiple
+ * batches within each EMIT boundary. It resets it states correctly across EMIT boundary and then operates on all
+ * the batches within EMIT boundary at a time.
+ */
+ @Test
+ public void testPartitionLimit_MultipleEmit_SingleMultipleBatch() {
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 100, "item100")
+ .addRow(1, 101, "item101")
+ .addRow(1, 102, "item102")
+ .addRow(1, 103, "item103")
+ .addRow(2, 200, "item200")
+ .addRow(2, 201, "item201")
+ .addRow(2, 202, "item202")
+ .build();
+
+ // Second EMIT boundary batches
+ final RowSet.SingleRowSet nonEmptyInputRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 1001, "item1001")
+ .addRow(1, 1002, "item1002")
+ .addRow(1, 1003, "item1003")
+ .addRow(2, 2000, "item2000")
+ .addRow(2, 2001, "item2001")
+ .addRow(2, 2002, "item2002")
+ .build();
+
+ final RowSet.SingleRowSet nonEmptyInputRowSet4 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(3, 3001, "item3001")
+ .addRow(3, 3002, "item3002")
+ .addRow(3, 3003, "item3003")
+ .addRow(4, 4000, "item4000")
+ .addRow(4, 4001, "item4001")
+ .build();
+
+ // Third EMIT boundary batches
+ final RowSet.SingleRowSet nonEmptyInputRowSet5 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 10001, "item10001")
+ .addRow(1, 10002, "item10002")
+ .addRow(1, 10003, "item10003")
+ .build();
+
+ // First EMIT boundary expected rowsets
+ final RowSet expectedRowSet1 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 10, "item1")
+ .build();
+
+ final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 100, "item100")
+ .addRow(2, 200, "item200")
+ .addRow(2, 201, "item201")
+ .build();
+
+ // Second EMIT boundary expected rowsets
+ final RowSet expectedRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 1001, "item1001")
+ .addRow(1, 1002, "item1002")
+ .addRow(2, 2000, "item2000")
+ .addRow(2, 2001, "item2001")
+ .build();
+
+ final RowSet expectedRowSet4 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(3, 3001, "item3001")
+ .addRow(3, 3002, "item3002")
+ .addRow(4, 4000, "item4000")
+ .addRow(4, 4001, "item4001")
+ .build();
+
+ // Third EMIT boundary expected rowsets
+ final RowSet expectedRowSet5 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 10001, "item10001")
+ .addRow(1, 10002, "item10002")
+ .build();
+
+ inputContainer.add(nonEmptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+ inputContainer.add(nonEmptyInputRowSet3.container());
+ inputContainer.add(nonEmptyInputRowSet4.container());
+ inputContainer.add(nonEmptyInputRowSet5.container());
+ inputContainer.add(emptyInputRowSet.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedRecordCounts.add(expectedRowSet1.rowCount());
+ expectedRecordCounts.add(expectedRowSet2.rowCount());
+ expectedRecordCounts.add(expectedRowSet3.rowCount());
+ expectedRecordCounts.add(expectedRowSet4.rowCount());
+ expectedRecordCounts.add(expectedRowSet5.rowCount());
+ expectedRecordCounts.add(0);
+
+ expectedRowSets.add(expectedRowSet1);
+ expectedRowSets.add(expectedRowSet2);
+ expectedRowSets.add(expectedRowSet3);
+ expectedRowSets.add(expectedRowSet4);
+ expectedRowSets.add(expectedRowSet5);
+
+ testPartitionLimitCommon(-5, 2);
+ }
+
+ /**
+ * Verifies {@link PartitionLimitRecordBatch} behaves correctly across EMIT boundary with single or multiple
+ * batches (with sv2) within each EMIT boundary. It resets it states correctly across EMIT boundary and then
+ * operates on all the batches within EMIT boundary at a time.
+ */
+ @Test
+ public void testPartitionLimit_MultipleEmit_SingleMultipleBatch_WithSV2() {
+ final RowSet.SingleRowSet emptyWithSv2 = operatorFixture.rowSetBuilder(inputSchema)
+ .withSv2()
+ .build();
+
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 100, "item100")
+ .addRow(1, 101, "item101")
+ .addRow(1, 102, "item102")
+ .addRow(1, 103, "item103")
+ .addRow(2, 200, "item200")
+ .addRow(2, 201, "item201")
+ .addRow(2, 202, "item202")
+ .withSv2()
+ .build();
+
+ // Second EMIT boundary batches
+ final RowSet.SingleRowSet nonEmptyInputRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 1001, "item1001")
+ .addRow(1, 1002, "item1002")
+ .addRow(1, 1003, "item1003")
+ .addRow(2, 2000, "item2000")
+ .addRow(2, 2001, "item2001")
+ .addRow(2, 2002, "item2002")
+ .withSv2()
+ .build();
+
+ final RowSet.SingleRowSet nonEmptyInputRowSet4 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(3, 3001, "item3001")
+ .addRow(3, 3002, "item3002")
+ .addRow(3, 3003, "item3003")
+ .addRow(4, 4000, "item4000")
+ .addRow(4, 4001, "item4001")
+ .withSv2()
+ .build();
+
+ // Third EMIT boundary batches
+ final RowSet.SingleRowSet nonEmptyInputRowSet5 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 10001, "item10001")
+ .addRow(1, 10002, "item10002")
+ .addRow(1, 10003, "item10003")
+ .withSv2()
+ .build();
+
+ // First EMIT boundary expected row sets
+ final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 100, "item100")
+ .addRow(1, 101, "item101")
+ .addRow(2, 200, "item200")
+ .addRow(2, 201, "item201")
+ .build();
+
+ // Second EMIT boundary expected row sets
+ final RowSet expectedRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 1001, "item1001")
+ .addRow(1, 1002, "item1002")
+ .addRow(2, 2000, "item2000")
+ .addRow(2, 2001, "item2001")
+ .build();
+
+ final RowSet expectedRowSet4 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(3, 3001, "item3001")
+ .addRow(3, 3002, "item3002")
+ .addRow(4, 4000, "item4000")
+ .addRow(4, 4001, "item4001")
+ .build();
+
+ // Third EMIT boundary expected row sets
+ final RowSet expectedRowSet5 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 10001, "item10001")
+ .addRow(1, 10002, "item10002")
+ .build();
+
+ inputContainer.add(emptyWithSv2.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+ inputContainer.add(nonEmptyInputRowSet3.container());
+ inputContainer.add(nonEmptyInputRowSet4.container());
+ inputContainer.add(nonEmptyInputRowSet5.container());
+ inputContainer.add(emptyWithSv2.container());
+
+ inputContainerSv2.add(emptyWithSv2.getSv2());
+ inputContainerSv2.add(nonEmptyInputRowSet2.getSv2());
+ inputContainerSv2.add(nonEmptyInputRowSet3.getSv2());
+ inputContainerSv2.add(nonEmptyInputRowSet4.getSv2());
+ inputContainerSv2.add(nonEmptyInputRowSet5.getSv2());
+ inputContainerSv2.add(emptyWithSv2.getSv2());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedRecordCounts.add(0);
+ expectedRecordCounts.add(expectedRowSet2.rowCount());
+ expectedRecordCounts.add(expectedRowSet3.rowCount());
+ expectedRecordCounts.add(expectedRowSet4.rowCount());
+ expectedRecordCounts.add(expectedRowSet5.rowCount());
+ expectedRecordCounts.add(0);
+
+ expectedRowSets.add(expectedRowSet2);
+ expectedRowSets.add(expectedRowSet3);
+ expectedRowSets.add(expectedRowSet4);
+ expectedRowSets.add(expectedRowSet5);
+
+ testPartitionLimitCommon(-5, 2);
+ }
+
+ /**
+ * Verifies {@link PartitionLimitRecordBatch} behaves correctly across EMIT boundary with single or multiple
+ * batches (with sv2) within each EMIT boundary. It resets it states correctly across EMIT boundary and then
+ * operates on all the batches within EMIT boundary at a time.
+ */
+ @Test
+ public void testPartitionLimit_MultipleEmit_SingleMultipleBatch_WithSV2_FilteredRows() {
+ final RowSet.SingleRowSet emptyWithSv2 = operatorFixture.rowSetBuilder(inputSchema)
+ .withSv2()
+ .build();
+
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addSelection(false, 1, 100, "item100")
+ .addSelection(true, 1, 101, "item101")
+ .addSelection(false, 1, 102, "item102")
+ .addSelection(true, 1, 103, "item103")
+ .addSelection(false, 2, 200, "item200")
+ .addSelection(true, 2, 201, "item201")
+ .addSelection(true, 2, 202, "item202")
+ .withSv2()
+ .build();
+
+ // Second EMIT boundary batches
+ final RowSet.SingleRowSet nonEmptyInputRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+ .addSelection(false, 1, 1001, "item1001")
+ .addSelection(true, 1, 1002, "item1002")
+ .addSelection(true, 1, 1003, "item1003")
+ .addSelection(true, 2, 2000, "item2000")
+ .addSelection(false, 2, 2001, "item2001")
+ .addSelection(true, 2, 2002, "item2002")
+ .withSv2()
+ .build();
+
+ final RowSet.SingleRowSet nonEmptyInputRowSet4 = operatorFixture.rowSetBuilder(inputSchema)
+ .addSelection(true, 3, 3001, "item3001")
+ .addSelection(false, 3, 3002, "item3002")
+ .addSelection(true, 3, 3003, "item3003")
+ .addSelection(true, 4, 4000, "item4000")
+ .addSelection(true, 4, 4001, "item4001")
+ .withSv2()
+ .build();
+
+ // Third EMIT boundary batches
+ final RowSet.SingleRowSet nonEmptyInputRowSet5 = operatorFixture.rowSetBuilder(inputSchema)
+ .addSelection(true, 1, 10001, "item10001")
+ .addSelection(true, 1, 10002, "item10002")
+ .addSelection(false, 1, 10003, "item10003")
+ .withSv2()
+ .build();
+
+ // First EMIT boundary expected row sets
+ final RowSet expectedRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 101, "item101")
+ .addRow(1, 103, "item103")
+ .addRow(2, 201, "item201")
+ .addRow(2, 202, "item202")
+ .build();
+
+ // Second EMIT boundary expected row sets
+ final RowSet expectedRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 1002, "item1002")
+ .addRow(1, 1003, "item1003")
+ .addRow(2, 2000, "item2000")
+ .addRow(2, 2002, "item2002")
+ .build();
+
+ final RowSet expectedRowSet4 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(3, 3001, "item3001")
+ .addRow(3, 3003, "item3003")
+ .addRow(4, 4000, "item4000")
+ .addRow(4, 4001, "item4001")
+ .build();
+
+ // Third EMIT boundary expected row sets
+ final RowSet expectedRowSet5 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(1, 10001, "item10001")
+ .addRow(1, 10002, "item10002")
+ .build();
+
+ inputContainer.add(emptyWithSv2.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+ inputContainer.add(nonEmptyInputRowSet3.container());
+ inputContainer.add(nonEmptyInputRowSet4.container());
+ inputContainer.add(nonEmptyInputRowSet5.container());
+ inputContainer.add(emptyWithSv2.container());
+
+ inputContainerSv2.add(emptyWithSv2.getSv2());
+ inputContainerSv2.add(nonEmptyInputRowSet2.getSv2());
+ inputContainerSv2.add(nonEmptyInputRowSet3.getSv2());
+ inputContainerSv2.add(nonEmptyInputRowSet4.getSv2());
+ inputContainerSv2.add(nonEmptyInputRowSet5.getSv2());
+ inputContainerSv2.add(emptyWithSv2.getSv2());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ expectedOutcomes.add(RecordBatch.IterOutcome.OK);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ expectedOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ expectedRecordCounts.add(0);
+ expectedRecordCounts.add(expectedRowSet2.rowCount());
+ expectedRecordCounts.add(expectedRowSet3.rowCount());
+ expectedRecordCounts.add(expectedRowSet4.rowCount());
+ expectedRecordCounts.add(expectedRowSet5.rowCount());
+ expectedRecordCounts.add(0);
+
+ expectedRowSets.add(expectedRowSet2);
+ expectedRowSets.add(expectedRowSet3);
+ expectedRowSets.add(expectedRowSet4);
+ expectedRowSets.add(expectedRowSet5);
+
+ testPartitionLimitCommon(-5, 2);
+ }
+}