Posted to dev@drill.apache.org by mehant <gi...@git.apache.org> on 2014/04/13 11:02:47 UTC

[GitHub] incubator-drill pull request: DRILL-505 Implement hash join operat...

GitHub user mehant opened a pull request:

    https://github.com/apache/incubator-drill/pull/49

    DRILL-505 Implement hash join operator

    Support for left outer, right outer and full joins
    Tests and multi batch outer join WIP
    
    Support for multiple join conditions
    Add following tests
     - Multiple condition join
     - Join on JSON scan
     - Multi batch join
     - Simple equality join
    
    Fix memory leaks
    
    Cleanup

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mehant/incubator-drill DRILL-505_hash_join

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-drill/pull/49.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #49
    
----
commit 1969f911a6a8d440e3b4eb7c580c2d53cf0d06c7
Author: Mehant Baid <me...@gmail.com>
Date:   2014-02-25T09:09:06Z

    DRILL-505 Hash Join basic changes
    
    Support for left outer, right outer and full joins
    Tests and multi batch outer join WIP
    
    Support for multiple join conditions
    Add following tests
     - Multiple condition join
     - Join on JSON scan
     - Multi batch join
     - Simple equality join
    
    Fix memory leaks
    
    Cleanup

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-drill pull request: DRILL-505 Implement hash join operat...

Posted by amansinha100 <gi...@git.apache.org>.
Github user amansinha100 commented on a diff in the pull request:

    https://github.com/apache/incubator-drill/pull/49#discussion_r11693506
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java ---
    @@ -0,0 +1,220 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.drill.exec.physical.impl.join;
    +
    +import java.util.ArrayList;
    +import java.util.BitSet;
    +import java.util.LinkedList;
    +import java.util.List;
    +
    +import io.netty.buffer.ByteBuf;
    +
    +import org.apache.drill.exec.exception.SchemaChangeException;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.record.selection.SelectionVector4;
    +import org.apache.drill.exec.physical.impl.common.HashTable;
    +
    +
    +/*
    + * Helper class for hash join. Keeps track of information about the build side batches.
    + *
    + * Hash join is a blocking operator, so we consume all the batches on the build side and
    + * store them in a hyper container. The way we can retrieve records from the hyper container
    + * is by providing the record index and batch index in the hyper container. When we invoke put()
    + * for a given row, hash table returns a global index. We store the current row's record index
    + * and batch index in this global index of the startIndices structure.
    + *
    + * Since there can be many rows with the same key on the build side, we store the first
    + * index in the startIndices list and the remaining are stored as a logical linked list using
    + * the 'links' field in the BuildInfo structures.
    + *
    + * Apart from the indexes into the hyper container, this class also stores information about
    + * which records of the build side had a matching record on the probe side. Stored in a bitvector
    + * keyMatchBitVector, it is used to retrieve all records that did not match a record on probe side
    + * for right outer and full outer joins
    + */
    +public class HashJoinHelper {
    +
    +    /* List of start indexes. Stores the record and batch index of the first record
    +     * with a given key.
    +     */
    +    public List<SelectionVector4> startIndices = new ArrayList<>();
    +
    +    // List of BuildInfo structures. Used to maintain auxiliary information about the build batches
    +    public List<BuildInfo> buildInfoList = new ArrayList<>();
    +
    +    // Fragment context
    +    public FragmentContext context;
    +
    +    // Constant to indicate index is empty.
    +    public static final int INDEX_EMPTY = -1;
    +
    +    public HashJoinHelper(FragmentContext context) {
    +        this.context = context;
    +    }
    +
    +    public void addStartIndexBatch() throws SchemaChangeException {
    +        startIndices.add(getNewSV4(HashTable.BATCH_SIZE));
    +    }
    +
    +    public class BuildInfo {
    +        // List of links. Logically it helps maintain a linked list of records with the same key value
    +        private SelectionVector4 links;
    +
    +        // List of bitvectors. Keeps track of records on the build side that matched a record on the probe side
    +        private BitSet keyMatchBitVector;
    +
    +        // number of records in this batch
    +        private int recordCount;
    +
    +        public BuildInfo(SelectionVector4 links, BitSet keyMatchBitVector, int recordCount) {
    +            this.links = links;
    +            this.keyMatchBitVector = keyMatchBitVector;
    +            this.recordCount = recordCount;
    +        }
    +
    +        public SelectionVector4 getLinks() {
    +            return links;
    +        }
    +
    +        public BitSet getKeyMatchBitVector() {
    +            return keyMatchBitVector;
    +        }
    +    }
    +
    +    public SelectionVector4 getNewSV4(int recordCount) throws SchemaChangeException {
    +
    +        ByteBuf vector = context.getAllocator().buffer((recordCount * 4));
    +
    +        SelectionVector4 sv4 = new SelectionVector4(vector, recordCount, recordCount);
    +
    +        // Initialize the vector
    +        for (int i = 0; i < recordCount; i++) {
    +            sv4.set(i, INDEX_EMPTY);
    +        }
    +
    +        return sv4;
    +    }
    +
    +    public void addNewBatch(int recordCount) throws SchemaChangeException {
    +        // Add a node to the list of BuildInfo's
    +        BuildInfo info = new BuildInfo(getNewSV4(recordCount), new BitSet(recordCount), recordCount);
    +        buildInfoList.add(info);
    +    }
    +
    +    public int getStartIndex(int keyIndex) {
    +        int batchIdx  = keyIndex / HashTable.BATCH_SIZE;
    +        int offsetIdx = keyIndex % HashTable.BATCH_SIZE;
    +
    +        assert batchIdx < startIndices.size();
    +
    +        SelectionVector4 sv4 = startIndices.get(batchIdx);
    +
    +        return sv4.get(offsetIdx);
    +    }
    +
    +    public int getNextIndex(int currentIdx) {
    +        // Get to the links field of the current index to get the next index
    +        int batchIdx = currentIdx >>> 16;
    +        int recordIdx = currentIdx & 65535;
    --- End diff --
    
    It would be preferable to use a predefined constant such as HashTable.BATCH_MASK here instead of the literal 65535.
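
    To illustrate the suggestion, here is a minimal, self-contained sketch of decoding a composite index with named constants and then walking the chain of build rows that share a key, as the class comment in the diff describes. It is not the Drill code: the constant values (a 16-bit shift and a 0xFFFF mask, matching the literals 16 and 65535 in the diff) are assumptions, since this thread does not show how HashTable.BATCH_MASK is defined, and plain int arrays stand in for the SelectionVector4 'links' vectors. All class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class CompositeIndexSketch {
    // Assumed values; the thread names HashTable.BATCH_MASK but does not show its definition.
    static final int BATCH_SHIFT = 16;
    static final int BATCH_MASK = 0xFFFF; // same value as the literal 65535 in the diff
    static final int INDEX_EMPTY = -1;    // marks the end of a chain, as in the diff

    // Pack a batch index (upper 16 bits) and a record index (lower 16 bits) into one int.
    static int pack(int batchIdx, int recordIdx) {
        return (batchIdx << BATCH_SHIFT) | (recordIdx & BATCH_MASK);
    }

    static int batchOf(int compositeIdx) {
        return compositeIdx >>> BATCH_SHIFT;
    }

    static int recordOf(int compositeIdx) {
        return compositeIdx & BATCH_MASK;
    }

    // links.get(b)[r] holds the composite index of the next build row with the same key
    // as row r of batch b, or INDEX_EMPTY if there is none (stand-in for the SV4 'links').
    static List<Integer> chain(int startIdx, List<int[]> links) {
        List<Integer> rows = new ArrayList<>();
        int current = startIdx;
        while (current != INDEX_EMPTY) {
            rows.add(current);
            current = links.get(batchOf(current))[recordOf(current)];
        }
        return rows;
    }

    public static void main(String[] args) {
        List<int[]> links = new ArrayList<>();
        links.add(new int[] {INDEX_EMPTY, INDEX_EMPTY});  // batch 0: no further duplicates
        links.add(new int[] {pack(0, 1), INDEX_EMPTY});   // batch 1, row 0 links to batch 0, row 1

        for (int idx : chain(pack(1, 0), links)) {
            System.out.println("batch=" + batchOf(idx) + " record=" + recordOf(idx));
        }
    }
}
```

    With the constants named once, the shift and the mask cannot drift apart between the code that packs an index and the code that decodes it.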



[GitHub] incubator-drill pull request: DRILL-505 Implement hash join operat...

Posted by amansinha100 <gi...@git.apache.org>.
Github user amansinha100 commented on a diff in the pull request:

    https://github.com/apache/incubator-drill/pull/49#discussion_r11695140
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java ---
    @@ -0,0 +1,519 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl.join;
    +
    +import org.eigenbase.rel.JoinRelType;
    +
    +import java.io.IOException;
    +import java.util.LinkedList;
    +
    +import com.sun.codemodel.JExpression;
    +import com.sun.codemodel.JVar;
    +import com.sun.codemodel.JExpr;
    +
    +import org.apache.drill.exec.compile.sig.GeneratorMapping;
    +import org.apache.drill.exec.compile.sig.MappingSet;
    +import org.apache.drill.exec.exception.ClassTransformationException;
    +import org.apache.drill.exec.exception.SchemaChangeException;
    +import org.apache.drill.exec.expr.ClassGenerator;
    +import org.apache.drill.exec.expr.CodeGenerator;
    +import org.apache.drill.exec.expr.TypeHelper;
    +import org.apache.drill.exec.expr.holders.IntHolder;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.physical.config.HashJoinPOP;
    +import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
    +import org.apache.drill.exec.physical.impl.common.HashTable;
    +import org.apache.drill.exec.physical.impl.common.HashTableConfig;
    +import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
    +import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch;
    +import org.apache.drill.exec.record.RecordBatch;
    +import org.apache.drill.exec.record.AbstractRecordBatch;
    +import org.apache.drill.exec.record.ExpandableHyperContainer;
    +import org.apache.drill.exec.record.BatchSchema;
    +import org.apache.drill.exec.record.VectorWrapper;
    +import org.apache.drill.exec.record.TypedFieldId;
    +import org.apache.drill.exec.vector.ValueVector;
    +import org.apache.drill.exec.vector.allocator.VectorAllocator;
    +
    +public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
    +    // Probe side record batch
    +    private final RecordBatch left;
    +
    +    // Build side record batch
    +    private final RecordBatch right;
    +
    +    // Join type: INNER, LEFT, RIGHT or FULL
    +    private final JoinRelType joinType;
    +
    +    // hash table configuration, created in HashJoinPOP
    +    private HashTableConfig htConfig;
    +
    +    // Runtime generated class implementing HashJoin interface
    +    private HashJoin hashJoin = null;
    +
    +    /* Helper class
    +     * Maintains linked list of build side records with the same key
    +     * Keeps information about which build records have a corresponding
    +     * matching key in the probe side (for right outer and full outer joins)
    +     */
    +    private HashJoinHelper hjHelper = null;
    +
    +    // Underlying hashtable used by the hash join
    +    private HashTable hashTable = null;
    +
    +    /* Hyper container to store all build side record batches.
    +     * Records are retrieved from this container when there is a matching record
    +     * on the probe side
    +     */
    +    private ExpandableHyperContainer hyperContainer;
    +
    +    // Number of records to process on the probe side
    +    private int recordsToProcess;
    +
    +    // Number of records processed on the probe side
    +    private int recordsProcessed;
    +
    +    // Number of records in the output container
    +    private int outputRecords;
    +
    +    // Current batch index on the build side
    +    private int buildBatchIndex = 0;
    +
    +    // Indicate if we should drain the next record from the probe side
    +    private boolean getNextRecord = true;
    +
    +    // Contains both batch idx and record idx of the matching record in the build side
    +    private int currentCompositeIdx = -1;
    +
    +    // Current state the hash join algorithm is in
    +    private HashJoinState hjState = HashJoinState.BUILD;
    +
    +    // For right outer or full outer joins, the list of unmatched build records that need to be projected
    +    private LinkedList<Integer> unmatchedBuildIndexes = null;
    +
    +    // List of vector allocators
    +    private LinkedList<VectorAllocator> allocators = null;
    +
    +    // Schema of the build side
    +    private BatchSchema rightSchema = null;
    +
    +    // Generator mapping for the build side
    +    private static final GeneratorMapping PROJECT_BUILD = GeneratorMapping.create("doSetup"/* setup method */,
    +                                                                                  "projectBuildRecord" /* eval method */,
    +                                                                                  null /* reset */, null /* cleanup */);
    +
    +    // Generator mapping for the probe side
    +    private static final GeneratorMapping PROJECT_PROBE = GeneratorMapping.create("doSetup" /* setup method */,
    +                                                                                  "projectProbeRecord" /* eval method */,
    +                                                                                  null /* reset */, null /* cleanup */);
    +
    +    // Mapping set for the build side
    +    private final MappingSet projectBuildMapping = new MappingSet("buildIndex" /* read index */, "outIndex" /* write index */,
    +                                                                  "buildBatch" /* read container */,
    +                                                                  "outgoing" /* write container */,
    +                                                                  PROJECT_BUILD, PROJECT_BUILD);
    +
    +    // Mapping set for the probe side
    +    private final MappingSet projectProbeMapping = new MappingSet("probeIndex" /* read index */, "outIndex" /* write index */,
    +                                                                  "probeBatch" /* read container */,
    +                                                                  "outgoing" /* write container */,
    +                                                                  PROJECT_PROBE, PROJECT_PROBE);
    +
    +    // Possible states for the hash join
    +    public static enum HashJoinState {
    +        BUILD, // Build phase: go through the 'right' record batches and create the hash table
    +        PROBE, // Go through the 'left' record batches, probe the hash table and project the records if the key matches
    +        PROJECT_RIGHT, // If it's a RIGHT OUTER or FULL join, go through the build records that didn't match a probe record and project them
    +        DONE // Done processing all records; exit
    +    }
    +
    +    @Override
    +    public int getRecordCount() {
    +        return outputRecords;
    +    }
    +
    +
    +    @Override
    +    public IterOutcome next() {
    +
    +        IterOutcome leftUpstream = IterOutcome.NONE;
    +
    +        try {
    +
    +            if (hjState == HashJoinState.BUILD) {
    +
    +                // Initialize the hash join helper context
    +                hjHelper = new HashJoinHelper(context);
    +
    +                /* Build phase requires setting up the hash table. Hash table will
    +                 * materialize both the build and probe side expressions while
    +                 * creating the hash table. So we need to invoke next() on our probe batch
    +                 * as well, for the materialization to be successful. This batch will not be used
    +                 * till we complete the build phase.
    +                 */
    +                left.next();
    --- End diff --
    
    Does next() on the left (probe) side need to be called only once, when we start the build phase? Once we have the probe-side expressions for materialization, I think we don't need to call next() on the left side again as long as the schema does not change. We should also consider whether there is an alternative that gives access to the expressions on the build side without fetching rows.



[GitHub] incubator-drill pull request: DRILL-505 Implement hash join operat...

Posted by mehant <gi...@git.apache.org>.
Github user mehant commented on a diff in the pull request:

    https://github.com/apache/incubator-drill/pull/49#discussion_r11759876
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java ---
    @@ -0,0 +1,519 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl.join;
    +
    +import org.eigenbase.rel.JoinRelType;
    +
    +import java.io.IOException;
    +import java.util.LinkedList;
    +
    +import com.sun.codemodel.JExpression;
    +import com.sun.codemodel.JVar;
    +import com.sun.codemodel.JExpr;
    +
    +import org.apache.drill.exec.compile.sig.GeneratorMapping;
    +import org.apache.drill.exec.compile.sig.MappingSet;
    +import org.apache.drill.exec.exception.ClassTransformationException;
    +import org.apache.drill.exec.exception.SchemaChangeException;
    +import org.apache.drill.exec.expr.ClassGenerator;
    +import org.apache.drill.exec.expr.CodeGenerator;
    +import org.apache.drill.exec.expr.TypeHelper;
    +import org.apache.drill.exec.expr.holders.IntHolder;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.physical.config.HashJoinPOP;
    +import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
    +import org.apache.drill.exec.physical.impl.common.HashTable;
    +import org.apache.drill.exec.physical.impl.common.HashTableConfig;
    +import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
    +import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch;
    +import org.apache.drill.exec.record.RecordBatch;
    +import org.apache.drill.exec.record.AbstractRecordBatch;
    +import org.apache.drill.exec.record.ExpandableHyperContainer;
    +import org.apache.drill.exec.record.BatchSchema;
    +import org.apache.drill.exec.record.VectorWrapper;
    +import org.apache.drill.exec.record.TypedFieldId;
    +import org.apache.drill.exec.vector.ValueVector;
    +import org.apache.drill.exec.vector.allocator.VectorAllocator;
    +
    +public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
    +    // Probe side record batch
    +    private final RecordBatch left;
    +
    +    // Build side record batch
    +    private final RecordBatch right;
    +
    +    // Join type: INNER, LEFT, RIGHT or FULL
    +    private final JoinRelType joinType;
    +
    +    // hash table configuration, created in HashJoinPOP
    +    private HashTableConfig htConfig;
    +
    +    // Runtime generated class implementing HashJoin interface
    +    private HashJoin hashJoin = null;
    +
    +    /* Helper class
    +     * Maintains linked list of build side records with the same key
    +     * Keeps information about which build records have a corresponding
    +     * matching key in the probe side (for right outer and full outer joins)
    +     */
    +    private HashJoinHelper hjHelper = null;
    +
    +    // Underlying hashtable used by the hash join
    +    private HashTable hashTable = null;
    +
    +    /* Hyper container to store all build side record batches.
    +     * Records are retrieved from this container when there is a matching record
    +     * on the probe side
    +     */
    +    private ExpandableHyperContainer hyperContainer;
    +
    +    // Number of records to process on the probe side
    +    private int recordsToProcess;
    +
    +    // Number of records processed on the probe side
    +    private int recordsProcessed;
    +
    +    // Number of records in the output container
    +    private int outputRecords;
    +
    +    // Current batch index on the build side
    +    private int buildBatchIndex = 0;
    +
    +    // Indicate if we should drain the next record from the probe side
    +    private boolean getNextRecord = true;
    +
    +    // Contains both batch idx and record idx of the matching record in the build side
    +    private int currentCompositeIdx = -1;
    +
    +    // Current state the hash join algorithm is in
    +    private HashJoinState hjState = HashJoinState.BUILD;
    +
    +    // For right outer or full outer joins, the list of unmatched build records that need to be projected
    +    private LinkedList<Integer> unmatchedBuildIndexes = null;
    +
    +    // List of vector allocators
    +    private LinkedList<VectorAllocator> allocators = null;
    +
    +    // Schema of the build side
    +    private BatchSchema rightSchema = null;
    +
    +    // Generator mapping for the build side
    +    private static final GeneratorMapping PROJECT_BUILD = GeneratorMapping.create("doSetup"/* setup method */,
    +                                                                                  "projectBuildRecord" /* eval method */,
    +                                                                                  null /* reset */, null /* cleanup */);
    +
    +    // Generator mapping for the probe side
    +    private static final GeneratorMapping PROJECT_PROBE = GeneratorMapping.create("doSetup" /* setup method */,
    +                                                                                  "projectProbeRecord" /* eval method */,
    +                                                                                  null /* reset */, null /* cleanup */);
    +
    +    // Mapping set for the build side
    +    private final MappingSet projectBuildMapping = new MappingSet("buildIndex" /* read index */, "outIndex" /* write index */,
    +                                                                  "buildBatch" /* read container */,
    +                                                                  "outgoing" /* write container */,
    +                                                                  PROJECT_BUILD, PROJECT_BUILD);
    +
    +    // Mapping set for the probe side
    +    private final MappingSet projectProbeMapping = new MappingSet("probeIndex" /* read index */, "outIndex" /* write index */,
    +                                                                  "probeBatch" /* read container */,
    +                                                                  "outgoing" /* write container */,
    +                                                                  PROJECT_PROBE, PROJECT_PROBE);
    +
    +    // Possible states for the hash join
    +    public static enum HashJoinState {
    +        BUILD, // Build phase: go through the 'right' record batches and create the hash table
    +        PROBE, // Go through the 'left' record batches, probe the hash table and project the records if the key matches
    +        PROJECT_RIGHT, // If it's a RIGHT OUTER or FULL join, go through the build records that didn't match a probe record and project them
    +        DONE // Done processing all records; exit
    +    }
    +
    +    @Override
    +    public int getRecordCount() {
    +        return outputRecords;
    +    }
    +
    +
    +    @Override
    +    public IterOutcome next() {
    +
    +        IterOutcome leftUpstream = IterOutcome.NONE;
    +
    +        try {
    +
    +            if (hjState == HashJoinState.BUILD) {
    +
    +                // Initialize the hash join helper context
    +                hjHelper = new HashJoinHelper(context);
    +
    +                /* Build phase requires setting up the hash table. Hash table will
    +                 * materialize both the build and probe side expressions while
    +                 * creating the hash table. So we need to invoke next() on our probe batch
    +                 * as well, for the materialization to be successful. This batch will not be used
    +                 * till we complete the build phase.
    +                 */
    +                left.next();
    --- End diff --
    
    Yes, we call next() on the probe side exactly once before we complete the build phase. 
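
    The behavior described here can be sketched in isolation. The following is only an illustration under stated assumptions, not the HashJoinBatch implementation: the Upstream class is a hypothetical stand-in for a RecordBatch that just counts next() calls, batches are represented as strings, and the hash table insert is left as a comment. It shows the probe side being pulled exactly once during the build phase, with that first batch held back and consumed when probing starts.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

class ProbePrimingSketch {
    enum State { BUILD, PROBE, DONE }

    // Hypothetical stand-in for an upstream record batch source; counts next() calls.
    static class Upstream {
        private final Deque<String> batches;
        int nextCalls = 0;

        Upstream(String... batches) {
            this.batches = new ArrayDeque<>(Arrays.asList(batches));
        }

        // Returns the next batch, or null once the source is exhausted.
        String next() {
            nextCalls++;
            return batches.poll();
        }
    }

    private final Upstream probe;
    private final Upstream build;
    private State state = State.BUILD;
    private String primedProbeBatch;   // first probe batch, held until the build completes
    int probeCallsDuringBuild = 0;     // recorded for illustration only

    ProbePrimingSketch(Upstream probe, Upstream build) {
        this.probe = probe;
        this.build = build;
    }

    // Returns the next probe batch to process, or null when there is nothing left.
    String next() {
        if (state == State.BUILD) {
            // Pull the probe side once so its schema is available for materialization.
            primedProbeBatch = probe.next();
            // Consume the entire build side (blocking phase).
            while (build.next() != null) {
                // A real operator would insert the batch's rows into the hash table here.
            }
            probeCallsDuringBuild = probe.nextCalls;
            state = State.PROBE;
        }
        if (state == State.PROBE) {
            // Use the held-back batch first, then keep pulling from the probe side.
            String batch = (primedProbeBatch != null) ? primedProbeBatch : probe.next();
            primedProbeBatch = null;
            if (batch != null) {
                return batch;
            }
            state = State.DONE;
        }
        return null;
    }

    public static void main(String[] args) {
        ProbePrimingSketch join = new ProbePrimingSketch(
            new Upstream("p0", "p1"), new Upstream("b0", "b1", "b2"));
        for (String batch = join.next(); batch != null; batch = join.next()) {
            System.out.println("probing with " + batch);
        }
        System.out.println("probe next() calls during build: " + join.probeCallsDuringBuild);
    }
}
```

    Because the state only leaves BUILD after the whole build side is consumed, the priming call cannot be repeated on later invocations of next().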



[GitHub] incubator-drill pull request: DRILL-505 Implement hash join operat...

Posted by adityakishore <gi...@git.apache.org>.
Github user adityakishore commented on the pull request:

    https://github.com/apache/incubator-drill/pull/49#issuecomment-40960496
  
    Merged as 1fc7b982414bc0dcd29b1d31e312d2207971933a. @mehant, please close this pull request.



[GitHub] incubator-drill pull request: DRILL-505 Implement hash join operat...

Posted by mehant <gi...@git.apache.org>.
Github user mehant closed the pull request at:

    https://github.com/apache/incubator-drill/pull/49



[GitHub] incubator-drill pull request: DRILL-505 Implement hash join operat...

Posted by amansinha100 <gi...@git.apache.org>.
Github user amansinha100 commented on a diff in the pull request:

    https://github.com/apache/incubator-drill/pull/49#discussion_r11695133
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java ---
    @@ -0,0 +1,220 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.drill.exec.physical.impl.join;
    +
    +import java.util.ArrayList;
    +import java.util.BitSet;
    +import java.util.LinkedList;
    +import java.util.List;
    +
    +import io.netty.buffer.ByteBuf;
    +
    +import org.apache.drill.exec.exception.SchemaChangeException;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.record.selection.SelectionVector4;
    +import org.apache.drill.exec.physical.impl.common.HashTable;
    +
    +
    +/*
    + * Helper class for hash join. Keeps track of information about the build side batches.
    + *
    + * Hash join is a blocking operator, so we consume all the batches on the build side and
    + * store them in a hyper container. The way we can retrieve records from the hyper container
    + * is by providing the record index and batch index in the hyper container. When we invoke put()
    + * for a given row, hash table returns a global index. We store the current row's record index
    + * and batch index in this global index of the startIndices structure.
    + *
    + * Since there can be many rows with the same key on the build side, we store the first
    + * index in the startIndices list and the remaining are stored as a logical linked list using
    + * the 'links' field in the BuildInfo structures.
    + *
    + * Apart from the indexes into the hyper container, this class also stores information about
    + * which records of the build side had a matching record on the probe side. Stored in a bitvector
    + * keyMatchBitVector, it is used to retrieve all records that did not match a record on probe side
    + * for right outer and full outer joins
    + */
    +public class HashJoinHelper {
    +
    +    /* List of start indexes. Stores the record and batch index of the first record
    +     * with a given key.
    +     */
    +    public List<SelectionVector4> startIndices = new ArrayList<>();
    +
    +    // List of BuildInfo structures. Used to maintain auxiliary information about the build batches
    +    public List<BuildInfo> buildInfoList = new ArrayList<>();
    +
    +    // Fragment context
    +    public FragmentContext context;
    +
    +    // Constant to indicate index is empty.
    +    public static final int INDEX_EMPTY = -1;
    +
    +    public HashJoinHelper(FragmentContext context) {
    +        this.context = context;
    +    }
    +
    +    public void addStartIndexBatch() throws SchemaChangeException {
    +        startIndices.add(getNewSV4(HashTable.BATCH_SIZE));
    +    }
    +
    +    public class BuildInfo {
    +        // List of links. Logically it helps maintain a linked list of records with the same key value
    +        private SelectionVector4 links;
    +
    +        // Bit vector. Keeps track of records on the build side that matched a record on the probe side
    +        private BitSet keyMatchBitVector;
    +
    +        // number of records in this batch
    +        private int recordCount;
    +
    +        public BuildInfo(SelectionVector4 links, BitSet keyMatchBitVector, int recordCount) {
    +            this.links = links;
    +            this.keyMatchBitVector = keyMatchBitVector;
    +            this.recordCount = recordCount;
    +        }
    +
    +        public SelectionVector4 getLinks() {
    +            return links;
    +        }
    +
    +        public BitSet getKeyMatchBitVector() {
    +            return keyMatchBitVector;
    +        }
    +    }
    +
    +    public SelectionVector4 getNewSV4(int recordCount) throws SchemaChangeException {
    +
    +        ByteBuf vector = context.getAllocator().buffer((recordCount * 4));
    +
    +        SelectionVector4 sv4 = new SelectionVector4(vector, recordCount, recordCount);
    +
    +        // Initialize the vector
    +        for (int i = 0; i < recordCount; i++) {
    +            sv4.set(i, INDEX_EMPTY);
    +        }
    +
    +        return sv4;
    +    }
    +
    +    public void addNewBatch(int recordCount) throws SchemaChangeException {
    +        // Add a node to the list of BuildInfo's
    +        BuildInfo info = new BuildInfo(getNewSV4(recordCount), new BitSet(recordCount), recordCount);
    +        buildInfoList.add(info);
    +    }
    +
    +    public int getStartIndex(int keyIndex) {
    +        int batchIdx  = keyIndex / HashTable.BATCH_SIZE;
    +        int offsetIdx = keyIndex % HashTable.BATCH_SIZE;
    +
    +        assert batchIdx < startIndices.size();
    +
    +        SelectionVector4 sv4 = startIndices.get(batchIdx);
    +
    +        return sv4.get(offsetIdx);
    +    }
    +
    +    public int getNextIndex(int currentIdx) {
    +        // Get to the links field of the current index to get the next index
    +        int batchIdx = currentIdx >>> 16;
    +        int recordIdx = currentIdx & 65535;
    +
    +        assert batchIdx < buildInfoList.size();
    +
    +        // Get the corresponding BuildInfo node
    +        BuildInfo info = buildInfoList.get(batchIdx);
    +        return info.getLinks().get(recordIdx);
    +    }
    +
    +    public LinkedList<Integer> getNextUnmatchedIndex() {
    +        LinkedList<Integer> compositeIndexes = new LinkedList<>();
    --- End diff --
    
    In this and other places where you use LinkedList, do you really need it? If you are only appending to the list, ArrayList should be sufficient, and it generally performs better with a smaller memory footprint.
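
To make the suggestion concrete, here is a minimal sketch of collecting unmatched build-side records into an append-only ArrayList. The body of getNextUnmatchedIndex() is not shown in the quoted diff, so this is not the PR's implementation: the class and method names (UnmatchedIndexSketch, compositeIndex, unmatchedIndexes) are hypothetical, and the composite layout (batch index in the upper 16 bits, record index in the lower 16 bits) is inferred from the decoding in getNextIndex().

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

// Hypothetical sketch, not code from the pull request.
final class UnmatchedIndexSketch {

    // Pack a batch index and a record index into one int. This mirrors the
    // decoding used in getNextIndex(): currentIdx >>> 16 and currentIdx & 65535.
    static int compositeIndex(int batchIdx, int recordIdx) {
        return (batchIdx << 16) | (recordIdx & 0xFFFF);
    }

    // Collect the composite indexes of all build-side records whose bit is
    // clear, i.e. records that never matched a probe-side record.
    // matched.get(i) plays the role of keyMatchBitVector for batch i, and
    // recordCounts[i] is the number of records in that batch.
    static List<Integer> unmatchedIndexes(List<BitSet> matched, int[] recordCounts) {
        // The list is only appended to, so ArrayList is sufficient.
        List<Integer> result = new ArrayList<>();
        for (int batch = 0; batch < matched.size(); batch++) {
            BitSet bits = matched.get(batch);
            for (int rec = bits.nextClearBit(0);
                 rec < recordCounts[batch];
                 rec = bits.nextClearBit(rec + 1)) {
                result.add(compositeIndex(batch, rec));
            }
        }
        return result;
    }
}
```

Because the caller only appends and then iterates, an array-backed list avoids the per-node allocation of LinkedList. If the total record count is known up front, passing it to the ArrayList constructor would also avoid intermediate resizing.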



[GitHub] incubator-drill pull request: DRILL-505 Implement hash join operat...

Posted by mehant <gi...@git.apache.org>.
Github user mehant commented on the pull request:

    https://github.com/apache/incubator-drill/pull/49#issuecomment-40775688
  
    Moved the logic that invokes runtime-generated code into the template so it has a better chance of getting optimized.
    
    Addressed the other minor comments as well. 



[GitHub] incubator-drill pull request: DRILL-505 Implement hash join operat...

Posted by amansinha100 <gi...@git.apache.org>.
Github user amansinha100 commented on the pull request:

    https://github.com/apache/incubator-drill/pull/49#issuecomment-40535783
  
    One high-level comment regarding the code organization: normally, we recommend keeping the operator's internal control flow and logic in the template class (in this case HashJoinTemplate) and putting only what the code generator needs in the 'batch' class. Perhaps we should discuss this?

