You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ppadma <gi...@git.apache.org> on 2018/01/14 23:54:58 UTC

[GitHub] drill pull request #1091: DRILL-6071: Limit batch size for flatten operator

GitHub user ppadma opened a pull request:

    https://github.com/apache/drill/pull/1091

    DRILL-6071: Limit batch size for flatten operator

    Please see DRILL-6071 for details.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ppadma/drill DRILL-6071

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/drill/pull/1091.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1091
    
----
commit 8c7d60440e41efeab53fe78d9db579e7f85be149
Author: Padma Penumarthy <pp...@...>
Date:   2018-01-10T13:06:58Z

    DRILL-6071: Limit batch size for flatten operator

----


---

[GitHub] drill pull request #1091: DRILL-6071: Limit batch size for flatten operator

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/drill/pull/1091


---

[GitHub] drill issue #1091: DRILL-6071: Limit batch size for flatten operator

Posted by ppadma <gi...@git.apache.org>.
Github user ppadma commented on the issue:

    https://github.com/apache/drill/pull/1091
  
    @paul-rogers  Paul, can you please review this PR ? 


---

[GitHub] drill pull request #1091: DRILL-6071: Limit batch size for flatten operator

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1091#discussion_r162225832
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java ---
    @@ -94,8 +98,54 @@ private void clear() {
         }
       }
     
    +  private class FlattenMemoryManager {
    +    private final int outputRowCount;
    +    private static final int OFFSET_VECTOR_WIDTH = 4;
    +    private static final int WORST_CASE_FRAGMENTATION_FACTOR = 2;
    +    private static final int MAX_NUM_ROWS = 64 * 1024;
    +    private static final int MIN_NUM_ROWS = 1;
    +
    +    private FlattenMemoryManager(RecordBatch incoming, long outputBatchSize, SchemaPath flattenColumn) {
    +      // Get sizing information for the batch.
    +      RecordBatchSizer sizer = new RecordBatchSizer(incoming);
    +
    +      final TypedFieldId typedFieldId = incoming.getValueVectorId(flattenColumn);
    +      final MaterializedField field = incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]);
    +
    +      // Get column size of flatten column.
    +      RecordBatchSizer.ColumnSize columnSize = sizer.getColumn(incoming.getValueAccessorById(field.getValueClass(),
    +          typedFieldId.getFieldIds()).getValueVector(), field.getName());
    +
    +      // Average rowWidth of flatten column
    +      final int avgRowWidthFlattenColumn = RecordBatchSizer.safeDivide(columnSize.dataSize, incoming.getRecordCount());
    +
    +      // Average rowWidth excluding the flatten column.
    +      final int avgRowWidthWithOutFlattenColumn = sizer.netRowWidth() - avgRowWidthFlattenColumn;
    +
    +      // Average rowWidth of single element in the flatten list.
    +      // subtract the offset vector size from column data size.
    +      final int avgRowWidthSingleFlattenEntry =
    +          RecordBatchSizer.safeDivide(columnSize.dataSize - (OFFSET_VECTOR_WIDTH * columnSize.valueCount), columnSize.elementCount);
    +
    +      // Average rowWidth of outgoing batch.
    +      final int avgOutgoingRowWidth = avgRowWidthWithOutFlattenColumn + avgRowWidthSingleFlattenEntry;
    +
    +      // Number of rows in outgoing batch
    +      outputRowCount = Math.max(MIN_NUM_ROWS, Math.min(MAX_NUM_ROWS,
    +          RecordBatchSizer.safeDivide((outputBatchSize/WORST_CASE_FRAGMENTATION_FACTOR), avgOutgoingRowWidth)));
    +    }
    --- End diff --
    
    Would be great to log this info in debug mode to aid in tuning. Doing this in the sort proved highly valuable.


---

[GitHub] drill issue #1091: DRILL-6071: Limit batch size for flatten operator

Posted by ppadma <gi...@git.apache.org>.
Github user ppadma commented on the issue:

    https://github.com/apache/drill/pull/1091
  
    @paul-rogers Thank you Paul. Ready for review. Please take a look when you get a chance. 


