You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2022/01/30 11:52:48 UTC

[GitHub] [drill] luocooong commented on a change in pull request #2441: DRILL-8115: Adds LIMIT pushdown support in the scan framework

luocooong commented on a change in pull request #2441:
URL: https://github.com/apache/drill/pull/2441#discussion_r795164948



##########
File path: exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecLimit.java
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
+import org.apache.drill.exec.physical.impl.scan.ScanTestUtils.ScanFixture;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.junit.Test;
+
+/**
+ * Verifies that the V1 scan framework properly pushes the LIMIT
+ * into the scan by stopping the scan once any reader produces
+ * the rows up to the limit. Verifies that the result set loader
+ * enforces the limit at the batch level, that the reader lifecycle
+ * enforces limits at the reader level, and that the scan framework
+ * enforces limits at by skipping over unneeded readers.
+ */
+public class TestScanOperExecLimit extends BaseScanOperatorExecTest {
+
+  /**
+   * Mock reader that returns two 50-row batches.
+   */
+  protected static class Mock50RowReader implements ManagedReader<SchemaNegotiator> {
+    protected boolean openCalled;
+    protected ResultSetLoader tableLoader;
+
+    @Override
+    public boolean open(SchemaNegotiator negotiator) {
+      openCalled = true;
+      negotiator.tableSchema(new SchemaBuilder()
+          .add("a", MinorType.INT)
+          .build(), true);
+        tableLoader = negotiator.build();
+      return true;
+    }
+
+    @Override
+    public boolean next() {
+      if (tableLoader.batchCount() > 2) {
+        return false;
+      }
+      RowSetLoader rowSet = tableLoader.writer();
+      int base = tableLoader.batchCount() * 50 + 1;
+      for (int i = 0; i < 50; i++) {
+        if (rowSet.isFull()) {
+          break;
+        }
+        rowSet.addSingleCol(base + i);
+      }
+      return true;
+    }
+
+    @Override
+    public void close() { }
+  }
+
+  /**
+   * LIMIT 0, to obtain only the schema.
+   */
+  @Test
+  public void testLimit0() {
+    Mock50RowReader reader1 = new Mock50RowReader();
+    Mock50RowReader reader2 = new Mock50RowReader();
+
+    BaseScanFixtureBuilder builder = simpleBuilder(reader1, reader2);
+    builder.builder.limit(0);
+    ScanFixture scanFixture =  builder.build();
+    ScanOperatorExec scan = scanFixture.scanOp;
+
+    assertTrue(scan.buildSchema());
+    assertTrue(scan.next());
+    BatchAccessor batch = scan.batchAccessor();
+    assertEquals(0, batch.rowCount());
+    assertEquals(1, batch.schema().getFieldCount());
+    batch.release();
+
+    // No second batch or second reader
+    assertFalse(scan.next());
+
+    scanFixture.close();
+
+    assertTrue(reader1.openCalled);
+    assertFalse(reader2.openCalled);
+  }
+
+  /**
+   * LIMIT 1, simplest case
+   */
+  @Test
+  public void testLimit1() {
+    Mock50RowReader reader1 = new Mock50RowReader();
+    Mock50RowReader reader2 = new Mock50RowReader();
+
+    BaseScanFixtureBuilder builder = simpleBuilder(reader1, reader2);
+    builder.builder.limit(1);
+    ScanFixture scanFixture =  builder.build();
+    ScanOperatorExec scan = scanFixture.scanOp;
+
+    assertTrue(scan.buildSchema());
+    assertTrue(scan.next());
+    BatchAccessor batch = scan.batchAccessor();
+    assertEquals(1, batch.rowCount());
+    batch.release();
+
+    // No second batch or second reader
+    assertFalse(scan.next());
+
+    scanFixture.close();
+
+    assertTrue(reader1.openCalled);
+    assertFalse(reader2.openCalled);
+  }
+
+  /**
+   * LIMIT 50, same as batch size, to check boundary conditions.
+   */
+  @Test
+  public void testLimitOnBatchEnd() {
+    Mock50RowReader reader1 = new Mock50RowReader();
+    Mock50RowReader reader2 = new Mock50RowReader();
+
+    BaseScanFixtureBuilder builder = simpleBuilder(reader1, reader2);
+    builder.builder.limit(50);
+    ScanFixture scanFixture =  builder.build();
+    ScanOperatorExec scan = scanFixture.scanOp;
+
+    assertTrue(scan.buildSchema());
+    assertTrue(scan.next());
+    BatchAccessor batch = scan.batchAccessor();
+    assertEquals(50, batch.rowCount());
+    batch.release();
+
+    // No second batch or second reader
+    assertFalse(scan.next());
+
+    scanFixture.close();
+
+    assertTrue(reader1.openCalled);
+    assertFalse(reader2.openCalled);
+  }
+
+  /**
+   * LIMIT 75, halfway through second batch.
+   */
+  @Test
+  public void testLimitOnScondBatch() {

Review comment:
       ```suggestion
     public void testLimitOnSecondBatch() {
   ```

##########
File path: exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecLimit.java
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
+import org.apache.drill.exec.physical.impl.scan.ScanTestUtils.ScanFixture;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.junit.Test;
+
+/**
+ * Verifies that the V1 scan framework properly pushes the LIMIT
+ * into the scan by stopping the scan once any reader produces
+ * the rows up to the limit. Verifies that the result set loader
+ * enforces the limit at the batch level, that the reader lifecycle
+ * enforces limits at the reader level, and that the scan framework
+ * enforces limits at by skipping over unneeded readers.
+ */
+public class TestScanOperExecLimit extends BaseScanOperatorExecTest {
+
+  /**
+   * Mock reader that returns two 50-row batches.
+   */
+  protected static class Mock50RowReader implements ManagedReader<SchemaNegotiator> {
+    protected boolean openCalled;
+    protected ResultSetLoader tableLoader;
+
+    @Override
+    public boolean open(SchemaNegotiator negotiator) {
+      openCalled = true;
+      negotiator.tableSchema(new SchemaBuilder()
+          .add("a", MinorType.INT)
+          .build(), true);
+        tableLoader = negotiator.build();
+      return true;
+    }
+
+    @Override
+    public boolean next() {
+      if (tableLoader.batchCount() > 2) {
+        return false;
+      }
+      RowSetLoader rowSet = tableLoader.writer();
+      int base = tableLoader.batchCount() * 50 + 1;
+      for (int i = 0; i < 50; i++) {
+        if (rowSet.isFull()) {
+          break;
+        }
+        rowSet.addSingleCol(base + i);
+      }
+      return true;
+    }
+
+    @Override
+    public void close() { }
+  }
+
+  /**
+   * LIMIT 0, to obtain only the schema.
+   */
+  @Test
+  public void testLimit0() {
+    Mock50RowReader reader1 = new Mock50RowReader();
+    Mock50RowReader reader2 = new Mock50RowReader();
+
+    BaseScanFixtureBuilder builder = simpleBuilder(reader1, reader2);
+    builder.builder.limit(0);
+    ScanFixture scanFixture =  builder.build();

Review comment:
       The same applies at line 120, line 148, line 176, line 209 and line 242.

##########
File path: exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanLimit.java
##########
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.v3;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
+import org.apache.drill.exec.physical.impl.scan.ScanOperatorExec;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.junit.Test;
+
+/**
+ * Verifies that the V2 scan framework properly pushes the LIMIT
+ * into the scan by stopping the scan once any reader produces
+ * the rows up to the limit. Verifies that the result set loader
+ * enforces the limit at the batch level, that the reader lifecycle
+ * enforces limits at the reader level, and that the scan framework
+ * enforces limits at by skipping over unneeded readers.
+ * <p>
+ * This test is from the outside: at the scan operator level.
+ */
+public class TestScanLimit extends BaseScanTest {
+
+  /**
+   * Mock reader that returns two 50-row batches.
+   */
+  protected static class Mock50RowReader implements ManagedReader {
+
+    private final ResultSetLoader tableLoader;
+
+    public Mock50RowReader(SchemaNegotiator negotiator) {
+      negotiator.tableSchema(new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .build());
+      tableLoader = negotiator.build();
+    }
+
+    @Override
+    public boolean next() {
+      if (tableLoader.batchCount() > 1) {
+        return false;
+      }
+      RowSetLoader rowSet = tableLoader.writer();
+      int base = tableLoader.batchCount() * 50 + 1;
+      for (int i = 0; i < 50; i++) {
+        if (rowSet.isFull()) {
+          break;
+        }
+        rowSet.addSingleCol(base + i);
+      }
+      return true;
+    }
+
+    @Override
+    public void close() { }
+  }
+
+  private static class TestFixture {
+    ObservableCreator creator1;
+    ObservableCreator creator2;
+    ScanFixture scanFixture;
+    ScanOperatorExec scan;
+
+    public TestFixture(long limit) {
+      creator1 = new ObservableCreator() {
+        @Override
+        public ManagedReader create(SchemaNegotiator negotiator) {
+          return new Mock50RowReader(negotiator);
+        }
+      };
+      creator2 = new ObservableCreator() {
+        @Override
+        public ManagedReader create(SchemaNegotiator negotiator) {
+          return new Mock50RowReader(negotiator);
+        }
+      };
+      BaseScanFixtureBuilder builder = simpleBuilder(creator1, creator2);
+      builder.builder.limit(limit);
+      scanFixture = builder.build();
+      scan = scanFixture.scanOp;
+    }
+
+    public void close() { scanFixture.close(); }
+
+    public int createCount() {
+      if (creator1.reader == null) {
+        return 0;
+      }
+      if (creator2.reader == null) {
+        return 1;
+      }
+      return 2;
+    }
+  }
+
+  /**
+   * LIMIT 0, to obtain only the schema.
+   */
+  @Test
+  public void testLimit0() {
+    TestFixture fixture = new TestFixture(0);
+    ScanOperatorExec scan = fixture.scan;
+
+    assertTrue(scan.buildSchema());
+    BatchAccessor batch = scan.batchAccessor();
+    assertEquals(0, batch.rowCount());
+    assertEquals(1, batch.schema().getFieldCount());
+    batch.release();
+
+    // No second batch or second reader
+    assertFalse(scan.next());
+
+    fixture.close();
+
+    // Only the first of the two readers were created.
+    assertEquals(1, fixture.createCount());
+  }
+
+  /**
+   * LIMIT 1, simplest case
+   */
+  @Test
+  public void testLimit1() {
+    TestFixture fixture = new TestFixture(1);
+    ScanOperatorExec scan = fixture.scan;
+
+    // Reader builds schema, and stops after one row, though the reader
+    // itself is happy to provide more.
+    assertTrue(scan.buildSchema());
+    assertTrue(scan.next());
+    BatchAccessor batch = scan.batchAccessor();
+    assertEquals(1, batch.rowCount());
+    batch.release();
+
+    // No second batch or second reader
+    assertFalse(scan.next());
+
+    fixture.close();
+
+    // Only the first of the two readers were created.
+    assertEquals(1, fixture.createCount());
+  }
+
+  /**
+   * LIMIT 50, same as batch size, to check boundary conditions.
+   */
+  @Test
+  public void testLimitOnBatchEnd() {
+    TestFixture fixture = new TestFixture(50);
+    ScanOperatorExec scan = fixture.scan;
+
+    assertTrue(scan.buildSchema());
+    assertTrue(scan.next());
+    BatchAccessor batch = scan.batchAccessor();
+    assertEquals(50, batch.rowCount());
+    batch.release();
+
+    // No second batch or second reader
+    assertFalse(scan.next());
+
+    fixture.close();
+
+    // Only the first of the two readers were created.
+    assertEquals(1, fixture.createCount());
+  }
+
+  /**
+   * LIMIT 75, halfway through second batch.
+   */
+  @Test
+  public void testLimitOnScondBatch() {

Review comment:
       ```suggestion
     public void testLimitOnSecondBatch() {
   ```

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/ReaderLifecycle.java
##########
@@ -282,8 +302,8 @@ private void endBatch() {
     if (tableLoader.batchCount() == 1 || prevTableSchemaVersion < tableLoader.schemaVersion()) {
       reviseOutputProjection(tableLoader.outputSchema());
     }
-    buildOutputBatch(readerOutput);
-    scanLifecycle.tallyBatch();
+    int rowCount = buildOutputBatch(readerOutput);

Review comment:
       In other places, `long` is used to define `rowCount`. So, this is simply the number of rows in a container, and it's not able to reach the `int` maximum, is that correct?

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/ExtendedMockBatchReader.java
##########
@@ -140,7 +133,7 @@ public boolean next() {
       writer.save();
     }
 
-    return true;
+    return !loader.atLimit();

Review comment:
       The "EVF V2" automatically checks the limit, is that correct?
   https://github.com/apache/drill/blob/282b42256a996729d4079b52176c2ab7068d2de6/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/ReaderLifecycle.java#L251-L253

##########
File path: exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestScanLifecycleLimit.java
##########
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.v3.lifecycle;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.RowBatchReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ScanLifecycleBuilder;
+import org.apache.drill.exec.physical.impl.scan.v3.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetTestUtils;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.junit.Test;
+
+/**
+ * Verifies that the V2 scan framework properly pushes the LIMIT
+ * into the scan by stopping the scan once any reader produces
+ * the rows up to the limit. Verifies that the result set loader
+ * enforces the limit at the batch level, that the reader lifecycle
+ * enforces limits at the reader level, and that the scan framework
+ * enforces limits at by skipping over unneeded readers.
+ * <p>
+ * This test is at the level of the scan framework, stripping away
+ * the outer scan operator level.
+ */
+public class TestScanLifecycleLimit extends BaseTestScanLifecycle {
+
+  /**
+   * Mock reader that returns two 50-row batches.
+   */
+  protected static class Mock50RowReader implements ManagedReader {
+
+    private final ResultSetLoader tableLoader;
+
+    public Mock50RowReader(SchemaNegotiator negotiator) {
+      negotiator.tableSchema(new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .build());
+      tableLoader = negotiator.build();
+    }
+
+    @Override
+    public boolean next() {
+      if (tableLoader.batchCount() > 1) {
+        return false;
+      }
+      RowSetLoader rowSet = tableLoader.writer();
+      int base = tableLoader.batchCount() * 50 + 1;
+      for (int i = 0; i < 50; i++) {
+        if (rowSet.isFull()) {
+          break;
+        }
+        rowSet.addSingleCol(base + i);
+      }
+      return true;
+    }
+
+    @Override
+    public void close() { }
+  }
+
+  private Pair<TwoReaderFactory, ScanLifecycle> setupScan(long limit) {
+    ScanLifecycleBuilder builder = new ScanLifecycleBuilder();
+    builder.projection(RowSetTestUtils.projectList("a"));
+    TwoReaderFactory factory = new TwoReaderFactory() {
+      @Override
+      public ManagedReader firstReader(SchemaNegotiator negotiator) {
+        return new Mock50RowReader(negotiator);
+      }
+
+      @Override
+      public ManagedReader secondReader(SchemaNegotiator negotiator) {
+        return new Mock50RowReader(negotiator);
+      }
+    };
+    builder.readerFactory(factory);
+
+    builder.limit(limit);
+    return Pair.of(factory, buildScan(builder));
+  }
+
+  /**
+   * LIMIT 0, to obtain only the schema.
+   */
+  @Test
+  public void testLimit0() {
+    Pair<TwoReaderFactory, ScanLifecycle> pair = setupScan(0);
+    TwoReaderFactory factory = pair.getLeft();
+    ScanLifecycle scan = pair.getRight();
+
+    // Reader builds schema, but returns no data, though the reader
+    // itself is happy to provide data.
+    RowBatchReader reader = scan.nextReader();
+    assertTrue(reader.open());
+    assertTrue(reader.next());
+    RowSet result = fixture.wrap(reader.output());
+    assertEquals(0, result.rowCount());
+    assertEquals(1, result.schema().size());
+    result.clear();
+
+    // No second batch
+    assertFalse(reader.next());
+    reader.close();
+
+    // No next reader, despite there being two, since we hit the limit.
+    assertNull(scan.nextReader());
+
+    scan.close();
+
+    // Only the first of the two readers were created.
+    assertEquals(1, factory.count());
+  }
+
+  /**
+   * LIMIT 1, simplest case
+   */
+  @Test
+  public void testLimit1() {
+    Pair<TwoReaderFactory, ScanLifecycle> pair = setupScan(1);
+    TwoReaderFactory factory = pair.getLeft();
+    ScanLifecycle scan = pair.getRight();
+
+    // Reader builds schema, and stops after one row, though the reader
+    // itself is happy to provide more.
+    RowBatchReader reader = scan.nextReader();
+    assertTrue(reader.open());
+    assertTrue(reader.next());
+    RowSet result = fixture.wrap(reader.output());
+    assertEquals(1, result.rowCount());
+    assertEquals(1, result.schema().size());
+    result.clear();
+
+    // No second batch
+    assertFalse(reader.next());
+    reader.close();
+
+    // No next reader, despite there being two, since we hit the limit.
+    assertNull(scan.nextReader());
+
+    scan.close();
+
+    // Only the first of the two readers were created.
+    assertEquals(1, factory.count());
+  }
+
+  /**
+   * LIMIT 50, same as batch size, to check boundary conditions.
+   */
+  @Test
+  public void testLimitOnBatchEnd() {
+    Pair<TwoReaderFactory, ScanLifecycle> pair = setupScan(50);
+    TwoReaderFactory factory = pair.getLeft();
+    ScanLifecycle scan = pair.getRight();
+
+    RowBatchReader reader = scan.nextReader();
+    assertTrue(reader.open());
+    assertTrue(reader.next());
+    RowSet result = fixture.wrap(reader.output());
+    assertEquals(50, result.rowCount());
+    result.clear();
+
+    // No second batch
+    assertFalse(reader.next());
+    reader.close();
+
+    // No next reader, despite there being two, since we hit the limit.
+    assertNull(scan.nextReader());
+
+    scan.close();
+
+    // Only the first of the two readers were created.
+    assertEquals(1, factory.count());
+  }
+
+  /**
+   * LIMIT 75, halfway through second batch.
+   */
+  @Test
+  public void testLimitOnScondBatch() {

Review comment:
       ```suggestion
     public void testLimitOnSecondBatch() {
   ```

##########
File path: exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecLimit.java
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
+import org.apache.drill.exec.physical.impl.scan.ScanTestUtils.ScanFixture;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.junit.Test;
+
+/**
+ * Verifies that the V1 scan framework properly pushes the LIMIT
+ * into the scan by stopping the scan once any reader produces
+ * the rows up to the limit. Verifies that the result set loader
+ * enforces the limit at the batch level, that the reader lifecycle
+ * enforces limits at the reader level, and that the scan framework
+ * enforces limits at by skipping over unneeded readers.
+ */
+public class TestScanOperExecLimit extends BaseScanOperatorExecTest {
+
+  /**
+   * Mock reader that returns two 50-row batches.
+   */
+  protected static class Mock50RowReader implements ManagedReader<SchemaNegotiator> {
+    protected boolean openCalled;
+    protected ResultSetLoader tableLoader;
+
+    @Override
+    public boolean open(SchemaNegotiator negotiator) {
+      openCalled = true;
+      negotiator.tableSchema(new SchemaBuilder()
+          .add("a", MinorType.INT)
+          .build(), true);
+        tableLoader = negotiator.build();
+      return true;
+    }
+
+    @Override
+    public boolean next() {
+      if (tableLoader.batchCount() > 2) {
+        return false;
+      }
+      RowSetLoader rowSet = tableLoader.writer();
+      int base = tableLoader.batchCount() * 50 + 1;
+      for (int i = 0; i < 50; i++) {
+        if (rowSet.isFull()) {
+          break;
+        }
+        rowSet.addSingleCol(base + i);
+      }
+      return true;
+    }
+
+    @Override
+    public void close() { }
+  }
+
+  /**
+   * LIMIT 0, to obtain only the schema.
+   */
+  @Test
+  public void testLimit0() {
+    Mock50RowReader reader1 = new Mock50RowReader();
+    Mock50RowReader reader2 = new Mock50RowReader();
+
+    BaseScanFixtureBuilder builder = simpleBuilder(reader1, reader2);
+    builder.builder.limit(0);
+    ScanFixture scanFixture =  builder.build();

Review comment:
       ```suggestion
       ScanFixture scanFixture = builder.build();
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org