You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by sohami <gi...@git.apache.org> on 2018/04/16 22:51:16 UTC

[GitHub] drill pull request #1212: DRILL-6323: Lateral Join - Initial implementation

GitHub user sohami opened a pull request:

    https://github.com/apache/drill/pull/1212

    DRILL-6323: Lateral Join - Initial implementation

    @parthchandra - Please review.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sohami/drill DRILL-6323

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/drill/pull/1212.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1212
    
----
commit 38b82ce8ebca3b91ba8847229a568daf88f654a3
Author: Sorabh Hamirwasia <sh...@...>
Date:   2018-03-06T05:09:16Z

    DRILL-6323: Lateral Join - Initial implementation
                Refactor Join PopConfigs

commit 0931ac974f7be41cd5ca4516524d04c4e5c6245d
Author: Sorabh Hamirwasia <sh...@...>
Date:   2018-02-05T22:46:19Z

    DRILL-6323: Lateral Join - Initial implementation

commit 08b7d38e6ffe4fed492a2ea67c3907f0f514717e
Author: Sorabh Hamirwasia <sh...@...>
Date:   2018-02-20T22:47:48Z

    DRILL-6323: Lateral Join - Initial implementation
                Note: Refactor, fix various issues with LATERAL: a)Override prefetch call in BuildSchema phase for LATERAL, b)EMIT handling in buildSchema, c)Issue when in multilevel Lateral case
                schema change is observed only on right side of UNNEST, d)Handle SelectionVector in incoming batch, e) Handling Schema change, f) Updating joinIndexes correctly when producing multiple output batches
                for current left&right inputs.
                Added tests for a)EMIT handling in buildSchema, b)Multiple UNNEST at same level, c)Multilevel Lateral, d)Multilevel Lateral with Schema change on left/right or both branches, e) Left LATERAL join
                f)Schema change for UNNEST and Non-UNNEST columns, g)Error outcomes from left&right, h) Producing multiple output batches for given incoming, i) Consuming multiple incoming into single output batch

commit f686877198857a23227031329b2f48c163f85021
Author: Sorabh Hamirwasia <sh...@...>
Date:   2018-03-09T23:56:22Z

    DRILL-6323: Lateral Join - Initial implementation
                Note: Remove codegen and operator template class. Logic to copy data is moved to LateralJoinBatch itself

commit cb47407a8f47e7fc10ac04c438f7e903de92a80c
Author: Sorabh Hamirwasia <sh...@...>
Date:   2018-03-13T23:41:03Z

    DRILL-6323: Lateral Join - Initial implementation
                Note: Add some debug logs for LateralJoinBatch

commit 2ea968e3302ad8bb6d62a8032bd6b6329b900d3a
Author: Sorabh Hamirwasia <sh...@...>
Date:   2018-03-14T23:59:29Z

    DRILL-6323: Lateral Join - Initial implementation
                Note: Refactor BatchMemorySize to put outputBatchSize in abstract class. Created a new JoinBatchMemoryManager to be shared across join record batches.
    	          Changed merge join to use AbstractBinaryRecordBatch instead of AbstractRecordBatch, and use JoinBatchMemoryManager

commit fb521c65f91b69543ae9b5dbb9a727c79f852573
Author: Sorabh Hamirwasia <sh...@...>
Date:   2018-03-19T19:00:22Z

    DRILL-6323: Lateral Join - Initial implementation
                Note: Lateral Join Batch Memory manager support using the record batch sizer

----


---

[GitHub] drill pull request #1212: DRILL-6323: Lateral Join - Initial implementation