---

[GitHub] drill pull request #1091: DRILL-6071: Limit batch size for flatten operator

Posted by ppadma <gi...@git.apache.org>.
Github user ppadma commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1091#discussion_r164231963
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java ---
    @@ -94,8 +98,57 @@ private void clear() {
         }
       }
     
    +  private class FlattenMemoryManager {
    --- End diff --
    
    Memory manager actually requires incoming batch. All the tests in testOutputBatch are exercising this code. I went down the path of writing tests just for memory manager. But, they are redundant and doing the same thing I am doing with the other tests. Also, did not see much point in validating output row count, since we are interested in batch size more than row count. Please let me know if you feel otherwise and I can include those tests as well. 


---

[GitHub] drill issue #1091: DRILL-6071: Limit batch size for flatten operator

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on the issue:

    https://github.com/apache/drill/pull/1091
  
    The new changes look good. Please ping me when you are ready for another final review.


---

[GitHub] drill pull request #1091: DRILL-6071: Limit batch size for flatten operator

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1091#discussion_r162846516
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java ---
    @@ -76,6 +76,9 @@ private ExecConstants() {
       public static final String SPILL_FILESYSTEM = "drill.exec.spill.fs";
       public static final String SPILL_DIRS = "drill.exec.spill.directories";
     
    +  public static final String OUTPUT_BATCH_SIZE = "drill.exec.memory.operator.output_batch_sizeinMB";
    --- End diff --
    
    I wonder if MB is too coarse. Maybe just make this the batch size in bytes or K. If this was a config option, we could use HOCON syntax for MB and so on. Since it a session/system option, I suppose it must be a simple number. For example, below, in the external sort batch size, it is in bytes, isn't it?
    
    Further, this should be restricted to be a system option. Check with Tim on how to do that; he and/or Jyothsna implemented a way to do that.


---

[GitHub] drill pull request #1091: DRILL-6071: Limit batch size for flatten operator

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1091#discussion_r162226190
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java ---
    @@ -380,14 +394,19 @@ private void measureColumn(ValueVector v, String prefix) {
         // vectors do consume space, so visit columns recursively.
     
         switch (v.getField().getType().getMinorType()) {
    -    case MAP:
    -      expandMap((AbstractMapVector) v, prefix + v.getField().getName() + ".");
    -      break;
    -    case LIST:
    -      expandList((RepeatedListVector) v, prefix + v.getField().getName() + ".");
    -      break;
    -    default:
    -      v.collectLedgers(ledgers);
    +      case MAP:
    +        expandMap((AbstractMapVector) v, prefix + v.getField().getName() + ".");
    +        break;
    +      case LIST:
    --- End diff --
    
    Here, check mode. If mode is REPEATED, this is a RepeatedListVector and should go down one path. Otherwise, it is a ListVector (possible list of unions) and should go down another path.


---

[GitHub] drill pull request #1091: DRILL-6071: Limit batch size for flatten operator

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1091#discussion_r162846630
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java ---
    @@ -119,6 +119,16 @@ public void setMaxAllocation(long maxAllocation) {
         /*throw new DrillRuntimeException("Unsupported method: setMaxAllocation()");*/
       }
     
    +  @Override
    --- End diff --
    
    It the size is a system option, then it need not be passed in each operator definition. Note that all operators should use the same batch size, so we'd never set this per operator. (The output of one operator is the input to another, and the inputs want to be controlled, we can only do that by controlling outputs.)


---

[GitHub] drill pull request #1091: DRILL-6071: Limit batch size for flatten operator

Posted by ppadma <gi...@git.apache.org>.
Github user ppadma commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1091#discussion_r164233329
  
    --- Diff: exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java ---
    @@ -0,0 +1,498 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.drill.exec.physical.unit;
    +
    +import com.google.common.collect.Lists;
    +import org.apache.drill.common.expression.SchemaPath;
    +
    +import org.apache.drill.exec.physical.base.AbstractBase;
    +import org.apache.drill.exec.physical.base.PhysicalOperator;
    +import org.apache.drill.exec.physical.config.FlattenPOP;
    +import org.apache.drill.exec.physical.impl.ScanBatch;
    +import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer;
    +import org.apache.drill.exec.record.RecordBatch;
    +import org.apache.drill.exec.record.VectorAccessible;
    +import org.apache.drill.exec.util.JsonStringArrayList;
    +import org.apache.drill.exec.util.JsonStringHashMap;
    +import org.apache.drill.exec.util.Text;
    +import org.junit.Ignore;
    +import org.junit.Test;
    +
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +
    +public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
    --- End diff --
    
    I added a test case like this, testFlattenLargeRecords and there are bunch of other test cases as well.
    All the tests are verifying the batch sizes and number of batches.


---

[GitHub] drill pull request #1091: DRILL-6071: Limit batch size for flatten operator

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1091#discussion_r162846707
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java ---
    @@ -94,8 +98,57 @@ private void clear() {
         }
       }
     
    +  private class FlattenMemoryManager {
    --- End diff --
    
    The memory manager is an excellent idea. But, it is private. This means we are passing up one of the key benefits of this kind of manager: the ability to unit test it separately without needing actual memory or batches. (See the external sort for an example.)


---

[GitHub] drill pull request #1091: DRILL-6071: Limit batch size for flatten operator

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1091#discussion_r162847468
  
    --- Diff: exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java ---
    @@ -0,0 +1,498 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.drill.exec.physical.unit;
    +
    +import com.google.common.collect.Lists;
    +import org.apache.drill.common.expression.SchemaPath;
    +
    +import org.apache.drill.exec.physical.base.AbstractBase;
    +import org.apache.drill.exec.physical.base.PhysicalOperator;
    +import org.apache.drill.exec.physical.config.FlattenPOP;
    +import org.apache.drill.exec.physical.impl.ScanBatch;
    +import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer;
    +import org.apache.drill.exec.record.RecordBatch;
    +import org.apache.drill.exec.record.VectorAccessible;
    +import org.apache.drill.exec.util.JsonStringArrayList;
    +import org.apache.drill.exec.util.JsonStringHashMap;
    +import org.apache.drill.exec.util.Text;
    +import org.junit.Ignore;
    +import org.junit.Test;
    +
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.List;
    +
    +public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
    --- End diff --
    
    It seems the tests expect a small number of output records. I wonder, do we have a test with a large number of records, that will expand into a larger number of records, so that we can actually test the batch size limit?
    
    Something like this: generate a large JSON file (at least two batch sizes worth) of records something like this:
    ```
    (id1, dummy1, [id2, dummy2])
    ```
    id1 and id2 are just increasing numbers. dummy1 and dummy2 are large strings, created just to take up space. Put, say, 10, 20 or more records in each array.
    
    Given the sizes, we should be able to predict the number of output records per output batch. Since the tests here verify that the data is correct, we only need verify that the batches are the right size.
    
    In DRILL-6049 (PR #1085) there is a class that reads a multi-batch result set as a series of row sets. With the row set, you can easily get the number of rows per batch. (Can also be done with the older mechanisms, if with a bit more code.)
    
    Then, run the test to make sure the batches contain the number of records predicted by the size calcs.
    
    To reduce the amount of data, you can even use the ClusterFixture mechanism (or alter system) to reduce the output batch size.


---

[GitHub] drill pull request #1091: DRILL-6071: Limit batch size for flatten operator

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1091#discussion_r162225383
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java ---
    @@ -29,9 +28,12 @@
     
       public static long INIT_ALLOCATION = 1_000_000L;
       public static long MAX_ALLOCATION = 10_000_000_000L;
    +  // Default output batch size, 512MB
    +  public static long OUTPUT_BATCH_SIZE = 512 * 1024 * 1024L;
    --- End diff --
    
    Too large. The sort & hash agg operators often receive just 20-40 MB on a large cluster. (That is, itself, an issue, but one that has proven very difficult to resolve.) So, the output batch size must be no larger than 1/3 this size (for sort). Probably some team discussion is required to agree on a good number, and on the work needed to ensure that sort, hash agg and hash join are given sufficient memory for the selected batch size.


---

[GitHub] drill pull request #1091: DRILL-6071: Limit batch size for flatten operator

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1091#discussion_r162846639
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java ---
    @@ -89,6 +89,18 @@
        */
       public void setMaxAllocation(long maxAllocation);
     
    +  /**
    +   *
    +   * @param outputBatchSize maximum output batch size
    +   */
    +  public void setOutputBatchSize(long outputBatchSize);
    --- End diff --
    
    See above.


---

[GitHub] drill pull request #1091: DRILL-6071: Limit batch size for flatten operator

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1091#discussion_r162226341
  
    --- Diff: exec/java-exec/src/main/resources/drill-module.conf ---
    @@ -312,12 +312,13 @@ drill.exec: {
       memory: {
         operator: {
           max: 20000000000,
    -      initial: 10000000
    +      initial: 10000000,
    +      output_batch_size : 536870912
    --- End diff --
    
    See note above.
    
    BTW: If you add a new memory-related param, you can use the methods specifically designed for memory. Then you can say things such as 512K or 2MB. The sort code does this. See also the HOCON documentation.


---

[GitHub] drill pull request #1091: DRILL-6071: Limit batch size for flatten operator

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1091#discussion_r162225507
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java ---
    @@ -29,9 +28,12 @@
     
       public static long INIT_ALLOCATION = 1_000_000L;
       public static long MAX_ALLOCATION = 10_000_000_000L;
    +  // Default output batch size, 512MB
    +  public static long OUTPUT_BATCH_SIZE = 512 * 1024 * 1024L;
    --- End diff --
    
    Also, if the size is a config parameter, shouldn't the default be set in the drill-module.conf file, not in a Java constant?


---

[GitHub] drill pull request #1091: DRILL-6071: Limit batch size for flatten operator

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1091#discussion_r162225888
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java ---
    @@ -38,23 +38,20 @@
     public abstract class FlattenTemplate implements Flattener {
       private static final Logger logger = LoggerFactory.getLogger(FlattenTemplate.class);
     
    -  private static final int OUTPUT_BATCH_SIZE = 4*1024;
    -  private static final int OUTPUT_MEMORY_LIMIT = 512 * 1024 * 1024;
    +  private static final int OUTPUT_ROW_COUNT = 64*1024;
    --- End diff --
    
    `ValueVector.MAX_ROW_COUNT`


---

[GitHub] drill pull request #1091: DRILL-6071: Limit batch size for flatten operator

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1091#discussion_r162225575
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java ---
    @@ -94,8 +98,54 @@ private void clear() {
         }
       }
     
    +  private class FlattenMemoryManager {
    +    private final int outputRowCount;
    +    private static final int OFFSET_VECTOR_WIDTH = 4;
    +    private static final int WORST_CASE_FRAGMENTATION_FACTOR = 2;
    +    private static final int MAX_NUM_ROWS = 64 * 1024;
    --- End diff --
    
    `ValueVector.MAX_ROW_COUNT`


---

[GitHub] drill issue #1091: DRILL-6071: Limit batch size for flatten operator

Posted by ppadma <gi...@git.apache.org>.
Github user ppadma commented on the issue:

    https://github.com/apache/drill/pull/1091
  
    @paul-rogers Addressed code review comments. Added a new system/session option to configure output batch size. Please review. 


---

[GitHub] drill pull request #1091: DRILL-6071: Limit batch size for flatten operator

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1091#discussion_r162846897
  
    --- Diff: exec/java-exec/src/main/resources/drill-module.conf ---
    @@ -312,7 +312,8 @@ drill.exec: {
       memory: {
         operator: {
           max: 20000000000,
    -      initial: 10000000
    +      initial: 10000000,
    +      output_batch_sizeinMB : 32
    --- End diff --
    
    Do we have two options when one would do? This is a config option. Below is a session/session option.


---