Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2022/01/29 07:15:58 UTC

[GitHub] [drill] paul-rogers opened a new pull request #2441: Adds LIMIT pushdown support in the scan framework

paul-rogers opened a new pull request #2441:
URL: https://github.com/apache/drill/pull/2441


   # [DRILL-8115](https://issues.apache.org/jira/browse/DRILL-8115): Adds LIMIT pushdown support in the scan framework
   
   ## Description
   
   A subset of the [DRILL-8085 PR](https://github.com/apache/drill/pull/2419) that includes only the LIMIT pushdown support in EVF, with no integration of that support into any reader.
   
   Modifies EVF and the scan framework to enforce LIMIT pushdown across readers. Adds tests to demonstrate that the limit is applied correctly.
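   The behavior this adds can be sketched as follows. This is a hypothetical illustration only (`ScanSketch` and its `Reader` interface are simplified stand-ins, not EVF's actual classes): once the rows delivered so far reach the pushed-down LIMIT, the scan stops pulling batches and skips any readers it has not yet opened.

```java
import java.util.List;

class ScanSketch {

  /** Simplified stand-in for a reader: returns rows produced, 0 at EOF. */
  interface Reader {
    int readBatch(long remaining);
  }

  /** Runs readers in order, skipping any that are no longer needed. */
  static long scan(List<Reader> readers, long limit) {
    long total = 0;
    for (Reader reader : readers) {
      if (total >= limit) {
        break;   // LIMIT reached: remaining readers are never opened
      }
      int rows;
      while (total < limit && (rows = reader.readBatch(limit - total)) > 0) {
        total += rows;
      }
    }
    return total;
  }
}
```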
   
   ## Documentation
   
   N/A
   
   ## Testing
   
   Added unit tests. Ran the GitHub-based tests since the Drill tests no longer run on my machine: I can't change my machine's time zone to UTC so that they will run.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] paul-rogers commented on a change in pull request #2441: DRILL-8115: Adds LIMIT pushdown support in the scan framework

Posted by GitBox <gi...@apache.org>.
paul-rogers commented on a change in pull request #2441:
URL: https://github.com/apache/drill/pull/2441#discussion_r795373443



##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/ReaderLifecycle.java
##########
@@ -282,8 +302,8 @@ private void endBatch() {
     if (tableLoader.batchCount() == 1 || prevTableSchemaVersion < tableLoader.schemaVersion()) {
       reviseOutputProjection(tableLoader.outputSchema());
     }
-    buildOutputBatch(readerOutput);
-    scanLifecycle.tallyBatch();
+    int rowCount = buildOutputBatch(readerOutput);

Review comment:
       In general, I tried to use `int` for the row count *within* a batch, and `long` for the row count *across* batches. Any one batch can have no more than 64K rows. But, a result set could potentially have 100B or 1T rows total across a large number of batches.
   
   So, the limit for a scan (or for a reader) should be a `long`. Now, I'm not sure why anyone would use `LIMIT 4000000000`, but better safe than sorry.
   
   If you see the use of `long` for batch counts, or `int` for result set counts, please call it out as it is probably an error.
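   A minimal sketch of that convention (`RowTally` is a hypothetical class for illustration, not part of Drill): `int` for the row count within a single batch, which the 64K vector limit bounds, and `long` for the running total across batches, which can exceed `Integer.MAX_VALUE` for a large result set.

```java
class RowTally {
  static final int MAX_BATCH_ROWS = 65536;   // 64K per-batch row cap

  private final long limit;   // scan-level LIMIT: a long, per the comment above
  private long totalRows;     // across batches: may pass the int maximum

  RowTally(long limit) { this.limit = limit; }

  /** Tallies one batch; returns true once the scan limit has been reached. */
  boolean tallyBatch(int batchRowCount) {   // within one batch: an int suffices
    if (batchRowCount < 0 || batchRowCount > MAX_BATCH_ROWS) {
      throw new IllegalArgumentException("batch row count out of range");
    }
    totalRows += batchRowCount;   // the int widens to long before the addition
    return totalRows >= limit;
  }

  long totalRows() { return totalRows; }
}
```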

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/ExtendedMockBatchReader.java
##########
@@ -140,7 +133,7 @@ public boolean next() {
       writer.save();
     }
 
-    return true;
+    return !loader.atLimit();

Review comment:
       That is correct. This particular test uses a subset of EVF: just the `ResultSetLoader`, which is responsible for enforcing the per-reader limit. In the case of this mock reader for testing, there is only one reader, so enforcing the per-reader limit is equivalent to enforcing the scan limit.
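   The pattern in the diff above can be sketched like this. These classes are simplified stand-ins for illustration, not Drill's real `ResultSetLoader` API: the loader tracks rows written against the per-reader limit, and the reader's `next()` returns false once `atLimit()` reports the limit was reached, so the scan stops asking it for batches.

```java
class LimitTrackingLoader {
  private final long limit;   // per-reader limit pushed into the scan
  private long rowCount;      // rows written so far by this reader

  LimitTrackingLoader(long limit) { this.limit = limit; }

  /** The caller checks this before writing another row into the batch. */
  boolean isFull() { return rowCount >= limit; }

  /** Records one written row. */
  void save() { rowCount++; }

  boolean atLimit() { return rowCount >= limit; }
}

class MockReader {
  private final LimitTrackingLoader loader;

  MockReader(LimitTrackingLoader loader) { this.loader = loader; }

  /** Writes up to 50 rows, then signals EOF once the limit is hit. */
  boolean next() {
    for (int i = 0; i < 50 && !loader.isFull(); i++) {
      loader.save();
    }
    return !loader.atLimit();   // false => the scan stops asking this reader
  }
}
```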







[GitHub] [drill] luocooong merged pull request #2441: DRILL-8115: Adds LIMIT pushdown support in the scan framework

Posted by GitBox <gi...@apache.org>.
luocooong merged pull request #2441:
URL: https://github.com/apache/drill/pull/2441


   





[GitHub] [drill] luocooong commented on a change in pull request #2441: DRILL-8115: Adds LIMIT pushdown support in the scan framework

Posted by GitBox <gi...@apache.org>.
luocooong commented on a change in pull request #2441:
URL: https://github.com/apache/drill/pull/2441#discussion_r795164948



##########
File path: exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecLimit.java
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
+import org.apache.drill.exec.physical.impl.scan.ScanTestUtils.ScanFixture;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.junit.Test;
+
+/**
+ * Verifies that the V1 scan framework properly pushes the LIMIT
+ * into the scan by stopping the scan once any reader produces
+ * the rows up to the limit. Verifies that the result set loader
+ * enforces the limit at the batch level, that the reader lifecycle
+ * enforces limits at the reader level, and that the scan framework
+ * enforces limits by skipping over unneeded readers.
+ */
+public class TestScanOperExecLimit extends BaseScanOperatorExecTest {
+
+  /**
+   * Mock reader that returns two 50-row batches.
+   */
+  protected static class Mock50RowReader implements ManagedReader<SchemaNegotiator> {
+    protected boolean openCalled;
+    protected ResultSetLoader tableLoader;
+
+    @Override
+    public boolean open(SchemaNegotiator negotiator) {
+      openCalled = true;
+      negotiator.tableSchema(new SchemaBuilder()
+          .add("a", MinorType.INT)
+          .build(), true);
+      tableLoader = negotiator.build();
+      return true;
+    }
+
+    @Override
+    public boolean next() {
+      if (tableLoader.batchCount() > 2) {
+        return false;
+      }
+      RowSetLoader rowSet = tableLoader.writer();
+      int base = tableLoader.batchCount() * 50 + 1;
+      for (int i = 0; i < 50; i++) {
+        if (rowSet.isFull()) {
+          break;
+        }
+        rowSet.addSingleCol(base + i);
+      }
+      return true;
+    }
+
+    @Override
+    public void close() { }
+  }
+
+  /**
+   * LIMIT 0, to obtain only the schema.
+   */
+  @Test
+  public void testLimit0() {
+    Mock50RowReader reader1 = new Mock50RowReader();
+    Mock50RowReader reader2 = new Mock50RowReader();
+
+    BaseScanFixtureBuilder builder = simpleBuilder(reader1, reader2);
+    builder.builder.limit(0);
+    ScanFixture scanFixture =  builder.build();
+    ScanOperatorExec scan = scanFixture.scanOp;
+
+    assertTrue(scan.buildSchema());
+    assertTrue(scan.next());
+    BatchAccessor batch = scan.batchAccessor();
+    assertEquals(0, batch.rowCount());
+    assertEquals(1, batch.schema().getFieldCount());
+    batch.release();
+
+    // No second batch or second reader
+    assertFalse(scan.next());
+
+    scanFixture.close();
+
+    assertTrue(reader1.openCalled);
+    assertFalse(reader2.openCalled);
+  }
+
+  /**
+   * LIMIT 1, simplest case
+   */
+  @Test
+  public void testLimit1() {
+    Mock50RowReader reader1 = new Mock50RowReader();
+    Mock50RowReader reader2 = new Mock50RowReader();
+
+    BaseScanFixtureBuilder builder = simpleBuilder(reader1, reader2);
+    builder.builder.limit(1);
+    ScanFixture scanFixture =  builder.build();
+    ScanOperatorExec scan = scanFixture.scanOp;
+
+    assertTrue(scan.buildSchema());
+    assertTrue(scan.next());
+    BatchAccessor batch = scan.batchAccessor();
+    assertEquals(1, batch.rowCount());
+    batch.release();
+
+    // No second batch or second reader
+    assertFalse(scan.next());
+
+    scanFixture.close();
+
+    assertTrue(reader1.openCalled);
+    assertFalse(reader2.openCalled);
+  }
+
+  /**
+   * LIMIT 50, same as batch size, to check boundary conditions.
+   */
+  @Test
+  public void testLimitOnBatchEnd() {
+    Mock50RowReader reader1 = new Mock50RowReader();
+    Mock50RowReader reader2 = new Mock50RowReader();
+
+    BaseScanFixtureBuilder builder = simpleBuilder(reader1, reader2);
+    builder.builder.limit(50);
+    ScanFixture scanFixture =  builder.build();
+    ScanOperatorExec scan = scanFixture.scanOp;
+
+    assertTrue(scan.buildSchema());
+    assertTrue(scan.next());
+    BatchAccessor batch = scan.batchAccessor();
+    assertEquals(50, batch.rowCount());
+    batch.release();
+
+    // No second batch or second reader
+    assertFalse(scan.next());
+
+    scanFixture.close();
+
+    assertTrue(reader1.openCalled);
+    assertFalse(reader2.openCalled);
+  }
+
+  /**
+   * LIMIT 75, halfway through second batch.
+   */
+  @Test
+  public void testLimitOnScondBatch() {

Review comment:
       ```suggestion
     public void testLimitOnSecondBatch() {
   ```

##########
File path: exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecLimit.java
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
+import org.apache.drill.exec.physical.impl.scan.ScanTestUtils.ScanFixture;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.junit.Test;
+
+/**
+ * Verifies that the V1 scan framework properly pushes the LIMIT
+ * into the scan by stopping the scan once any reader produces
+ * the rows up to the limit. Verifies that the result set loader
+ * enforces the limit at the batch level, that the reader lifecycle
+ * enforces limits at the reader level, and that the scan framework
+ * enforces limits by skipping over unneeded readers.
+ */
+public class TestScanOperExecLimit extends BaseScanOperatorExecTest {
+
+  /**
+   * Mock reader that returns two 50-row batches.
+   */
+  protected static class Mock50RowReader implements ManagedReader<SchemaNegotiator> {
+    protected boolean openCalled;
+    protected ResultSetLoader tableLoader;
+
+    @Override
+    public boolean open(SchemaNegotiator negotiator) {
+      openCalled = true;
+      negotiator.tableSchema(new SchemaBuilder()
+          .add("a", MinorType.INT)
+          .build(), true);
+      tableLoader = negotiator.build();
+      return true;
+    }
+
+    @Override
+    public boolean next() {
+      if (tableLoader.batchCount() > 2) {
+        return false;
+      }
+      RowSetLoader rowSet = tableLoader.writer();
+      int base = tableLoader.batchCount() * 50 + 1;
+      for (int i = 0; i < 50; i++) {
+        if (rowSet.isFull()) {
+          break;
+        }
+        rowSet.addSingleCol(base + i);
+      }
+      return true;
+    }
+
+    @Override
+    public void close() { }
+  }
+
+  /**
+   * LIMIT 0, to obtain only the schema.
+   */
+  @Test
+  public void testLimit0() {
+    Mock50RowReader reader1 = new Mock50RowReader();
+    Mock50RowReader reader2 = new Mock50RowReader();
+
+    BaseScanFixtureBuilder builder = simpleBuilder(reader1, reader2);
+    builder.builder.limit(0);
+    ScanFixture scanFixture =  builder.build();

Review comment:
       The same applies at lines 120, 148, 176, 209, and 242.

##########
File path: exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanLimit.java
##########
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.v3;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
+import org.apache.drill.exec.physical.impl.scan.ScanOperatorExec;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.junit.Test;
+
+/**
+ * Verifies that the V2 scan framework properly pushes the LIMIT
+ * into the scan by stopping the scan once any reader produces
+ * the rows up to the limit. Verifies that the result set loader
+ * enforces the limit at the batch level, that the reader lifecycle
+ * enforces limits at the reader level, and that the scan framework
+ * enforces limits by skipping over unneeded readers.
+ * <p>
+ * This test is from the outside: at the scan operator level.
+ */
+public class TestScanLimit extends BaseScanTest {
+
+  /**
+   * Mock reader that returns two 50-row batches.
+   */
+  protected static class Mock50RowReader implements ManagedReader {
+
+    private final ResultSetLoader tableLoader;
+
+    public Mock50RowReader(SchemaNegotiator negotiator) {
+      negotiator.tableSchema(new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .build());
+      tableLoader = negotiator.build();
+    }
+
+    @Override
+    public boolean next() {
+      if (tableLoader.batchCount() > 1) {
+        return false;
+      }
+      RowSetLoader rowSet = tableLoader.writer();
+      int base = tableLoader.batchCount() * 50 + 1;
+      for (int i = 0; i < 50; i++) {
+        if (rowSet.isFull()) {
+          break;
+        }
+        rowSet.addSingleCol(base + i);
+      }
+      return true;
+    }
+
+    @Override
+    public void close() { }
+  }
+
+  private static class TestFixture {
+    ObservableCreator creator1;
+    ObservableCreator creator2;
+    ScanFixture scanFixture;
+    ScanOperatorExec scan;
+
+    public TestFixture(long limit) {
+      creator1 = new ObservableCreator() {
+        @Override
+        public ManagedReader create(SchemaNegotiator negotiator) {
+          return new Mock50RowReader(negotiator);
+        }
+      };
+      creator2 = new ObservableCreator() {
+        @Override
+        public ManagedReader create(SchemaNegotiator negotiator) {
+          return new Mock50RowReader(negotiator);
+        }
+      };
+      BaseScanFixtureBuilder builder = simpleBuilder(creator1, creator2);
+      builder.builder.limit(limit);
+      scanFixture = builder.build();
+      scan = scanFixture.scanOp;
+    }
+
+    public void close() { scanFixture.close(); }
+
+    public int createCount() {
+      if (creator1.reader == null) {
+        return 0;
+      }
+      if (creator2.reader == null) {
+        return 1;
+      }
+      return 2;
+    }
+  }
+
+  /**
+   * LIMIT 0, to obtain only the schema.
+   */
+  @Test
+  public void testLimit0() {
+    TestFixture fixture = new TestFixture(0);
+    ScanOperatorExec scan = fixture.scan;
+
+    assertTrue(scan.buildSchema());
+    BatchAccessor batch = scan.batchAccessor();
+    assertEquals(0, batch.rowCount());
+    assertEquals(1, batch.schema().getFieldCount());
+    batch.release();
+
+    // No second batch or second reader
+    assertFalse(scan.next());
+
+    fixture.close();
+
+    // Only the first of the two readers was created.
+    assertEquals(1, fixture.createCount());
+  }
+
+  /**
+   * LIMIT 1, simplest case
+   */
+  @Test
+  public void testLimit1() {
+    TestFixture fixture = new TestFixture(1);
+    ScanOperatorExec scan = fixture.scan;
+
+    // Reader builds schema, and stops after one row, though the reader
+    // itself is happy to provide more.
+    assertTrue(scan.buildSchema());
+    assertTrue(scan.next());
+    BatchAccessor batch = scan.batchAccessor();
+    assertEquals(1, batch.rowCount());
+    batch.release();
+
+    // No second batch or second reader
+    assertFalse(scan.next());
+
+    fixture.close();
+
+    // Only the first of the two readers was created.
+    assertEquals(1, fixture.createCount());
+  }
+
+  /**
+   * LIMIT 50, same as batch size, to check boundary conditions.
+   */
+  @Test
+  public void testLimitOnBatchEnd() {
+    TestFixture fixture = new TestFixture(50);
+    ScanOperatorExec scan = fixture.scan;
+
+    assertTrue(scan.buildSchema());
+    assertTrue(scan.next());
+    BatchAccessor batch = scan.batchAccessor();
+    assertEquals(50, batch.rowCount());
+    batch.release();
+
+    // No second batch or second reader
+    assertFalse(scan.next());
+
+    fixture.close();
+
+    // Only the first of the two readers was created.
+    assertEquals(1, fixture.createCount());
+  }
+
+  /**
+   * LIMIT 75, halfway through second batch.
+   */
+  @Test
+  public void testLimitOnScondBatch() {

Review comment:
       ```suggestion
     public void testLimitOnSecondBatch() {
   ```

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/ReaderLifecycle.java
##########
@@ -282,8 +302,8 @@ private void endBatch() {
     if (tableLoader.batchCount() == 1 || prevTableSchemaVersion < tableLoader.schemaVersion()) {
       reviseOutputProjection(tableLoader.outputSchema());
     }
-    buildOutputBatch(readerOutput);
-    scanLifecycle.tallyBatch();
+    int rowCount = buildOutputBatch(readerOutput);

Review comment:
       Other places use `long` to define `rowCount`. So this is simply the number of rows in one container, and it cannot reach the `int` maximum, is that correct?

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/ExtendedMockBatchReader.java
##########
@@ -140,7 +133,7 @@ public boolean next() {
       writer.save();
     }
 
-    return true;
+    return !loader.atLimit();

Review comment:
       The "EVF V2" automatically checks the limit, is that correct?
   https://github.com/apache/drill/blob/282b42256a996729d4079b52176c2ab7068d2de6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/ReaderLifecycle.java#L251-L253

##########
File path: exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestScanLifecycleLimit.java
##########
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.v3.lifecycle;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.RowBatchReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ScanLifecycleBuilder;
+import org.apache.drill.exec.physical.impl.scan.v3.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetTestUtils;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.junit.Test;
+
+/**
+ * Verifies that the V2 scan framework properly pushes the LIMIT
+ * into the scan by stopping the scan once any reader produces
+ * the rows up to the limit. Verifies that the result set loader
+ * enforces the limit at the batch level, that the reader lifecycle
+ * enforces limits at the reader level, and that the scan framework
+ * enforces limits by skipping over unneeded readers.
+ * <p>
+ * This test is at the level of the scan framework, stripping away
+ * the outer scan operator level.
+ */
+public class TestScanLifecycleLimit extends BaseTestScanLifecycle {
+
+  /**
+   * Mock reader that returns two 50-row batches.
+   */
+  protected static class Mock50RowReader implements ManagedReader {
+
+    private final ResultSetLoader tableLoader;
+
+    public Mock50RowReader(SchemaNegotiator negotiator) {
+      negotiator.tableSchema(new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .build());
+      tableLoader = negotiator.build();
+    }
+
+    @Override
+    public boolean next() {
+      if (tableLoader.batchCount() > 1) {
+        return false;
+      }
+      RowSetLoader rowSet = tableLoader.writer();
+      int base = tableLoader.batchCount() * 50 + 1;
+      for (int i = 0; i < 50; i++) {
+        if (rowSet.isFull()) {
+          break;
+        }
+        rowSet.addSingleCol(base + i);
+      }
+      return true;
+    }
+
+    @Override
+    public void close() { }
+  }
+
+  private Pair<TwoReaderFactory, ScanLifecycle> setupScan(long limit) {
+    ScanLifecycleBuilder builder = new ScanLifecycleBuilder();
+    builder.projection(RowSetTestUtils.projectList("a"));
+    TwoReaderFactory factory = new TwoReaderFactory() {
+      @Override
+      public ManagedReader firstReader(SchemaNegotiator negotiator) {
+        return new Mock50RowReader(negotiator);
+      }
+
+      @Override
+      public ManagedReader secondReader(SchemaNegotiator negotiator) {
+        return new Mock50RowReader(negotiator);
+      }
+    };
+    builder.readerFactory(factory);
+
+    builder.limit(limit);
+    return Pair.of(factory, buildScan(builder));
+  }
+
+  /**
+   * LIMIT 0, to obtain only the schema.
+   */
+  @Test
+  public void testLimit0() {
+    Pair<TwoReaderFactory, ScanLifecycle> pair = setupScan(0);
+    TwoReaderFactory factory = pair.getLeft();
+    ScanLifecycle scan = pair.getRight();
+
+    // Reader builds schema, but returns no data, though the reader
+    // itself is happy to provide data.
+    RowBatchReader reader = scan.nextReader();
+    assertTrue(reader.open());
+    assertTrue(reader.next());
+    RowSet result = fixture.wrap(reader.output());
+    assertEquals(0, result.rowCount());
+    assertEquals(1, result.schema().size());
+    result.clear();
+
+    // No second batch
+    assertFalse(reader.next());
+    reader.close();
+
+    // No next reader, despite there being two, since we hit the limit.
+    assertNull(scan.nextReader());
+
+    scan.close();
+
+    // Only the first of the two readers was created.
+    assertEquals(1, factory.count());
+  }
+
+  /**
+   * LIMIT 1, simplest case
+   */
+  @Test
+  public void testLimit1() {
+    Pair<TwoReaderFactory, ScanLifecycle> pair = setupScan(1);
+    TwoReaderFactory factory = pair.getLeft();
+    ScanLifecycle scan = pair.getRight();
+
+    // Reader builds schema, and stops after one row, though the reader
+    // itself is happy to provide more.
+    RowBatchReader reader = scan.nextReader();
+    assertTrue(reader.open());
+    assertTrue(reader.next());
+    RowSet result = fixture.wrap(reader.output());
+    assertEquals(1, result.rowCount());
+    assertEquals(1, result.schema().size());
+    result.clear();
+
+    // No second batch
+    assertFalse(reader.next());
+    reader.close();
+
+    // No next reader, despite there being two, since we hit the limit.
+    assertNull(scan.nextReader());
+
+    scan.close();
+
+    // Only the first of the two readers was created.
+    assertEquals(1, factory.count());
+  }
+
+  /**
+   * LIMIT 50, same as batch size, to check boundary conditions.
+   */
+  @Test
+  public void testLimitOnBatchEnd() {
+    Pair<TwoReaderFactory, ScanLifecycle> pair = setupScan(50);
+    TwoReaderFactory factory = pair.getLeft();
+    ScanLifecycle scan = pair.getRight();
+
+    RowBatchReader reader = scan.nextReader();
+    assertTrue(reader.open());
+    assertTrue(reader.next());
+    RowSet result = fixture.wrap(reader.output());
+    assertEquals(50, result.rowCount());
+    result.clear();
+
+    // No second batch
+    assertFalse(reader.next());
+    reader.close();
+
+    // No next reader, despite there being two, since we hit the limit.
+    assertNull(scan.nextReader());
+
+    scan.close();
+
+    // Only the first of the two readers was created.
+    assertEquals(1, factory.count());
+  }
+
+  /**
+   * LIMIT 75, halfway through second batch.
+   */
+  @Test
+  public void testLimitOnScondBatch() {

Review comment:
       ```suggestion
     public void testLimitOnSecondBatch() {
   ```

##########
File path: exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecLimit.java
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
+import org.apache.drill.exec.physical.impl.scan.ScanTestUtils.ScanFixture;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.junit.Test;
+
+/**
+ * Verifies that the V1 scan framework properly pushes the LIMIT
+ * into the scan by stopping the scan once any reader produces
+ * the rows up to the limit. Verifies that the result set loader
+ * enforces the limit at the batch level, that the reader lifecycle
+ * enforces limits at the reader level, and that the scan framework
+ * enforces limits by skipping over unneeded readers.
+ */
+public class TestScanOperExecLimit extends BaseScanOperatorExecTest {
+
+  /**
+   * Mock reader that returns two 50-row batches.
+   */
+  protected static class Mock50RowReader implements ManagedReader<SchemaNegotiator> {
+    protected boolean openCalled;
+    protected ResultSetLoader tableLoader;
+
+    @Override
+    public boolean open(SchemaNegotiator negotiator) {
+      openCalled = true;
+      negotiator.tableSchema(new SchemaBuilder()
+          .add("a", MinorType.INT)
+          .build(), true);
+      tableLoader = negotiator.build();
+      return true;
+    }
+
+    @Override
+    public boolean next() {
+      if (tableLoader.batchCount() > 2) {
+        return false;
+      }
+      RowSetLoader rowSet = tableLoader.writer();
+      int base = tableLoader.batchCount() * 50 + 1;
+      for (int i = 0; i < 50; i++) {
+        if (rowSet.isFull()) {
+          break;
+        }
+        rowSet.addSingleCol(base + i);
+      }
+      return true;
+    }
+
+    @Override
+    public void close() { }
+  }
+
+  /**
+   * LIMIT 0, to obtain only the schema.
+   */
+  @Test
+  public void testLimit0() {
+    Mock50RowReader reader1 = new Mock50RowReader();
+    Mock50RowReader reader2 = new Mock50RowReader();
+
+    BaseScanFixtureBuilder builder = simpleBuilder(reader1, reader2);
+    builder.builder.limit(0);
+    ScanFixture scanFixture =  builder.build();

Review comment:
       ```suggestion
       ScanFixture scanFixture = builder.build();
   ```