Posted by parthchandra <gi...@git.apache.org>.
Github user parthchandra commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1212#discussion_r182283067
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java ---
    @@ -0,0 +1,861 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl.join;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import com.google.common.base.Preconditions;
    +import org.apache.calcite.rel.core.JoinRelType;
    +import org.apache.drill.common.types.TypeProtos;
    +import org.apache.drill.common.types.Types;
    +import org.apache.drill.exec.ExecConstants;
    +import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.exception.SchemaChangeException;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.physical.base.LateralContract;
    +import org.apache.drill.exec.physical.config.LateralJoinPOP;
    +import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
    +import org.apache.drill.exec.record.BatchSchema;
    +import org.apache.drill.exec.record.JoinBatchMemoryManager;
    +import org.apache.drill.exec.record.MaterializedField;
    +import org.apache.drill.exec.record.RecordBatch;
    +import org.apache.drill.exec.record.RecordBatchSizer;
    +import org.apache.drill.exec.record.VectorAccessibleUtilities;
    +import org.apache.drill.exec.record.VectorWrapper;
    +import org.apache.drill.exec.vector.ValueVector;
    +
    +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
    +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
    +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
    +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
    +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OUT_OF_MEMORY;
    +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;
    +
    +/**
    + * RecordBatch implementation for the lateral join operator. Currently it's expected LATERAL to co-exists with UNNEST
    + * operator. Both LATERAL and UNNEST will share a contract with each other defined at {@link LateralContract}
    + */
    +public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> implements LateralContract {
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LateralJoinBatch.class);
    +
    +  // Input indexes to correctly update the stats
    +  private static final int LEFT_INPUT = 0;
    +
    +  private static final int RIGHT_INPUT = 1;
    +
    +  // Maximum number records in the outgoing batch
    +  private int maxOutputRowCount;
    +
    +  // Schema on the left side
    +  private BatchSchema leftSchema;
    +
    +  // Schema on the right side
    +  private BatchSchema rightSchema;
    +
    +  // Index in output batch to populate next row
    +  private int outputIndex;
    +
    +  // Current index of record in left incoming which is being processed
    +  private int leftJoinIndex = -1;
    +
    +  // Current index of record in right incoming which is being processed
    +  private int rightJoinIndex = -1;
    +
    +  // flag to keep track if current left batch needs to be processed in future next call
    +  private boolean processLeftBatchInFuture;
    +
    +  // Keep track if any matching right record was found for current left index record
    +  private boolean matchedRecordFound;
    +
    +  private boolean useMemoryManager = true;
    +
    +  /* ****************************************************************************************************************
    +   * Public Methods
    +   * ****************************************************************************************************************/
    +  public LateralJoinBatch(LateralJoinPOP popConfig, FragmentContext context,
    +                          RecordBatch left, RecordBatch right) throws OutOfMemoryException {
    +    super(popConfig, context, left, right);
    +    Preconditions.checkNotNull(left);
    +    Preconditions.checkNotNull(right);
    +    final int configOutputBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
    +    batchMemoryManager = new JoinBatchMemoryManager(configOutputBatchSize, left, right);
    +
    +    // Initially it's set to default value of 64K and later for each new output row it will be set to the computed
    +    // row count
    +    maxOutputRowCount = batchMemoryManager.getOutputRowCount();
    +  }
    +
    +  /**
    +   * Method that get's left and right incoming batch and produce the output batch. If the left incoming batch is
    +   * empty then next on right branch is not called and empty batch with correct outcome is returned. If non empty
    +   * left incoming batch is received then it call's next on right branch to get an incoming and finally produces
    +   * output.
    +   * @return IterOutcome state of the lateral join batch
    +   */
    +  @Override
    +  public IterOutcome innerNext() {
    +
    +    // We don't do anything special on FIRST state. Process left batch first and then right batch if need be
    +    IterOutcome childOutcome = processLeftBatch();
    +
    +    // reset this state after calling processLeftBatch above.
    +    processLeftBatchInFuture = false;
    +
    +    // If the left batch doesn't have any record in the incoming batch (with OK_NEW_SCHEMA/EMIT) or the state returned
    +    // from left side is terminal state then just return the IterOutcome and don't call next() on right branch
    +    if (isTerminalOutcome(childOutcome) || left.getRecordCount() == 0) {
    +      container.setRecordCount(0);
    +      return childOutcome;
    +    }
    +
    +    // Left side has some records in the batch so let's process right batch
    +    childOutcome = processRightBatch();
    +
    +    // reset the left & right outcomes to OK here and send the empty batch downstream
    +    // Assumption being right side will always send OK_NEW_SCHEMA with empty batch which is what UNNEST will do
    +    if (childOutcome == OK_NEW_SCHEMA) {
    +      leftUpstream = (leftUpstream != EMIT) ? OK : leftUpstream;
    +      rightUpstream = OK;
    +      return childOutcome;
    +    }
    +
    +    if (isTerminalOutcome(childOutcome)) {
    +      return childOutcome;
    +    }
    +
    +    // If OK_NEW_SCHEMA is seen only on non empty left batch but not on right batch, then we should setup schema in
    +    // output container based on new left schema and old right schema. If schema change failed then return STOP
    +    // downstream
    +    if (leftUpstream == OK_NEW_SCHEMA && !handleSchemaChange()) {
    +      return STOP;
    +    }
    +
    +    // Setup the references of left, right and outgoing container in generated operator
    +    state = BatchState.NOT_FIRST;
    +
    +    // Update the memory manager
    +    updateMemoryManager(LEFT_INPUT);
    +    updateMemoryManager(RIGHT_INPUT);
    +
    +    // allocate space for the outgoing batch
    +    allocateVectors();
    +
    +    return produceOutputBatch();
    +  }
    +
    +  @Override
    +  public void close() {
    +    updateBatchMemoryManagerStats();
    +    super.close();
    +  }
    +
    +  @Override
    +  public int getRecordCount() {
    +    return container.getRecordCount();
    +  }
    +
    +  /**
    +   * Returns the left side incoming for the Lateral Join. Used by right branch leaf operator of Lateral
    +   * to process the records at leftJoinIndex.
    +   *
    +   * @return - RecordBatch received as left side incoming
    +   */
    +  @Override
    +  public RecordBatch getIncoming() {
    +    Preconditions.checkState (left != null, "Retuning null left batch. It's unexpected since right side will only be " +
    +      "called iff there is any valid left batch");
    +    return left;
    +  }
    +
    +  /**
    +   * Returns the current row index which the calling operator should process in current incoming left record batch.
    +   * LATERAL should never return it as -1 since that indicated current left batch is empty and LATERAL will never
    +   * call next on right side with empty left batch
    +   *
    +   * @return - int - index of row to process.
    +   */
    +  @Override
    +  public int getRecordIndex() {
    +    Preconditions.checkState (leftJoinIndex < left.getRecordCount(),
    +      String.format("Left join index: %d is out of bounds: %d", leftJoinIndex, left.getRecordCount()));
    +    return leftJoinIndex;
    +  }
    +
    +  /**
    +   * Returns the current {@link org.apache.drill.exec.record.RecordBatch.IterOutcome} for the left incoming batch
    +   */
    +  @Override
    +  public IterOutcome getLeftOutcome() {
    +    return leftUpstream;
    +  }
    +
    +  /* ****************************************************************************************************************
    +   * Protected Methods
    +   * ****************************************************************************************************************/
    +
    +  /**
    +   * Method to get left and right batch during build schema phase for {@link LateralJoinBatch}. If left batch sees a
    +   * failure outcome then we don't even call next on right branch, since there is no left incoming.
    +   * @return true if both the left/right batch was received without failure outcome.
    +   *         false if either of batch is received with failure outcome.
    +   */
    +  @Override
    +  protected boolean prefetchFirstBatchFromBothSides() {
    +    // Left can get batch with zero or more records with OK_NEW_SCHEMA outcome as first batch
    +    leftUpstream = next(0, left);
    +
    +    boolean validBatch = setBatchState(leftUpstream);
    +
    +    if (validBatch) {
    +      rightUpstream = next(1, right);
    +      validBatch = setBatchState(rightUpstream);
    +    }
    +
    +    // EMIT outcome is not expected as part of first batch from either side
    +    if (leftUpstream == EMIT || rightUpstream == EMIT) {
    +      state = BatchState.STOP;
    +      throw new IllegalStateException("Unexpected IterOutcome.EMIT received either from left or right side in " +
    +        "buildSchema phase");
    +    }
    +    return validBatch;
    +  }
    +
    +  /**
    +   * Prefetch a batch from left and right branch to know about the schema of each side. Then adds value vector in
    +   * output container based on those schemas. For this phase LATERAL always expect's an empty batch from right side
    +   * which UNNEST should abide by.
    +   *
    +   * @throws SchemaChangeException if batch schema was changed during execution
    +   */
    +  @Override
    +  protected void buildSchema() throws SchemaChangeException {
    +    // Prefetch a RecordBatch from both left and right branch
    +    if (!prefetchFirstBatchFromBothSides()) {
    +      return;
    +    }
    +    Preconditions.checkState(right.getRecordCount() == 0, "Unexpected non-empty first right batch received");
    +
    +    // Update the record memory manager
    +    updateMemoryManager(LEFT_INPUT);
    +    updateMemoryManager(RIGHT_INPUT);
    +
    +    // Setup output container schema based on known left and right schema
    +    setupNewSchema();
    +
    +    // Release the vectors received from right side
    +    VectorAccessibleUtilities.clear(right);
    +
    +    // Set join index as invalid (-1) if the left side is empty, else set it to 0
    +    leftJoinIndex = (left.getRecordCount() <= 0) ? -1 : 0;
    +    rightJoinIndex = -1;
    +
    +    // Reset the left side of the IterOutcome since for this call, OK_NEW_SCHEMA will be returned correctly
    +    // by buildSchema caller and we should treat the batch as received with OK outcome.
    +    leftUpstream = OK;
    +    rightUpstream = OK;
    +  }
    +
    +  @Override
    +  protected void killIncoming(boolean sendUpstream) {
    +    this.left.kill(sendUpstream);
    +    // Reset the left side outcome as STOP since as part of right kill when UNNEST will ask IterOutcome of left incoming
    +    // from LATERAL and based on that it can make decision if the kill is coming from downstream to LATERAL or upstream
    +    // to LATERAL. Like LIMIT operator being present downstream to LATERAL or upstream to LATERAL and downstream to
    +    // UNNEST.
    +    leftUpstream = STOP;
    +    this.right.kill(sendUpstream);
    +  }
    +
    +  /* ****************************************************************************************************************
    +   * Private Methods
    +   * ****************************************************************************************************************/
    +
    +  private boolean handleSchemaChange() {
    +    try {
    +      stats.startSetup();
    +      logger.debug(String.format("Setting up new schema based on incoming batch. Old output schema: %s",
    +        container.getSchema()));
    +      setupNewSchema();
    +      return true;
    +    } catch (SchemaChangeException ex) {
    +      logger.error("Failed to handle schema change hence killing the query");
    +      context.getExecutorState().fail(ex);
    +      left.kill(true); // Can have exchange receivers on left so called with true
    +      right.kill(false); // Won't have exchange receivers on right side
    +      return false;
    +    } finally {
    +      stats.stopSetup();
    +    }
    +  }
    +
    +  private boolean isTerminalOutcome(IterOutcome outcome) {
    +    return (outcome == STOP || outcome == OUT_OF_MEMORY || outcome == NONE);
    +  }
    +
    +  /**
    +   * Process left incoming batch with different {@link org.apache.drill.exec.record.RecordBatch.IterOutcome}. It is
    +   * called from main {@link LateralJoinBatch#innerNext()} block with each next() call from upstream operator. Also
    +   * when we populate the outgoing container then this method is called to get next left batch if current one is
    +   * fully processed. It calls next() on left side until we get a non-empty RecordBatch. OR we get either of
    +   * OK_NEW_SCHEMA/EMIT/NONE/STOP/OOM/NOT_YET outcome.
    +   * @return IterOutcome after processing current left batch
    +   */
    +  private IterOutcome processLeftBatch() {
    +
    +    boolean needLeftBatch = leftJoinIndex == -1;
    +
    +    // If left batch is empty
    +    while (needLeftBatch) {
    +      leftUpstream = !processLeftBatchInFuture ? next(LEFT_INPUT, left) : leftUpstream;
    +      final boolean emptyLeftBatch = left.getRecordCount() <=0;
    +      logger.trace("Received a left batch and isEmpty: {}", emptyLeftBatch);
    +
    +      switch (leftUpstream) {
    +        case OK_NEW_SCHEMA:
    +          // This OK_NEW_SCHEMA is received post build schema phase and from left side
    +          // If schema didn't actually changed then just handle it as OK outcome. This is fine since it is not setting
    +          // up any incoming vector references in setupNewSchema. While copying the records it always work on latest
    +          // incoming vector.
    +          if (!isSchemaChanged(left.getSchema(), leftSchema)) {
    +            logger.warn(String.format("New schema received from left side is same as previous known left schema. " +
    +              "Ignoring this schema change. Old Left Schema: %s, New Left Schema: %s", leftSchema, left.getSchema()));
    +
    +            // Current left batch is empty and schema didn't changed as well, so let's get next batch and loose
    +            // OK_NEW_SCHEMA outcome
    +            processLeftBatchInFuture = false;
    +            if (emptyLeftBatch) {
    +              continue;
    +            } else {
    +              leftUpstream = OK;
    +            }
    +          } else if (outputIndex > 0) { // can only reach here from produceOutputBatch
    +            // This means there is already some records from previous join inside left batch
    +            // So we need to pass that downstream and then handle the OK_NEW_SCHEMA in subsequent next call
    +            processLeftBatchInFuture = true;
    +            return OK_NEW_SCHEMA;
    +          }
    +
    +          // If left batch is empty with actual schema change then just rebuild the output container and send empty
    +          // batch downstream
    +          if (emptyLeftBatch) {
    +            if (handleSchemaChange()) {
    +              leftJoinIndex = -1;
    +              return OK_NEW_SCHEMA;
    +            } else {
    +              return STOP;
    +            }
    +          } // else - setup the new schema information after getting it from right side too.
    +        case OK:
    +          // With OK outcome we will keep calling next until we get a batch with >0 records
    +          if (emptyLeftBatch) {
    +            leftJoinIndex = -1;
    +            continue;
    +          } else {
    +            leftJoinIndex = 0;
    +          }
    +          break;
    +        case EMIT:
    +          // don't call next on right batch
    +          if (emptyLeftBatch) {
    +            leftJoinIndex = -1;
    +            return EMIT;
    +          } else {
    +            leftJoinIndex = 0;
    +          }
    +          break;
    +        case OUT_OF_MEMORY:
    +        case NONE:
    +        case STOP:
    +          // Not using =0 since if outgoing container is empty then no point returning anything
    +          if (outputIndex > 0) { // can only reach here from produceOutputBatch
    +            processLeftBatchInFuture = true;
    +          }
    +          return leftUpstream;
    +        case NOT_YET:
    +          try {
    +            Thread.sleep(5);
    +          } catch (InterruptedException ex) {
    +            logger.debug("Thread interrupted while sleeping to call next on left branch of LATERAL since it " +
    +              "received NOT_YET");
    +          }
    +          break;
    +      }
    +      needLeftBatch = leftJoinIndex == -1;
    +    }
    +    return leftUpstream;
    +  }
    +
    +  /**
    +   * Process right incoming batch with different {@link org.apache.drill.exec.record.RecordBatch.IterOutcome}. It is
    +   * called from main {@link LateralJoinBatch#innerNext()} block with each next() call from upstream operator and if
    +   * left batch has some records in it. Also when we populate the outgoing container then this method is called to
    +   * get next right batch if current one is fully processed.
    +   * @return IterOutcome after processing current left batch
    +   */
    +  private IterOutcome processRightBatch() {
    +    // Check if we still have records left to process in left incoming from new batch or previously half processed
    +    // batch based on indexes. We are making sure to update leftJoinIndex and rightJoinIndex correctly. Like for new
    +    // batch leftJoinIndex will always be set to zero and once leftSide batch is fully processed then it will be set
    +    // to -1.
    +    // Whereas rightJoinIndex is to keep track of record in right batch being joined with record in left batch.
    +    // So when there are cases such that all records in right batch is not consumed by the output, then rightJoinIndex
    +    // will be a valid index. When all records are consumed it will be set to -1.
    +    boolean needNewRightBatch = (leftJoinIndex >= 0) && (rightJoinIndex == -1);
    +    while (needNewRightBatch) {
    +      rightUpstream = next(RIGHT_INPUT, right);
    +      switch (rightUpstream) {
    +        case OK_NEW_SCHEMA:
    +          // We should not get OK_NEW_SCHEMA multiple times for the same left incoming batch. So there won't be a
    +          // case where we get OK_NEW_SCHEMA --> OK (with batch) ---> OK_NEW_SCHEMA --> OK/EMIT fall through
    +          //
    +          // Right batch with OK_NEW_SCHEMA is always going to be an empty batch, so let's pass the new schema
    +          // downstream and later with subsequent next() call the join output will be produced
    +          Preconditions.checkState(right.getRecordCount() == 0,
    +            "Right side batch with OK_NEW_SCHEMA is not empty");
    +
    +          if (!isSchemaChanged(right.getSchema(), rightSchema)) {
    +            logger.warn(String.format("New schema received from right side is same as previous known right schema. " +
    +              "Ignoring this schema change. Old Right schema: %s, New Right Schema: %s",
    +              rightSchema, right.getSchema()));
    +            continue;
    +          }
    +          if (handleSchemaChange()) {
    +            container.setRecordCount(0);
    +            rightJoinIndex = -1;
    +            return OK_NEW_SCHEMA;
    +          } else {
    +            return STOP;
    +          }
    +        case OK:
    +        case EMIT:
    +          // Even if there are no records we should not call next() again because in case of LEFT join empty batch is
    +          // of importance too
    +          rightJoinIndex = (right.getRecordCount() > 0) ? 0 : -1;
    +          needNewRightBatch = false;
    +          break;
    +        case OUT_OF_MEMORY:
    +        case NONE:
    +        case STOP:
    +          needNewRightBatch = false;
    +          break;
    +        case NOT_YET:
    +          try {
    +            Thread.sleep(10);
    +          } catch (InterruptedException ex) {
    +            logger.debug("Thread interrupted while sleeping to call next on left branch of LATERAL since it " +
    +              "received NOT_YET");
    +          }
    +          break;
    +      }
    +    }
    +    return rightUpstream;
    +  }
    +
    +  /**
    +   * Get's the current left and right incoming batch and does the cross join to fill the output batch. If all the
    +   * records in the either or both the batches are consumed then it get's next batch from that branch depending upon
    +   * if output batch still has some space left. If output batch is full then the output if finalized to be sent
    +   * downstream. Subsequent call's knows how to consume previously half consumed (if any) batches and producing the
    +   * output using that.
    +   *
    +   * @return - IterOutcome to be send along with output batch to downstream operator
    +   */
    +  private IterOutcome produceOutputBatch() {
    +
    +    boolean isLeftProcessed = false;
    +
    +    // Try to fully pack the outgoing container
    +    while (!isOutgoingBatchFull()) {
    +      final int previousOutputCount = outputIndex;
    +      // invoke the runtime generated method to emit records in the output batch for each leftJoinIndex
    +      crossJoinAndOutputRecords();
    +
    +      // We have produced some records in outgoing container, hence there must be a match found for left record
    +      if (outputIndex > previousOutputCount) {
    +        // Need this extra flag since there can be left join case where for current leftJoinIndex it receives a right
    +        // batch with data, then an empty batch and again another empty batch with EMIT outcome. If we just use
    +        // outputIndex then we will loose the information that few rows for leftJoinIndex is already produced using
    +        // first right batch
    +        matchedRecordFound = true;
    +      }
    +
    +      // One right batch might span across multiple output batch. So rightIndex will be moving sum of all the
    +      // output records for this record batch until it's fully consumed.
    +      //
    +      // Also it can be so that one output batch can contain records from 2 different right batch hence the
    +      // rightJoinIndex should move by number of records in output batch for current right batch only.
    +      rightJoinIndex += outputIndex - previousOutputCount;
    +      final boolean isRightProcessed = rightJoinIndex == -1 || rightJoinIndex >= right.getRecordCount();
    +
    +      // Check if above join to produce output was based on empty right batch or
    +      // it resulted in right side batch to be fully consumed. In this scenario only if rightUpstream
    +      // is EMIT then increase the leftJoinIndex.
    +      // Otherwise it means for the given right batch there is still some record left to be processed.
    +      if (isRightProcessed) {
    +        if (rightUpstream == EMIT) {
    +          if (!matchedRecordFound && JoinRelType.LEFT == popConfig.getJoinType()) {
    +            // copy left side in case of LEFT join
    +            emitLeft(leftJoinIndex, outputIndex++);
    +          }
    +          ++leftJoinIndex;
    +          // Reset matchedRecord for next left index record
    +          matchedRecordFound = false;
    +        }
    +
    +        // Release vectors of right batch. This will happen for both rightUpstream = EMIT/OK
    +        VectorAccessibleUtilities.clear(right);
    +        rightJoinIndex = -1;
    +      }
    +
    +      // Check if previous left record was last one, then set leftJoinIndex to -1
    +      isLeftProcessed = leftJoinIndex >= left.getRecordCount();
    +      if (isLeftProcessed) {
    +        leftJoinIndex = -1;
    +        VectorAccessibleUtilities.clear(left);
    +      }
    +
    +      // Check if output batch still has some space
    +      if (!isOutgoingBatchFull()) {
    +        // Check if left side still has records or not
    +        if (isLeftProcessed) {
    +          // The current left batch was with EMIT/OK_NEW_SCHEMA outcome, then return output to downstream layer before
    +          // getting next batch
    +          if (leftUpstream == EMIT || leftUpstream == OK_NEW_SCHEMA) {
    +            break;
    +          } else {
    +            logger.debug("Output batch still has some space left, getting new batches from left and right");
    +            // Get both left batch and the right batch and make sure indexes are properly set
    +            leftUpstream = processLeftBatch();
    +
    +            if (processLeftBatchInFuture) {
    +              logger.debug("Received left batch with outcome {} such that we have to return the current outgoing " +
    +                "batch and process the new batch in subsequent next call", leftUpstream);
    +              // We should return the current output batch with OK outcome and don't reset the leftUpstream
    +              finalizeOutputContainer();
    +              return OK;
    +            }
    +
    +            // If left batch received a terminal outcome then don't call right batch
    +            if (isTerminalOutcome(leftUpstream)) {
    +              finalizeOutputContainer();
    +              return leftUpstream;
    +            }
    +
    +            // If we have received the left batch with EMIT outcome and is empty then we should return previous output
    +            // batch with EMIT outcome
    +            if (leftUpstream == EMIT && left.getRecordCount() == 0) {
    +              isLeftProcessed = true;
    +              break;
    +            }
    +
    +            // Update the batch memory manager to use new left incoming batch
    +            updateMemoryManager(LEFT_INPUT);
    +          }
    +        }
    +
    +        // If we are here it means one of the below:
    +        // 1) Either previous left batch was not fully processed and it came with OK outcome. There is still some space
    +        // left in outgoing batch so let's get next right batch.
    +        // 2) OR previous left & right batch was fully processed and it came with OK outcome. There is space in outgoing
    +        // batch. Now we have got new left batch with OK outcome. Let's get next right batch
    +        //
    +        // It will not hit OK_NEW_SCHEMA since left side have not seen that outcome
    +        rightUpstream = processRightBatch();
    +        Preconditions.checkState(rightUpstream != OK_NEW_SCHEMA, "Unexpected schema change in right branch");
    +
    +        if (isTerminalOutcome(rightUpstream)) {
    +          finalizeOutputContainer();
    +          return rightUpstream;
    +        }
    +
    +        // Update the batch memory manager to use new right incoming batch
    +        updateMemoryManager(RIGHT_INPUT);
    +      }
    +    } // output batch is full to its max capacity
    +
    +    finalizeOutputContainer();
    +
    +    // Check if output batch was full and left was fully consumed or not. Since if left is not consumed entirely
    +    // but output batch is full, then if the left batch came with EMIT outcome we should send this output batch along
    +    // with OK outcome not with EMIT. Whereas if output is full and left is also fully consumed then we should send
    +    // EMIT outcome.
    +    if (leftUpstream == EMIT && isLeftProcessed) {
    +      logger.debug("Sending current output batch with EMIT outcome since left is received with EMIT and is fully " +
    +        "consumed in output batch");
    +      return EMIT;
    +    }
    +
    +    if (leftUpstream == OK_NEW_SCHEMA) {
    +      // return output batch with OK_NEW_SCHEMA and reset the state to OK
    +      logger.debug("Sending current output batch with OK_NEW_SCHEMA and resetting the left outcome to OK for next set" +
    +        " of batches");
    +      leftUpstream = OK;
    +      return OK_NEW_SCHEMA;
    +    }
    +    return OK;
    +  }
    +
    +  /**
    +   * Finalizes the current output container with the records produced so far before sending it downstream
    +   */
    +  private void finalizeOutputContainer() {
    +    VectorAccessibleUtilities.setValueCount(container, outputIndex);
    +
    +    // Set the record count in the container
    +    container.setRecordCount(outputIndex);
    +    container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
    +
    +    batchMemoryManager.updateOutgoingStats(outputIndex);
    +    logger.debug("Number of records emitted: " + outputIndex);
    +
    +    // Update the output index for next output batch to zero
    +    outputIndex = 0;
    +  }
    +
    +  /**
    +   * Check if the schema changed between provided newSchema and oldSchema. It relies on
    +   * {@link BatchSchema#isEquivalent(BatchSchema)}.
    +   * @param newSchema - New Schema information
    +   * @param oldSchema -  - New Schema information to compare with
    +   *
    +   * @return - true - if newSchema is not same as oldSchema
    +   *         - false - if newSchema is same as oldSchema
    +   */
    +  private boolean isSchemaChanged(BatchSchema newSchema, BatchSchema oldSchema) {
    +    return (newSchema == null || oldSchema == null) || !newSchema.isEquivalent(oldSchema);
    +  }
    +
    +  /**
    +   * Validate if the input schema is not null and doesn't contain any Selection Vector.
    +   * @param schema - input schema to verify
    +   * @return - true: valid input schema
    +   *           false: invalid input schema
    +   */
    +  private boolean verifyInputSchema(BatchSchema schema) {
    +
    +    boolean isValid = true;
    +    if (schema == null) {
    +      logger.error("Null schema found for the incoming batch");
    +      isValid = false;
    +    } else {
    +      final BatchSchema.SelectionVectorMode svMode = schema.getSelectionVectorMode();
    +      if (svMode != BatchSchema.SelectionVectorMode.NONE) {
    +        logger.error("Incoming batch schema found with selection vector which is not supported. SVMode: {}",
    +          svMode.toString());
    +        isValid = false;
    +      }
    +    }
    +    return isValid;
    +  }
    +
    +  /**
    +   * Helps to create the outgoing container vectors based on known left and right batch schemas
    +   * @throws SchemaChangeException
    +   */
    +  private void setupNewSchema() throws SchemaChangeException {
    +
    +    logger.debug(String.format("Setting up new schema based on incoming batch. New left schema: %s" +
    +        " and New right schema: %s", left.getSchema(), right.getSchema()));
    +
    +    // Clear up the container
    +    container.clear();
    +    leftSchema = left.getSchema();
    +    rightSchema = right.getSchema();
    +
    +    if (!verifyInputSchema(leftSchema)) {
    +      throw new SchemaChangeException("Invalid Schema found for left incoming batch");
    +    }
    +
    +    if (!verifyInputSchema(rightSchema)) {
    +      throw new SchemaChangeException("Invalid Schema found for right incoming batch");
    +    }
    +
    --- End diff --
    
    fair enough.


---

[GitHub] drill pull request #1212: DRILL-6323: Lateral Join - Initial implementation

Posted by sohami <gi...@git.apache.org>.
Github user sohami commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1212#discussion_r182259926
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java ---
    @@ -0,0 +1,861 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl.join;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import com.google.common.base.Preconditions;
    +import org.apache.calcite.rel.core.JoinRelType;
    +import org.apache.drill.common.types.TypeProtos;
    +import org.apache.drill.common.types.Types;
    +import org.apache.drill.exec.ExecConstants;
    +import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.exception.SchemaChangeException;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.physical.base.LateralContract;
    +import org.apache.drill.exec.physical.config.LateralJoinPOP;
    +import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
    +import org.apache.drill.exec.record.BatchSchema;
    +import org.apache.drill.exec.record.JoinBatchMemoryManager;
    +import org.apache.drill.exec.record.MaterializedField;
    +import org.apache.drill.exec.record.RecordBatch;
    +import org.apache.drill.exec.record.RecordBatchSizer;
    +import org.apache.drill.exec.record.VectorAccessibleUtilities;
    +import org.apache.drill.exec.record.VectorWrapper;
    +import org.apache.drill.exec.vector.ValueVector;
    +
    +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
    +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
    +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
    +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
    +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OUT_OF_MEMORY;
    +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;
    +
    +/**
    + * RecordBatch implementation for the lateral join operator. Currently it's expected LATERAL to co-exists with UNNEST
    + * operator. Both LATERAL and UNNEST will share a contract with each other defined at {@link LateralContract}
    + */
    +public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> implements LateralContract {
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LateralJoinBatch.class);
    +
    +  // Input indexes to correctly update the stats
    +  private static final int LEFT_INPUT = 0;
    +
    +  private static final int RIGHT_INPUT = 1;
    +
    +  // Maximum number records in the outgoing batch
    +  private int maxOutputRowCount;
    +
    +  // Schema on the left side
    +  private BatchSchema leftSchema;
    +
    +  // Schema on the right side
    +  private BatchSchema rightSchema;
    +
    +  // Index in output batch to populate next row
    +  private int outputIndex;
    +
    +  // Current index of record in left incoming which is being processed
    +  private int leftJoinIndex = -1;
    +
    +  // Current index of record in right incoming which is being processed
    +  private int rightJoinIndex = -1;
    +
    +  // flag to keep track if current left batch needs to be processed in future next call
    +  private boolean processLeftBatchInFuture;
    +
    +  // Keep track if any matching right record was found for current left index record
    +  private boolean matchedRecordFound;
    +
    +  private boolean useMemoryManager = true;
    +
    +  /* ****************************************************************************************************************
    +   * Public Methods
    +   * ****************************************************************************************************************/
    +  public LateralJoinBatch(LateralJoinPOP popConfig, FragmentContext context,
    +                          RecordBatch left, RecordBatch right) throws OutOfMemoryException {
    +    super(popConfig, context, left, right);
    +    Preconditions.checkNotNull(left);
    +    Preconditions.checkNotNull(right);
    +    final int configOutputBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
    +    batchMemoryManager = new JoinBatchMemoryManager(configOutputBatchSize, left, right);
    +
    +    // Initially it's set to default value of 64K and later for each new output row it will be set to the computed
    +    // row count
    +    maxOutputRowCount = batchMemoryManager.getOutputRowCount();
    +  }
    +
    +  /**
    +   * Method that get's left and right incoming batch and produce the output batch. If the left incoming batch is
    +   * empty then next on right branch is not called and empty batch with correct outcome is returned. If non empty
    +   * left incoming batch is received then it call's next on right branch to get an incoming and finally produces
    +   * output.
    +   * @return IterOutcome state of the lateral join batch
    +   */
    +  @Override
    +  public IterOutcome innerNext() {
    +
    +    // We don't do anything special on FIRST state. Process left batch first and then right batch if need be
    +    IterOutcome childOutcome = processLeftBatch();
    +
    +    // reset this state after calling processLeftBatch above.
    +    processLeftBatchInFuture = false;
    +
    +    // If the left batch doesn't have any record in the incoming batch (with OK_NEW_SCHEMA/EMIT) or the state returned
    +    // from left side is terminal state then just return the IterOutcome and don't call next() on right branch
    +    if (isTerminalOutcome(childOutcome) || left.getRecordCount() == 0) {
    +      container.setRecordCount(0);
    +      return childOutcome;
    +    }
    +
    +    // Left side has some records in the batch so let's process right batch
    +    childOutcome = processRightBatch();
    +
    +    // reset the left & right outcomes to OK here and send the empty batch downstream
    +    // Assumption being right side will always send OK_NEW_SCHEMA with empty batch which is what UNNEST will do
    +    if (childOutcome == OK_NEW_SCHEMA) {
    +      leftUpstream = (leftUpstream != EMIT) ? OK : leftUpstream;
    +      rightUpstream = OK;
    +      return childOutcome;
    +    }
    +
    +    if (isTerminalOutcome(childOutcome)) {
    +      return childOutcome;
    +    }
    +
    +    // If OK_NEW_SCHEMA is seen only on non empty left batch but not on right batch, then we should setup schema in
    +    // output container based on new left schema and old right schema. If schema change failed then return STOP
    +    // downstream
    +    if (leftUpstream == OK_NEW_SCHEMA && !handleSchemaChange()) {
    +      return STOP;
    +    }
    +
    +    // Setup the references of left, right and outgoing container in generated operator
    +    state = BatchState.NOT_FIRST;
    +
    +    // Update the memory manager
    +    updateMemoryManager(LEFT_INPUT);
    +    updateMemoryManager(RIGHT_INPUT);
    +
    +    // allocate space for the outgoing batch
    +    allocateVectors();
    +
    +    return produceOutputBatch();
    +  }
    +
    +  @Override
    +  public void close() {
    +    updateBatchMemoryManagerStats();
    +    super.close();
    +  }
    +
    +  @Override
    +  public int getRecordCount() {
    +    return container.getRecordCount();
    +  }
    +
    +  /**
    +   * Returns the left side incoming for the Lateral Join. Used by right branch leaf operator of Lateral
    +   * to process the records at leftJoinIndex.
    +   *
    +   * @return - RecordBatch received as left side incoming
    +   */
    +  @Override
    +  public RecordBatch getIncoming() {
    +    Preconditions.checkState (left != null, "Retuning null left batch. It's unexpected since right side will only be " +
    +      "called iff there is any valid left batch");
    +    return left;
    +  }
    +
    +  /**
    +   * Returns the current row index which the calling operator should process in current incoming left record batch.
    +   * LATERAL should never return it as -1 since that indicated current left batch is empty and LATERAL will never
    +   * call next on right side with empty left batch
    +   *
    +   * @return - int - index of row to process.
    +   */
    +  @Override
    +  public int getRecordIndex() {
    +    Preconditions.checkState (leftJoinIndex < left.getRecordCount(),
    +      String.format("Left join index: %d is out of bounds: %d", leftJoinIndex, left.getRecordCount()));
    +    return leftJoinIndex;
    +  }
    +
    +  /**
    +   * Returns the current {@link org.apache.drill.exec.record.RecordBatch.IterOutcome} for the left incoming batch
    +   */
    +  @Override
    +  public IterOutcome getLeftOutcome() {
    +    return leftUpstream;
    +  }
    +
    +  /* ****************************************************************************************************************
    +   * Protected Methods
    +   * ****************************************************************************************************************/
    +
    +  /**
    +   * Method to get left and right batch during build schema phase for {@link LateralJoinBatch}. If left batch sees a
    +   * failure outcome then we don't even call next on right branch, since there is no left incoming.
    +   * @return true if both the left/right batch was received without failure outcome.
    +   *         false if either of batch is received with failure outcome.
    +   */
    +  @Override
    +  protected boolean prefetchFirstBatchFromBothSides() {
    +    // Left can get batch with zero or more records with OK_NEW_SCHEMA outcome as first batch
    +    leftUpstream = next(0, left);
    +
    +    boolean validBatch = setBatchState(leftUpstream);
    +
    +    if (validBatch) {
    +      rightUpstream = next(1, right);
    +      validBatch = setBatchState(rightUpstream);
    +    }
    +
    +    // EMIT outcome is not expected as part of first batch from either side
    +    if (leftUpstream == EMIT || rightUpstream == EMIT) {
    +      state = BatchState.STOP;
    +      throw new IllegalStateException("Unexpected IterOutcome.EMIT received either from left or right side in " +
    +        "buildSchema phase");
    +    }
    +    return validBatch;
    +  }
    +
    +  /**
    +   * Prefetch a batch from left and right branch to know about the schema of each side. Then adds value vector in
    +   * output container based on those schemas. For this phase LATERAL always expect's an empty batch from right side
    +   * which UNNEST should abide by.
    +   *
    +   * @throws SchemaChangeException if batch schema was changed during execution
    +   */
    +  @Override
    +  protected void buildSchema() throws SchemaChangeException {
    +    // Prefetch a RecordBatch from both left and right branch
    +    if (!prefetchFirstBatchFromBothSides()) {
    +      return;
    +    }
    +    Preconditions.checkState(right.getRecordCount() == 0, "Unexpected non-empty first right batch received");
    +
    +    // Update the record memory manager
    +    updateMemoryManager(LEFT_INPUT);
    +    updateMemoryManager(RIGHT_INPUT);
    +
    +    // Setup output container schema based on known left and right schema
    +    setupNewSchema();
    +
    +    // Release the vectors received from right side
    +    VectorAccessibleUtilities.clear(right);
    +
    +    // Set join index as invalid (-1) if the left side is empty, else set it to 0
    +    leftJoinIndex = (left.getRecordCount() <= 0) ? -1 : 0;
    +    rightJoinIndex = -1;
    +
    +    // Reset the left side of the IterOutcome since for this call, OK_NEW_SCHEMA will be returned correctly
    +    // by buildSchema caller and we should treat the batch as received with OK outcome.
    +    leftUpstream = OK;
    +    rightUpstream = OK;
    +  }
    +
    +  @Override
    +  protected void killIncoming(boolean sendUpstream) {
    +    this.left.kill(sendUpstream);
    +    // Reset the left side outcome as STOP since as part of right kill when UNNEST will ask IterOutcome of left incoming
    +    // from LATERAL and based on that it can make decision if the kill is coming from downstream to LATERAL or upstream
    +    // to LATERAL. Like LIMIT operator being present downstream to LATERAL or upstream to LATERAL and downstream to
    +    // UNNEST.
    +    leftUpstream = STOP;
    +    this.right.kill(sendUpstream);
    +  }
    +
    +  /* ****************************************************************************************************************
    +   * Private Methods
    +   * ****************************************************************************************************************/
    +
    +  private boolean handleSchemaChange() {
    +    try {
    +      stats.startSetup();
    +      logger.debug(String.format("Setting up new schema based on incoming batch. Old output schema: %s",
    +        container.getSchema()));
    +      setupNewSchema();
    +      return true;
    +    } catch (SchemaChangeException ex) {
    +      logger.error("Failed to handle schema change hence killing the query");
    +      context.getExecutorState().fail(ex);
    +      left.kill(true); // Can have exchange receivers on left so called with true
    +      right.kill(false); // Won't have exchange receivers on right side
    +      return false;
    +    } finally {
    +      stats.stopSetup();
    +    }
    +  }
    +
    +  private boolean isTerminalOutcome(IterOutcome outcome) {
    +    return (outcome == STOP || outcome == OUT_OF_MEMORY || outcome == NONE);
    +  }
    +
    +  /**
    +   * Process left incoming batch with different {@link org.apache.drill.exec.record.RecordBatch.IterOutcome}. It is
    +   * called from main {@link LateralJoinBatch#innerNext()} block with each next() call from upstream operator. Also
    +   * when we populate the outgoing container then this method is called to get next left batch if current one is
    +   * fully processed. It calls next() on left side until we get a non-empty RecordBatch. OR we get either of
    +   * OK_NEW_SCHEMA/EMIT/NONE/STOP/OOM/NOT_YET outcome.
    +   * @return IterOutcome after processing current left batch
    +   */
    +  private IterOutcome processLeftBatch() {
    +
    +    boolean needLeftBatch = leftJoinIndex == -1;
    +
    +    // If left batch is empty
    +    while (needLeftBatch) {
    +      leftUpstream = !processLeftBatchInFuture ? next(LEFT_INPUT, left) : leftUpstream;
    +      final boolean emptyLeftBatch = left.getRecordCount() <=0;
    +      logger.trace("Received a left batch and isEmpty: {}", emptyLeftBatch);
    +
    +      switch (leftUpstream) {
    +        case OK_NEW_SCHEMA:
    +          // This OK_NEW_SCHEMA is received post build schema phase and from left side
    +          // If schema didn't actually changed then just handle it as OK outcome. This is fine since it is not setting
    +          // up any incoming vector references in setupNewSchema. While copying the records it always work on latest
    +          // incoming vector.
    +          if (!isSchemaChanged(left.getSchema(), leftSchema)) {
    +            logger.warn(String.format("New schema received from left side is same as previous known left schema. " +
    +              "Ignoring this schema change. Old Left Schema: %s, New Left Schema: %s", leftSchema, left.getSchema()));
    +
    +            // Current left batch is empty and schema didn't changed as well, so let's get next batch and loose
    +            // OK_NEW_SCHEMA outcome
    +            processLeftBatchInFuture = false;
    +            if (emptyLeftBatch) {
    +              continue;
    +            } else {
    +              leftUpstream = OK;
    +            }
    +          } else if (outputIndex > 0) { // can only reach here from produceOutputBatch
    +            // This means there is already some records from previous join inside left batch
    +            // So we need to pass that downstream and then handle the OK_NEW_SCHEMA in subsequent next call
    +            processLeftBatchInFuture = true;
    +            return OK_NEW_SCHEMA;
    +          }
    +
    +          // If left batch is empty with actual schema change then just rebuild the output container and send empty
    +          // batch downstream
    +          if (emptyLeftBatch) {
    +            if (handleSchemaChange()) {
    +              leftJoinIndex = -1;
    +              return OK_NEW_SCHEMA;
    +            } else {
    +              return STOP;
    +            }
    +          } // else - setup the new schema information after getting it from right side too.
    +        case OK:
    +          // With OK outcome we will keep calling next until we get a batch with >0 records
    +          if (emptyLeftBatch) {
    +            leftJoinIndex = -1;
    +            continue;
    +          } else {
    +            leftJoinIndex = 0;
    +          }
    +          break;
    +        case EMIT:
    +          // don't call next on right batch
    +          if (emptyLeftBatch) {
    +            leftJoinIndex = -1;
    +            return EMIT;
    +          } else {
    +            leftJoinIndex = 0;
    +          }
    +          break;
    +        case OUT_OF_MEMORY:
    +        case NONE:
    +        case STOP:
    +          // Not using =0 since if outgoing container is empty then no point returning anything
    +          if (outputIndex > 0) { // can only reach here from produceOutputBatch
    +            processLeftBatchInFuture = true;
    +          }
    +          return leftUpstream;
    +        case NOT_YET:
    +          try {
    +            Thread.sleep(5);
    +          } catch (InterruptedException ex) {
    +            logger.debug("Thread interrupted while sleeping to call next on left branch of LATERAL since it " +
    +              "received NOT_YET");
    +          }
    +          break;
    +      }
    +      needLeftBatch = leftJoinIndex == -1;
    +    }
    +    return leftUpstream;
    +  }
    +
    +  /**
    +   * Process right incoming batch with different {@link org.apache.drill.exec.record.RecordBatch.IterOutcome}. It is
    +   * called from main {@link LateralJoinBatch#innerNext()} block with each next() call from upstream operator and if
    +   * left batch has some records in it. Also when we populate the outgoing container then this method is called to
    +   * get next right batch if current one is fully processed.
    +   * @return IterOutcome after processing current left batch
    +   */
    +  private IterOutcome processRightBatch() {
    +    // Check if we still have records left to process in left incoming from new batch or previously half processed
    +    // batch based on indexes. We are making sure to update leftJoinIndex and rightJoinIndex correctly. Like for new
    +    // batch leftJoinIndex will always be set to zero and once leftSide batch is fully processed then it will be set
    +    // to -1.
    +    // Whereas rightJoinIndex is to keep track of record in right batch being joined with record in left batch.
    +    // So when there are cases such that all records in right batch is not consumed by the output, then rightJoinIndex
    +    // will be a valid index. When all records are consumed it will be set to -1.
    +    boolean needNewRightBatch = (leftJoinIndex >= 0) && (rightJoinIndex == -1);
    +    while (needNewRightBatch) {
    +      rightUpstream = next(RIGHT_INPUT, right);
    +      switch (rightUpstream) {
    +        case OK_NEW_SCHEMA:
    +          // We should not get OK_NEW_SCHEMA multiple times for the same left incoming batch. So there won't be a
    +          // case where we get OK_NEW_SCHEMA --> OK (with batch) ---> OK_NEW_SCHEMA --> OK/EMIT fall through
    +          //
    +          // Right batch with OK_NEW_SCHEMA is always going to be an empty batch, so let's pass the new schema
    +          // downstream and later with subsequent next() call the join output will be produced
    +          Preconditions.checkState(right.getRecordCount() == 0,
    +            "Right side batch with OK_NEW_SCHEMA is not empty");
    +
    +          if (!isSchemaChanged(right.getSchema(), rightSchema)) {
    +            logger.warn(String.format("New schema received from right side is same as previous known right schema. " +
    +              "Ignoring this schema change. Old Right schema: %s, New Right Schema: %s",
    +              rightSchema, right.getSchema()));
    +            continue;
    +          }
    +          if (handleSchemaChange()) {
    +            container.setRecordCount(0);
    +            rightJoinIndex = -1;
    +            return OK_NEW_SCHEMA;
    +          } else {
    +            return STOP;
    +          }
    +        case OK:
    +        case EMIT:
    +          // Even if there are no records we should not call next() again because in case of LEFT join empty batch is
    +          // of importance too
    +          rightJoinIndex = (right.getRecordCount() > 0) ? 0 : -1;
    +          needNewRightBatch = false;
    +          break;
    +        case OUT_OF_MEMORY:
    +        case NONE:
    +        case STOP:
    +          needNewRightBatch = false;
    +          break;
    +        case NOT_YET:
    +          try {
    +            Thread.sleep(10);
    +          } catch (InterruptedException ex) {
    +            logger.debug("Thread interrupted while sleeping to call next on left branch of LATERAL since it " +
    +              "received NOT_YET");
    +          }
    +          break;
    +      }
    +    }
    +    return rightUpstream;
    +  }
    +
    +  /**
    +   * Get's the current left and right incoming batch and does the cross join to fill the output batch. If all the
    +   * records in the either or both the batches are consumed then it get's next batch from that branch depending upon
    +   * if output batch still has some space left. If output batch is full then the output if finalized to be sent
    +   * downstream. Subsequent call's knows how to consume previously half consumed (if any) batches and producing the
    +   * output using that.
    +   *
    +   * @return - IterOutcome to be send along with output batch to downstream operator
    +   */
    +  private IterOutcome produceOutputBatch() {
    +
    +    boolean isLeftProcessed = false;
    +
    +    // Try to fully pack the outgoing container
    +    while (!isOutgoingBatchFull()) {
    +      final int previousOutputCount = outputIndex;
    +      // invoke the runtime generated method to emit records in the output batch for each leftJoinIndex
    +      crossJoinAndOutputRecords();
    +
    +      // We have produced some records in outgoing container, hence there must be a match found for left record
    +      if (outputIndex > previousOutputCount) {
    +        // Need this extra flag since there can be left join case where for current leftJoinIndex it receives a right
    +        // batch with data, then an empty batch and again another empty batch with EMIT outcome. If we just use
    +        // outputIndex then we will loose the information that few rows for leftJoinIndex is already produced using
    +        // first right batch
    +        matchedRecordFound = true;
    +      }
    +
    +      // One right batch might span across multiple output batch. So rightIndex will be moving sum of all the
    +      // output records for this record batch until it's fully consumed.
    +      //
    +      // Also it can be so that one output batch can contain records from 2 different right batch hence the
    +      // rightJoinIndex should move by number of records in output batch for current right batch only.
    +      rightJoinIndex += outputIndex - previousOutputCount;
    +      final boolean isRightProcessed = rightJoinIndex == -1 || rightJoinIndex >= right.getRecordCount();
    +
    +      // Check if above join to produce output was based on empty right batch or
    +      // it resulted in right side batch to be fully consumed. In this scenario only if rightUpstream
    +      // is EMIT then increase the leftJoinIndex.
    +      // Otherwise it means for the given right batch there is still some record left to be processed.
    +      if (isRightProcessed) {
    +        if (rightUpstream == EMIT) {
    +          if (!matchedRecordFound && JoinRelType.LEFT == popConfig.getJoinType()) {
    +            // copy left side in case of LEFT join
    +            emitLeft(leftJoinIndex, outputIndex++);
    +          }
    +          ++leftJoinIndex;
    +          // Reset matchedRecord for next left index record
    +          matchedRecordFound = false;
    +        }
    +
    +        // Release vectors of right batch. This will happen for both rightUpstream = EMIT/OK
    +        VectorAccessibleUtilities.clear(right);
    +        rightJoinIndex = -1;
    +      }
    +
    +      // Check if previous left record was last one, then set leftJoinIndex to -1
    +      isLeftProcessed = leftJoinIndex >= left.getRecordCount();
    +      if (isLeftProcessed) {
    +        leftJoinIndex = -1;
    +        VectorAccessibleUtilities.clear(left);
    +      }
    +
    +      // Check if output batch still has some space
    +      if (!isOutgoingBatchFull()) {
    +        // Check if left side still has records or not
    +        if (isLeftProcessed) {
    +          // The current left batch was with EMIT/OK_NEW_SCHEMA outcome, then return output to downstream layer before
    +          // getting next batch
    +          if (leftUpstream == EMIT || leftUpstream == OK_NEW_SCHEMA) {
    +            break;
    +          } else {
    +            logger.debug("Output batch still has some space left, getting new batches from left and right");
    +            // Get both left batch and the right batch and make sure indexes are properly set
    +            leftUpstream = processLeftBatch();
    +
    +            if (processLeftBatchInFuture) {
    +              logger.debug("Received left batch with outcome {} such that we have to return the current outgoing " +
    +                "batch and process the new batch in subsequent next call", leftUpstream);
    +              // We should return the current output batch with OK outcome and don't reset the leftUpstream
    +              finalizeOutputContainer();
    +              return OK;
    +            }
    +
    +            // If left batch received a terminal outcome then don't call right batch
    +            if (isTerminalOutcome(leftUpstream)) {
    +              finalizeOutputContainer();
    +              return leftUpstream;
    +            }
    +
    +            // If we have received the left batch with EMIT outcome and is empty then we should return previous output
    +            // batch with EMIT outcome
    +            if (leftUpstream == EMIT && left.getRecordCount() == 0) {
    +              isLeftProcessed = true;
    +              break;
    +            }
    +
    +            // Update the batch memory manager to use new left incoming batch
    +            updateMemoryManager(LEFT_INPUT);
    +          }
    +        }
    +
    +        // If we are here it means one of the below:
    +        // 1) Either previous left batch was not fully processed and it came with OK outcome. There is still some space
    +        // left in outgoing batch so let's get next right batch.
    +        // 2) OR previous left & right batch was fully processed and it came with OK outcome. There is space in outgoing
    +        // batch. Now we have got new left batch with OK outcome. Let's get next right batch
    +        //
    +        // It will not hit OK_NEW_SCHEMA since left side have not seen that outcome
    +        rightUpstream = processRightBatch();
    +        Preconditions.checkState(rightUpstream != OK_NEW_SCHEMA, "Unexpected schema change in right branch");
    +
    +        if (isTerminalOutcome(rightUpstream)) {
    +          finalizeOutputContainer();
    +          return rightUpstream;
    +        }
    +
    +        // Update the batch memory manager to use new right incoming batch
    +        updateMemoryManager(RIGHT_INPUT);
    +      }
    +    } // output batch is full to its max capacity
    +
    +    finalizeOutputContainer();
    +
    +    // Check if output batch was full and left was fully consumed or not. Since if left is not consumed entirely
    +    // but output batch is full, then if the left batch came with EMIT outcome we should send this output batch along
    +    // with OK outcome not with EMIT. Whereas if output is full and left is also fully consumed then we should send
    +    // EMIT outcome.
    +    if (leftUpstream == EMIT && isLeftProcessed) {
    +      logger.debug("Sending current output batch with EMIT outcome since left is received with EMIT and is fully " +
    +        "consumed in output batch");
    +      return EMIT;
    +    }
    +
    +    if (leftUpstream == OK_NEW_SCHEMA) {
    +      // return output batch with OK_NEW_SCHEMA and reset the state to OK
    +      logger.debug("Sending current output batch with OK_NEW_SCHEMA and resetting the left outcome to OK for next set" +
    +        " of batches");
    +      leftUpstream = OK;
    +      return OK_NEW_SCHEMA;
    +    }
    +    return OK;
    +  }
    +
    +  /**
    +   * Finalizes the current output container with the records produced so far before sending it downstream
    +   */
    +  private void finalizeOutputContainer() {
    +    VectorAccessibleUtilities.setValueCount(container, outputIndex);
    +
    +    // Set the record count in the container
    +    container.setRecordCount(outputIndex);
    +    container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
    +
    +    batchMemoryManager.updateOutgoingStats(outputIndex);
    +    logger.debug("Number of records emitted: " + outputIndex);
    +
    +    // Update the output index for next output batch to zero
    +    outputIndex = 0;
    +  }
    +
    +  /**
    +   * Check if the schema changed between provided newSchema and oldSchema. It relies on
    +   * {@link BatchSchema#isEquivalent(BatchSchema)}.
    +   * @param newSchema - New Schema information
    +   * @param oldSchema -  - New Schema information to compare with
    +   *
    +   * @return - true - if newSchema is not same as oldSchema
    +   *         - false - if newSchema is same as oldSchema
    +   */
    +  private boolean isSchemaChanged(BatchSchema newSchema, BatchSchema oldSchema) {
    +    return (newSchema == null || oldSchema == null) || !newSchema.isEquivalent(oldSchema);
    +  }
    +
    +  /**
    +   * Validate if the input schema is not null and doesn't contain any Selection Vector.
    +   * @param schema - input schema to verify
    +   * @return - true: valid input schema
    +   *           false: invalid input schema
    +   */
    +  private boolean verifyInputSchema(BatchSchema schema) {
    +
    +    boolean isValid = true;
    +    if (schema == null) {
    +      logger.error("Null schema found for the incoming batch");
    +      isValid = false;
    +    } else {
    +      final BatchSchema.SelectionVectorMode svMode = schema.getSelectionVectorMode();
    +      if (svMode != BatchSchema.SelectionVectorMode.NONE) {
    +        logger.error("Incoming batch schema found with selection vector which is not supported. SVMode: {}",
    +          svMode.toString());
    +        isValid = false;
    +      }
    +    }
    +    return isValid;
    +  }
    +
    +  /**
    +   * Helps to create the outgoing container vectors based on known left and right batch schemas
    +   * @throws SchemaChangeException
    +   */
    +  private void setupNewSchema() throws SchemaChangeException {
    +
    +    logger.debug(String.format("Setting up new schema based on incoming batch. New left schema: %s" +
    +        " and New right schema: %s", left.getSchema(), right.getSchema()));
    +
    +    // Clear up the container
    +    container.clear();
    +    leftSchema = left.getSchema();
    +    rightSchema = right.getSchema();
    +
    +    if (!verifyInputSchema(leftSchema)) {
    +      throw new SchemaChangeException("Invalid Schema found for left incoming batch");
    +    }
    +
    +    if (!verifyInputSchema(rightSchema)) {
    +      throw new SchemaChangeException("Invalid Schema found for right incoming batch");
    +    }
    +
    --- End diff --
    
    In case of other joins planner takes care of renaming columns which has same name from left and right side of join. I will expect same thing to happen in Lateral Join as well rather than explicit check here.


---

[GitHub] drill pull request #1212: DRILL-6323: Lateral Join - Initial implementation

Posted by sohami <gi...@git.apache.org>.
Github user sohami commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1212#discussion_r182262104
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java ---
    @@ -186,6 +186,11 @@ private ExecConstants() {
       public static final String BIT_ENCRYPTION_SASL_ENABLED = "drill.exec.security.bit.encryption.sasl.enabled";
       public static final String BIT_ENCRYPTION_SASL_MAX_WRAPPED_SIZE = "drill.exec.security.bit.encryption.sasl.max_wrapped_size";
     
    +  /**
    --- End diff --
    
    Removed. Thanks for catching this.


---

[GitHub] drill pull request #1212: DRILL-6323: Lateral Join - Initial implementation

Posted by parthchandra <gi...@git.apache.org>.
Github user parthchandra commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1212#discussion_r182172836
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java ---
    @@ -0,0 +1,861 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl.join;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import com.google.common.base.Preconditions;
    +import org.apache.calcite.rel.core.JoinRelType;
    +import org.apache.drill.common.types.TypeProtos;
    +import org.apache.drill.common.types.Types;
    +import org.apache.drill.exec.ExecConstants;
    +import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.exception.SchemaChangeException;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.physical.base.LateralContract;
    +import org.apache.drill.exec.physical.config.LateralJoinPOP;
    +import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
    +import org.apache.drill.exec.record.BatchSchema;
    +import org.apache.drill.exec.record.JoinBatchMemoryManager;
    +import org.apache.drill.exec.record.MaterializedField;
    +import org.apache.drill.exec.record.RecordBatch;
    +import org.apache.drill.exec.record.RecordBatchSizer;
    +import org.apache.drill.exec.record.VectorAccessibleUtilities;
    +import org.apache.drill.exec.record.VectorWrapper;
    +import org.apache.drill.exec.vector.ValueVector;
    +
    +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
    +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
    +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
    +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
    +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OUT_OF_MEMORY;
    +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;
    +
    +/**
    + * RecordBatch implementation for the lateral join operator. Currently it's expected LATERAL to co-exists with UNNEST
    + * operator. Both LATERAL and UNNEST will share a contract with each other defined at {@link LateralContract}
    + */
    +public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> implements LateralContract {
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LateralJoinBatch.class);
    +
    +  // Input indexes to correctly update the stats
    +  private static final int LEFT_INPUT = 0;
    +
    +  private static final int RIGHT_INPUT = 1;
    +
    +  // Maximum number records in the outgoing batch
    +  private int maxOutputRowCount;
    +
    +  // Schema on the left side
    +  private BatchSchema leftSchema;
    +
    +  // Schema on the right side
    +  private BatchSchema rightSchema;
    +
    +  // Index in output batch to populate next row
    +  private int outputIndex;
    +
    +  // Current index of record in left incoming which is being processed
    +  private int leftJoinIndex = -1;
    +
    +  // Current index of record in right incoming which is being processed
    +  private int rightJoinIndex = -1;
    +
    +  // flag to keep track if current left batch needs to be processed in future next call
    +  private boolean processLeftBatchInFuture;
    +
    +  // Keep track if any matching right record was found for current left index record
    +  private boolean matchedRecordFound;
    +
    +  private boolean useMemoryManager = true;
    +
    +  /* ****************************************************************************************************************
    +   * Public Methods
    +   * ****************************************************************************************************************/
    +  public LateralJoinBatch(LateralJoinPOP popConfig, FragmentContext context,
    +                          RecordBatch left, RecordBatch right) throws OutOfMemoryException {
    +    super(popConfig, context, left, right);
    +    Preconditions.checkNotNull(left);
    +    Preconditions.checkNotNull(right);
    +    final int configOutputBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
    +    batchMemoryManager = new JoinBatchMemoryManager(configOutputBatchSize, left, right);
    +
    +    // Initially it's set to default value of 64K and later for each new output row it will be set to the computed
    +    // row count
    +    maxOutputRowCount = batchMemoryManager.getOutputRowCount();
    +  }
    +
    +  /**
    +   * Method that get's left and right incoming batch and produce the output batch. If the left incoming batch is
    +   * empty then next on right branch is not called and empty batch with correct outcome is returned. If non empty
    +   * left incoming batch is received then it call's next on right branch to get an incoming and finally produces
    +   * output.
    +   * @return IterOutcome state of the lateral join batch
    +   */
    +  @Override
    +  public IterOutcome innerNext() {
    +
    +    // We don't do anything special on FIRST state. Process left batch first and then right batch if need be
    +    IterOutcome childOutcome = processLeftBatch();
    +
    +    // reset this state after calling processLeftBatch above.
    +    processLeftBatchInFuture = false;
    +
    +    // If the left batch doesn't have any record in the incoming batch (with OK_NEW_SCHEMA/EMIT) or the state returned
    +    // from left side is terminal state then just return the IterOutcome and don't call next() on right branch
    +    if (isTerminalOutcome(childOutcome) || left.getRecordCount() == 0) {
    +      container.setRecordCount(0);
    +      return childOutcome;
    +    }
    +
    +    // Left side has some records in the batch so let's process right batch
    +    childOutcome = processRightBatch();
    +
    +    // reset the left & right outcomes to OK here and send the empty batch downstream
    +    // Assumption being right side will always send OK_NEW_SCHEMA with empty batch which is what UNNEST will do
    +    if (childOutcome == OK_NEW_SCHEMA) {
    +      leftUpstream = (leftUpstream != EMIT) ? OK : leftUpstream;
    +      rightUpstream = OK;
    +      return childOutcome;
    +    }
    +
    +    if (isTerminalOutcome(childOutcome)) {
    +      return childOutcome;
    +    }
    +
    +    // If OK_NEW_SCHEMA is seen only on non empty left batch but not on right batch, then we should setup schema in
    +    // output container based on new left schema and old right schema. If schema change failed then return STOP
    +    // downstream
    +    if (leftUpstream == OK_NEW_SCHEMA && !handleSchemaChange()) {
    +      return STOP;
    +    }
    +
    +    // Setup the references of left, right and outgoing container in generated operator
    +    state = BatchState.NOT_FIRST;
    +
    +    // Update the memory manager
    +    updateMemoryManager(LEFT_INPUT);
    +    updateMemoryManager(RIGHT_INPUT);
    +
    +    // allocate space for the outgoing batch
    +    allocateVectors();
    +
    +    return produceOutputBatch();
    +  }
    +
    +  @Override
    +  public void close() {
    +    updateBatchMemoryManagerStats();
    +    super.close();
    +  }
    +
    +  @Override
    +  public int getRecordCount() {
    +    return container.getRecordCount();
    +  }
    +
    +  /**
    +   * Returns the left side incoming for the Lateral Join. Used by right branch leaf operator of Lateral
    +   * to process the records at leftJoinIndex.
    +   *
    +   * @return - RecordBatch received as left side incoming
    +   */
    +  @Override
    +  public RecordBatch getIncoming() {
    +    Preconditions.checkState (left != null, "Retuning null left batch. It's unexpected since right side will only be " +
    +      "called iff there is any valid left batch");
    +    return left;
    +  }
    +
    +  /**
    +   * Returns the current row index which the calling operator should process in current incoming left record batch.
    +   * LATERAL should never return it as -1 since that indicated current left batch is empty and LATERAL will never
    +   * call next on right side with empty left batch
    +   *
    +   * @return - int - index of row to process.
    +   */
    +  @Override
    +  public int getRecordIndex() {
    +    Preconditions.checkState (leftJoinIndex < left.getRecordCount(),
    +      String.format("Left join index: %d is out of bounds: %d", leftJoinIndex, left.getRecordCount()));
    +    return leftJoinIndex;
    +  }
    +
    +  /**
    +   * Returns the current {@link org.apache.drill.exec.record.RecordBatch.IterOutcome} for the left incoming batch
    +   */
    +  @Override
    +  public IterOutcome getLeftOutcome() {
    +    return leftUpstream;
    +  }
    +
    +  /* ****************************************************************************************************************
    +   * Protected Methods
    +   * ****************************************************************************************************************/
    +
    +  /**
    +   * Method to get left and right batch during build schema phase for {@link LateralJoinBatch}. If left batch sees a
    +   * failure outcome then we don't even call next on right branch, since there is no left incoming.
    +   * @return true if both the left/right batch was received without failure outcome.
    +   *         false if either of batch is received with failure outcome.
    +   */
    +  @Override
    +  protected boolean prefetchFirstBatchFromBothSides() {
    +    // Left can get batch with zero or more records with OK_NEW_SCHEMA outcome as first batch
    +    leftUpstream = next(0, left);
    +
    +    boolean validBatch = setBatchState(leftUpstream);
    +
    +    if (validBatch) {
    +      rightUpstream = next(1, right);
    +      validBatch = setBatchState(rightUpstream);
    +    }
    +
    +    // EMIT outcome is not expected as part of first batch from either side
    +    if (leftUpstream == EMIT || rightUpstream == EMIT) {
    +      state = BatchState.STOP;
    +      throw new IllegalStateException("Unexpected IterOutcome.EMIT received either from left or right side in " +
    +        "buildSchema phase");
    +    }
    +    return validBatch;
    +  }
    +
    +  /**
    +   * Prefetch a batch from left and right branch to know about the schema of each side. Then adds value vector in
    +   * output container based on those schemas. For this phase LATERAL always expect's an empty batch from right side
    +   * which UNNEST should abide by.
    +   *
    +   * @throws SchemaChangeException if batch schema was changed during execution
    +   */
    +  @Override
    +  protected void buildSchema() throws SchemaChangeException {
    +    // Prefetch a RecordBatch from both left and right branch
    +    if (!prefetchFirstBatchFromBothSides()) {
    +      return;
    +    }
    +    Preconditions.checkState(right.getRecordCount() == 0, "Unexpected non-empty first right batch received");
    +
    +    // Update the record memory manager
    +    updateMemoryManager(LEFT_INPUT);
    +    updateMemoryManager(RIGHT_INPUT);
    +
    +    // Setup output container schema based on known left and right schema
    +    setupNewSchema();
    +
    +    // Release the vectors received from right side
    +    VectorAccessibleUtilities.clear(right);
    +
    +    // Set join index as invalid (-1) if the left side is empty, else set it to 0
    +    leftJoinIndex = (left.getRecordCount() <= 0) ? -1 : 0;
    +    rightJoinIndex = -1;
    +
    +    // Reset the left side of the IterOutcome since for this call, OK_NEW_SCHEMA will be returned correctly
    +    // by buildSchema caller and we should treat the batch as received with OK outcome.
    +    leftUpstream = OK;
    +    rightUpstream = OK;
    +  }
    +
    +  @Override
    +  protected void killIncoming(boolean sendUpstream) {
    +    this.left.kill(sendUpstream);
    +    // Reset the left side outcome as STOP since as part of right kill when UNNEST will ask IterOutcome of left incoming
    +    // from LATERAL and based on that it can make decision if the kill is coming from downstream to LATERAL or upstream
    +    // to LATERAL. Like LIMIT operator being present downstream to LATERAL or upstream to LATERAL and downstream to
    +    // UNNEST.
    +    leftUpstream = STOP;
    +    this.right.kill(sendUpstream);
    +  }
    +
    +  /* ****************************************************************************************************************
    +   * Private Methods
    +   * ****************************************************************************************************************/
    +
    +  private boolean handleSchemaChange() {
    +    try {
    +      stats.startSetup();
    +      logger.debug(String.format("Setting up new schema based on incoming batch. Old output schema: %s",
    +        container.getSchema()));
    +      setupNewSchema();
    +      return true;
    +    } catch (SchemaChangeException ex) {
    +      logger.error("Failed to handle schema change hence killing the query");
    +      context.getExecutorState().fail(ex);
    +      left.kill(true); // Can have exchange receivers on left so called with true
    +      right.kill(false); // Won't have exchange receivers on right side
    +      return false;
    +    } finally {
    +      stats.stopSetup();
    +    }
    +  }
    +
    +  private boolean isTerminalOutcome(IterOutcome outcome) {
    +    return (outcome == STOP || outcome == OUT_OF_MEMORY || outcome == NONE);
    +  }
    +
    +  /**
    +   * Process left incoming batch with different {@link org.apache.drill.exec.record.RecordBatch.IterOutcome}. It is
    +   * called from main {@link LateralJoinBatch#innerNext()} block with each next() call from upstream operator. Also
    +   * when we populate the outgoing container then this method is called to get next left batch if current one is
    +   * fully processed. It calls next() on left side until we get a non-empty RecordBatch. OR we get either of
    +   * OK_NEW_SCHEMA/EMIT/NONE/STOP/OOM/NOT_YET outcome.
    +   * @return IterOutcome after processing current left batch
    +   */
    +  private IterOutcome processLeftBatch() {
    +
    +    boolean needLeftBatch = leftJoinIndex == -1;
    +
    +    // If left batch is empty
    +    while (needLeftBatch) {
    +      leftUpstream = !processLeftBatchInFuture ? next(LEFT_INPUT, left) : leftUpstream;
    +      final boolean emptyLeftBatch = left.getRecordCount() <=0;
    +      logger.trace("Received a left batch and isEmpty: {}", emptyLeftBatch);
    +
    +      switch (leftUpstream) {
    +        case OK_NEW_SCHEMA:
    +          // This OK_NEW_SCHEMA is received post build schema phase and from left side
    +          // If schema didn't actually changed then just handle it as OK outcome. This is fine since it is not setting
    +          // up any incoming vector references in setupNewSchema. While copying the records it always work on latest
    +          // incoming vector.
    +          if (!isSchemaChanged(left.getSchema(), leftSchema)) {
    +            logger.warn(String.format("New schema received from left side is same as previous known left schema. " +
    +              "Ignoring this schema change. Old Left Schema: %s, New Left Schema: %s", leftSchema, left.getSchema()));
    +
    +            // Current left batch is empty and schema didn't changed as well, so let's get next batch and loose
    +            // OK_NEW_SCHEMA outcome
    +            processLeftBatchInFuture = false;
    +            if (emptyLeftBatch) {
    +              continue;
    +            } else {
    +              leftUpstream = OK;
    +            }
    +          } else if (outputIndex > 0) { // can only reach here from produceOutputBatch
    +            // This means there is already some records from previous join inside left batch
    +            // So we need to pass that downstream and then handle the OK_NEW_SCHEMA in subsequent next call
    +            processLeftBatchInFuture = true;
    +            return OK_NEW_SCHEMA;
    +          }
    +
    +          // If left batch is empty with actual schema change then just rebuild the output container and send empty
    +          // batch downstream
    +          if (emptyLeftBatch) {
    +            if (handleSchemaChange()) {
    +              leftJoinIndex = -1;
    +              return OK_NEW_SCHEMA;
    +            } else {
    +              return STOP;
    +            }
    +          } // else - setup the new schema information after getting it from right side too.
    +        case OK:
    +          // With OK outcome we will keep calling next until we get a batch with >0 records
    +          if (emptyLeftBatch) {
    +            leftJoinIndex = -1;
    +            continue;
    +          } else {
    +            leftJoinIndex = 0;
    +          }
    +          break;
    +        case EMIT:
    +          // don't call next on right batch
    +          if (emptyLeftBatch) {
    +            leftJoinIndex = -1;
    +            return EMIT;
    +          } else {
    +            leftJoinIndex = 0;
    +          }
    +          break;
    +        case OUT_OF_MEMORY:
    +        case NONE:
    +        case STOP:
    +          // Not using =0 since if outgoing container is empty then no point returning anything
    +          if (outputIndex > 0) { // can only reach here from produceOutputBatch
    +            processLeftBatchInFuture = true;
    +          }
    +          return leftUpstream;
    +        case NOT_YET:
    +          try {
    +            Thread.sleep(5);
    +          } catch (InterruptedException ex) {
    +            logger.debug("Thread interrupted while sleeping to call next on left branch of LATERAL since it " +
    +              "received NOT_YET");
    +          }
    +          break;
    +      }
    +      needLeftBatch = leftJoinIndex == -1;
    +    }
    +    return leftUpstream;
    +  }
    +
    +  /**
    +   * Process right incoming batch with different {@link org.apache.drill.exec.record.RecordBatch.IterOutcome}. It is
    +   * called from main {@link LateralJoinBatch#innerNext()} block with each next() call from upstream operator and if
    +   * left batch has some records in it. Also when we populate the outgoing container then this method is called to
    +   * get next right batch if current one is fully processed.
    +   * @return IterOutcome after processing current left batch
    +   */
    +  private IterOutcome processRightBatch() {
    +    // Check if we still have records left to process in left incoming from new batch or previously half processed
    +    // batch based on indexes. We are making sure to update leftJoinIndex and rightJoinIndex correctly. Like for new
    +    // batch leftJoinIndex will always be set to zero and once leftSide batch is fully processed then it will be set
    +    // to -1.
    +    // Whereas rightJoinIndex is to keep track of record in right batch being joined with record in left batch.
    +    // So when there are cases such that all records in right batch is not consumed by the output, then rightJoinIndex
    +    // will be a valid index. When all records are consumed it will be set to -1.
    +    boolean needNewRightBatch = (leftJoinIndex >= 0) && (rightJoinIndex == -1);
    +    while (needNewRightBatch) {
    +      rightUpstream = next(RIGHT_INPUT, right);
    +      switch (rightUpstream) {
    +        case OK_NEW_SCHEMA:
    +          // We should not get OK_NEW_SCHEMA multiple times for the same left incoming batch. So there won't be a
    +          // case where we get OK_NEW_SCHEMA --> OK (with batch) ---> OK_NEW_SCHEMA --> OK/EMIT fall through
    +          //
    +          // Right batch with OK_NEW_SCHEMA is always going to be an empty batch, so let's pass the new schema
    +          // downstream and later with subsequent next() call the join output will be produced
    +          Preconditions.checkState(right.getRecordCount() == 0,
    +            "Right side batch with OK_NEW_SCHEMA is not empty");
    +
    +          if (!isSchemaChanged(right.getSchema(), rightSchema)) {
    +            logger.warn(String.format("New schema received from right side is same as previous known right schema. " +
    +              "Ignoring this schema change. Old Right schema: %s, New Right Schema: %s",
    +              rightSchema, right.getSchema()));
    +            continue;
    +          }
    +          if (handleSchemaChange()) {
    +            container.setRecordCount(0);
    +            rightJoinIndex = -1;
    +            return OK_NEW_SCHEMA;
    +          } else {
    +            return STOP;
    +          }
    +        case OK:
    +        case EMIT:
    +          // Even if there are no records we should not call next() again because in case of LEFT join empty batch is
    +          // of importance too
    +          rightJoinIndex = (right.getRecordCount() > 0) ? 0 : -1;
    +          needNewRightBatch = false;
    +          break;
    +        case OUT_OF_MEMORY:
    +        case NONE:
    +        case STOP:
    +          needNewRightBatch = false;
    +          break;
    +        case NOT_YET:
    +          try {
    +            Thread.sleep(10);
    +          } catch (InterruptedException ex) {
    +            logger.debug("Thread interrupted while sleeping to call next on left branch of LATERAL since it " +
    +              "received NOT_YET");
    +          }
    +          break;
    +      }
    +    }
    +    return rightUpstream;
    +  }
    +
    +  /**
    +   * Get's the current left and right incoming batch and does the cross join to fill the output batch. If all the
    +   * records in the either or both the batches are consumed then it get's next batch from that branch depending upon
    +   * if output batch still has some space left. If output batch is full then the output if finalized to be sent
    +   * downstream. Subsequent call's knows how to consume previously half consumed (if any) batches and producing the
    +   * output using that.
    +   *
    +   * @return - IterOutcome to be send along with output batch to downstream operator
    +   */
    +  private IterOutcome produceOutputBatch() {
    +
    +    boolean isLeftProcessed = false;
    +
    +    // Try to fully pack the outgoing container
    +    while (!isOutgoingBatchFull()) {
    +      final int previousOutputCount = outputIndex;
    +      // invoke the runtime generated method to emit records in the output batch for each leftJoinIndex
    +      crossJoinAndOutputRecords();
    +
    +      // We have produced some records in outgoing container, hence there must be a match found for left record
    +      if (outputIndex > previousOutputCount) {
    +        // Need this extra flag since there can be left join case where for current leftJoinIndex it receives a right
    +        // batch with data, then an empty batch and again another empty batch with EMIT outcome. If we just use
    +        // outputIndex then we will loose the information that few rows for leftJoinIndex is already produced using
    +        // first right batch
    +        matchedRecordFound = true;
    +      }
    +
    +      // One right batch might span across multiple output batch. So rightIndex will be moving sum of all the
    +      // output records for this record batch until it's fully consumed.
    +      //
    +      // Also it can be so that one output batch can contain records from 2 different right batch hence the
    +      // rightJoinIndex should move by number of records in output batch for current right batch only.
    +      rightJoinIndex += outputIndex - previousOutputCount;
    +      final boolean isRightProcessed = rightJoinIndex == -1 || rightJoinIndex >= right.getRecordCount();
    +
    +      // Check if above join to produce output was based on empty right batch or
    +      // it resulted in right side batch to be fully consumed. In this scenario only if rightUpstream
    +      // is EMIT then increase the leftJoinIndex.
    +      // Otherwise it means for the given right batch there is still some record left to be processed.
    +      if (isRightProcessed) {
    +        if (rightUpstream == EMIT) {
    +          if (!matchedRecordFound && JoinRelType.LEFT == popConfig.getJoinType()) {
    +            // copy left side in case of LEFT join
    +            emitLeft(leftJoinIndex, outputIndex++);
    +          }
    +          ++leftJoinIndex;
    +          // Reset matchedRecord for next left index record
    +          matchedRecordFound = false;
    +        }
    +
    +        // Release vectors of right batch. This will happen for both rightUpstream = EMIT/OK
    +        VectorAccessibleUtilities.clear(right);
    +        rightJoinIndex = -1;
    +      }
    +
    +      // Check if previous left record was last one, then set leftJoinIndex to -1
    +      isLeftProcessed = leftJoinIndex >= left.getRecordCount();
    +      if (isLeftProcessed) {
    +        leftJoinIndex = -1;
    +        VectorAccessibleUtilities.clear(left);
    +      }
    +
    +      // Check if output batch still has some space
    +      if (!isOutgoingBatchFull()) {
    +        // Check if left side still has records or not
    +        if (isLeftProcessed) {
    +          // The current left batch was with EMIT/OK_NEW_SCHEMA outcome, then return output to downstream layer before
    +          // getting next batch
    +          if (leftUpstream == EMIT || leftUpstream == OK_NEW_SCHEMA) {
    +            break;
    +          } else {
    +            logger.debug("Output batch still has some space left, getting new batches from left and right");
    +            // Get both left batch and the right batch and make sure indexes are properly set
    +            leftUpstream = processLeftBatch();
    +
    +            if (processLeftBatchInFuture) {
    +              logger.debug("Received left batch with outcome {} such that we have to return the current outgoing " +
    +                "batch and process the new batch in subsequent next call", leftUpstream);
    +              // We should return the current output batch with OK outcome and don't reset the leftUpstream
    +              finalizeOutputContainer();
    +              return OK;
    +            }
    +
    +            // If left batch received a terminal outcome then don't call right batch
    +            if (isTerminalOutcome(leftUpstream)) {
    +              finalizeOutputContainer();
    +              return leftUpstream;
    +            }
    +
    +            // If we have received the left batch with EMIT outcome and is empty then we should return previous output
    +            // batch with EMIT outcome
    +            if (leftUpstream == EMIT && left.getRecordCount() == 0) {
    +              isLeftProcessed = true;
    +              break;
    +            }
    +
    +            // Update the batch memory manager to use new left incoming batch
    +            updateMemoryManager(LEFT_INPUT);
    +          }
    +        }
    +
    +        // If we are here it means one of the below:
    +        // 1) Either previous left batch was not fully processed and it came with OK outcome. There is still some space
    +        // left in outgoing batch so let's get next right batch.
    +        // 2) OR previous left & right batch was fully processed and it came with OK outcome. There is space in outgoing
    +        // batch. Now we have got new left batch with OK outcome. Let's get next right batch
    +        //
    +        // It will not hit OK_NEW_SCHEMA since left side have not seen that outcome
    +        rightUpstream = processRightBatch();
    +        Preconditions.checkState(rightUpstream != OK_NEW_SCHEMA, "Unexpected schema change in right branch");
    +
    +        if (isTerminalOutcome(rightUpstream)) {
    +          finalizeOutputContainer();
    +          return rightUpstream;
    +        }
    +
    +        // Update the batch memory manager to use new right incoming batch
    +        updateMemoryManager(RIGHT_INPUT);
    +      }
    +    } // output batch is full to its max capacity
    +
    +    finalizeOutputContainer();
    +
    +    // Check if output batch was full and left was fully consumed or not. Since if left is not consumed entirely
    +    // but output batch is full, then if the left batch came with EMIT outcome we should send this output batch along
    +    // with OK outcome not with EMIT. Whereas if output is full and left is also fully consumed then we should send
    +    // EMIT outcome.
    +    if (leftUpstream == EMIT && isLeftProcessed) {
    +      logger.debug("Sending current output batch with EMIT outcome since left is received with EMIT and is fully " +
    +        "consumed in output batch");
    +      return EMIT;
    +    }
    +
    +    if (leftUpstream == OK_NEW_SCHEMA) {
    +      // return output batch with OK_NEW_SCHEMA and reset the state to OK
    +      logger.debug("Sending current output batch with OK_NEW_SCHEMA and resetting the left outcome to OK for next set" +
    +        " of batches");
    +      leftUpstream = OK;
    +      return OK_NEW_SCHEMA;
    +    }
    +    return OK;
    +  }
    +
    +  /**
    +   * Finalizes the current output container with the records produced so far before sending it downstream
    +   */
    +  private void finalizeOutputContainer() {
    +    VectorAccessibleUtilities.setValueCount(container, outputIndex);
    +
    +    // Set the record count in the container
    +    container.setRecordCount(outputIndex);
    +    container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
    +
    +    batchMemoryManager.updateOutgoingStats(outputIndex);
    +    logger.debug("Number of records emitted: " + outputIndex);
    +
    +    // Update the output index for next output batch to zero
    +    outputIndex = 0;
    +  }
    +
    +  /**
    +   * Check if the schema changed between provided newSchema and oldSchema. It relies on
    +   * {@link BatchSchema#isEquivalent(BatchSchema)}.
    +   * @param newSchema - New Schema information
    +   * @param oldSchema -  - New Schema information to compare with
    +   *
    +   * @return - true - if newSchema is not same as oldSchema
    +   *         - false - if newSchema is same as oldSchema
    +   */
    +  private boolean isSchemaChanged(BatchSchema newSchema, BatchSchema oldSchema) {
    +    return (newSchema == null || oldSchema == null) || !newSchema.isEquivalent(oldSchema);
    +  }
    +
    +  /**
    +   * Validate if the input schema is not null and doesn't contain any Selection Vector.
    +   * @param schema - input schema to verify
    +   * @return - true: valid input schema
    +   *           false: invalid input schema
    +   */
    +  private boolean verifyInputSchema(BatchSchema schema) {
    +
    +    boolean isValid = true;
    +    if (schema == null) {
    +      logger.error("Null schema found for the incoming batch");
    +      isValid = false;
    +    } else {
    +      final BatchSchema.SelectionVectorMode svMode = schema.getSelectionVectorMode();
    +      if (svMode != BatchSchema.SelectionVectorMode.NONE) {
    +        logger.error("Incoming batch schema found with selection vector which is not supported. SVMode: {}",
    +          svMode.toString());
    +        isValid = false;
    +      }
    +    }
    +    return isValid;
    +  }
    +
    +  /**
    +   * Helps to create the outgoing container vectors based on known left and right batch schemas
    +   * @throws SchemaChangeException
    +   */
    +  private void setupNewSchema() throws SchemaChangeException {
    +
    +    logger.debug(String.format("Setting up new schema based on incoming batch. New left schema: %s" +
    +        " and New right schema: %s", left.getSchema(), right.getSchema()));
    +
    +    // Clear up the container
    +    container.clear();
    +    leftSchema = left.getSchema();
    +    rightSchema = right.getSchema();
    +
    +    if (!verifyInputSchema(leftSchema)) {
    +      throw new SchemaChangeException("Invalid Schema found for left incoming batch");
    +    }
    +
    +    if (!verifyInputSchema(rightSchema)) {
    +      throw new SchemaChangeException("Invalid Schema found for right incoming batch");
    +    }
    +
    --- End diff --
    
    Should we add a check here to guard that there is no duplication of names between right and left inputs?


---

[GitHub] drill pull request #1212: DRILL-6323: Lateral Join - Initial implementation

Posted by parthchandra <gi...@git.apache.org>.
Github user parthchandra commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1212#discussion_r182172309
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java ---
    @@ -186,6 +186,11 @@ private ExecConstants() {
       public static final String BIT_ENCRYPTION_SASL_ENABLED = "drill.exec.security.bit.encryption.sasl.enabled";
       public static final String BIT_ENCRYPTION_SASL_MAX_WRAPPED_SIZE = "drill.exec.security.bit.encryption.sasl.max_wrapped_size";
     
    +  /**
    --- End diff --
    
    Not needed as you don't have code gen


---

[GitHub] drill pull request #1212: DRILL-6323: Lateral Join - Initial implementation

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/drill/pull/1212


---