You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by Ben-Zvi <gi...@git.apache.org> on 2017/05/27 01:22:37 UTC

[GitHub] drill pull request #808: DRILL-5325: Unit tests for the managed sort

Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/808#discussion_r118808994
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMemoryManager.java ---
    @@ -0,0 +1,506 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl.xsort.managed;
    +
    +public class SortMemoryManager {
    +
    +  /**
    +   * Maximum memory this operator may use. Usually comes from the
    +   * operator definition, but may be overridden by a configuration
    +   * parameter for unit testing.
    +   */
    +
    +  private final long memoryLimit;
    +
    +  /**
    +   * Estimated size of the records for this query, updated on each
    +   * new batch received from upstream.
    +   */
    +
    +  private int estimatedRowWidth;
    +
    +  /**
    +   * Size of the merge batches that this operator produces. Generally
    +   * the same as the preferred merge batch size, unless low memory forces
    +   * a smaller value.
    +   */
    +
    +  private int expectedMergeBatchSize;
    +
    +  /**
    +   * Estimate of the input batch size based on the largest batch seen
    +   * thus far.
    +   */
    +  private int estimatedInputBatchSize;
    +
    +  /**
    +   * Maximum memory level before spilling occurs. That is, we can buffer input
    +   * batches in memory until we reach the level given by the buffer memory pool.
    +   */
    +
    +  private long bufferMemoryLimit;
    +
    +  /**
    +   * Maximum memory that can hold batches during the merge
    +   * phase.
    +   */
    +
    +  private long mergeMemoryLimit;
    +
    +  /**
    +   * The target size for merge batches sent downstream.
    +   */
    +
    +  private int preferredMergeBatchSize;
    +
    +  /**
    +   * The configured size for each spill batch.
    +   */
    +  private int preferredSpillBatchSize;
    +
    +  /**
    +   * Estimated number of rows that fit into a single spill batch.
    +   */
    +
    +  private int spillBatchRowCount;
    +
    +  /**
    +   * The estimated actual spill batch size which depends on the
    +   * details of the data rows for any particular query.
    +   */
    +
    +  private int expectedSpillBatchSize;
    +
    +  /**
    +   * The number of records to add to each output batch sent to the
    +   * downstream operator or spilled to disk.
    +   */
    +
    +  private int mergeBatchRowCount;
    +
    +  /**
    +   * Sort configuration supplied to the constructor. The constructor reads
    +   * the optional memory cap and the preferred spill and merge batch sizes
    +   * from it.
    +   */
    +
    +  private SortConfig config;
    +
    +//  private long spillPoint;
    +
    +//  private long minMergeMemory;
    +
    +  /**
    +   * Estimated memory needed to hold one incoming batch: the largest input
    +   * batch size seen so far plus four bytes per row of the most recent
    +   * batch (an allowance for the selection vector).
    +   */
    +
    +  private int estimatedInputSize;
    +
    +  /**
    +   * Set to true by the low-memory adjustment when the minimum memory
    +   * needed for the load phase exceeds the memory limit.
    +   */
    +
    +  private boolean potentialOverflow;
    +
    +  /**
    +   * Create the memory manager for one sort operator.
    +   *
    +   * @param config sort configuration; supplies the optional memory cap
    +   * and the preferred spill and merge batch sizes
    +   * @param memoryLimit the memory limit set by the operator definition
    +   */
    +
    +  public SortMemoryManager(SortConfig config, long memoryLimit) {
    +    this.config = config;
    +
    +    // The maximum memory this operator can use as set by the
    +    // operator definition (propagated to the allocator.) A positive
    +    // configured maximum acts as an additional cap, so the smaller of
    +    // the two values wins; zero or a negative value means no override.
    +
    +    long configuredMax = config.maxMemory();
    +    this.memoryLimit = (configuredMax > 0)
    +        ? Math.min(memoryLimit, configuredMax)
    +        : memoryLimit;
    +
    +    preferredSpillBatchSize = config.spillBatchSize();
    +    preferredMergeBatchSize = config.mergeBatchSize();
    +  }
    +
    +  /**
    +   * Update the data-driven memory use numbers including:
    +   * <ul>
    +   * <li>The average size of incoming records.</li>
    +   * <li>The estimated spill and output batch size.</li>
    +   * <li>The estimated number of average-size records per
    +   * spill and output batch.</li>
    +   * <li>The amount of memory set aside to hold the incoming
    +   * batches before spilling starts.</li>
    +   * </ul>
    +   * <p>
    +   * Under normal circumstances, the amount of memory available is much
    +   * larger than the input, spill or merge batch sizes. The primary question
    +   * is to determine how many input batches we can buffer during the load
    +   * phase, and how many spill batches we can merge during the merge
    +   * phase.
    +   *
    +   * @param batchSize the overall size of the current batch received from
    +   * upstream
    +   * @param batchRowWidth the width in bytes (including overhead) of each
    +   * row in the current input batch
    +   * @param batchRowCount the number of actual (not filtered) records in
    +   * that upstream batch
    +   */
    +
    +  public void updateEstimates(int batchSize, int batchRowWidth, int batchRowCount) {
    +
    +    // The record count should never be zero, but better safe than sorry...
    +
    +    if (batchRowCount == 0) {
    +      return; }
    +
    +
    +    // Update input batch estimates.
    +    // Go no further if nothing changed.
    +
    +    if (! updateInputEstimates(batchSize, batchRowWidth, batchRowCount)) {
    +      return;
    +    }
    +
    +    updateSpillSettings();
    +    updateMergeSettings();
    +    adjustForLowMemory();
    +    logSettings(batchRowCount);
    +  }
    +
    +  private boolean updateInputEstimates(int batchSize, int batchRowWidth, int batchRowCount) {
    +
    +    // The row width may end up as zero if all fields are nulls or some
    +    // other unusual situation. In this case, assume a width of 10 just
    +    // to avoid lots of special case code.
    +
    +    if (batchRowWidth == 0) {
    +      batchRowWidth = 10;
    +    }
    +
    +    // We know the batch size and number of records. Use that to estimate
    +    // the average record size. Since a typical batch has many records,
    +    // the average size is a fairly good estimator. Note that the batch
    +    // size includes not just the actual vector data, but any unused space
    +    // resulting from power-of-two allocation. This means that we don't
    +    // have to do size adjustments for input batches as we will do below
    +    // when estimating the size of other objects.
    +
    +    // Record sizes may vary across batches. To be conservative, use
    +    // the largest size observed from incoming batches.
    +
    +    int origRowEstimate = estimatedRowWidth;
    +    estimatedRowWidth = Math.max(estimatedRowWidth, batchRowWidth);
    +
    +    // Maintain an estimate of the incoming batch size: the largest
    +    // batch yet seen. Used to reserve memory for the next incoming
    +    // batch. Because we are using the actual observed batch size,
    +    // the size already includes overhead due to power-of-two rounding.
    +
    +    long origInputBatchSize = estimatedInputBatchSize;
    +    estimatedInputBatchSize = Math.max(estimatedInputBatchSize, batchSize);
    +
    +    // Estimate the total size of each incoming batch plus sv2. Note that, due
    +    // to power-of-two rounding, the allocated sv2 size might be twice the data size.
    +
    +    estimatedInputSize = estimatedInputBatchSize + 4 * batchRowCount;
    +
    +    // Return whether anything changed.
    +
    +    return estimatedRowWidth != origRowEstimate || estimatedInputBatchSize != origInputBatchSize;
    +  }
    +
    +  /**
    +   * Determine the number of records to spill per spill batch. The goal is to
    +   * spill batches of either 64K records, or as many records as fit into the
    +   * amount of memory dedicated to each spill batch, whichever is less.
    +   */
    +
    +  private void updateSpillSettings() {
    +
    +    spillBatchRowCount = rowsPerBatch(preferredSpillBatchSize);
    +
    +    // Compute the actual spill batch size which may be larger or smaller
    +    // than the preferred size depending on the row width. Double the estimated
    +    // memory needs to allow for power-of-two rounding.
    +
    +    expectedSpillBatchSize = batchForRows(spillBatchRowCount);
    +
    +    // Determine the minimum memory needed for spilling. Spilling is done just
    +    // before accepting a spill batch, so we must spill if we don't have room for a
    +    // (worst case) input batch. To spill, we need room for the spill batch created
    +    // by merging the batches already in memory.
    +
    +    bufferMemoryLimit = memoryLimit - expectedSpillBatchSize;
    +  }
    +
    +  /**
    +   * Determine the number of records per batch per merge step. The goal is to
    +   * merge batches of either 64K records, or as many records as fit into the
    +   * amount of memory dedicated to each merge batch, whichever is less.
    +   */
    +
    +  private void updateMergeSettings() {
    +
    +    mergeBatchRowCount = rowsPerBatch(preferredMergeBatchSize);
    +    expectedMergeBatchSize = batchForRows(mergeBatchRowCount);
    +
    +    // The merge memory pool assumes we can spill all input batches. The memory
    +    // available to hold spill batches for merging is total memory minus the
    +    // expected output batch size.
    +
    +    mergeMemoryLimit = memoryLimit - expectedMergeBatchSize;
    +  }
    +
    +  /**
    +   * In a low-memory situation we have to approach the memory assignment
    +   * problem from a different angle. Memory is low enough that we can't
    +   * fit the incoming batches (of a size decided by the upstream operator)
    +   * and our usual spill or merge batch sizes. Instead, we have to
    +   * determine the largest spill and merge batch sizes possible given
    +   * the available memory, input batch size and row width. We shrink the
    +   * sizes of the batches we control to try to make things fit into limited
    +   * memory. At some point, however, if we cannot fit even two input
    +   * batches and even the smallest merge batch, then we will run into an
    +   * out-of-memory condition and we log a warning.
    +   * <p>
    +   * Note that these calculations are a bit crazy: it is Drill that
    +   * decided to allocate the small memory, it is Drill that created the
    +   * large incoming batches, and so it is Drill that created the low
    +   * memory situation. Over time, a better fix for this condition is to
    +   * control memory usage at the query level so that the sort is guaranteed
    +   * to have sufficient memory. But, since we don't yet have the luxury
    +   * of making such changes, we just live with the situation as we find
    +   * it.
    +   */
    +
    +  private void adjustForLowMemory() {
    +
    +    long loadHeadroom = bufferMemoryLimit - 2 * estimatedInputSize;
    +    long mergeHeadroom = mergeMemoryLimit - 2 * expectedSpillBatchSize;
    +    if (loadHeadroom >= 0  &&  mergeHeadroom >= 0) {
    +      return;
    +    }
    +
    +    lowMemorySpillBatchSize();
    +    lowMemoryMergeBatchSize();
    +
    +    // Sanity check: if we've been given too little memory to make progress,
    +    // issue a warning but proceed anyway. Should only occur if something is
    +    // configured terribly wrong.
    +
    +    long minNeeds = 2 * estimatedInputSize + expectedSpillBatchSize;
    +    if (minNeeds > memoryLimit) {
    +      ExternalSortBatch.logger.warn("Potential memory overflow during load phase! " +
    +          "Minumum needed = {} bytes, actual available = {} bytes",
    +          minNeeds, memoryLimit);
    +      bufferMemoryLimit = 0;
    +      potentialOverflow = true;
    +    }
    +
    +    // Sanity check
    +
    +    minNeeds = 2 * expectedSpillBatchSize + expectedMergeBatchSize;
    +    if (minNeeds > memoryLimit) {
    +      ExternalSortBatch.logger.warn("Potential memory overflow during merge phase! " +
    +          "Minumum needed = {} bytes, actual available = {} bytes",
    --- End diff --
    
    "Minimum" - minor typo 



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---