Posted to reviews@spark.apache.org by davies <gi...@git.apache.org> on 2016/02/04 00:17:23 UTC

[GitHub] spark pull request: [SPARK-13095] [SQL] improve performance for br...

GitHub user davies opened a pull request:

    https://github.com/apache/spark/pull/11065

    [SPARK-13095] [SQL] improve performance for broadcast join with dimension table

    This PR improves the performance of broadcast joins with dimension tables, which are common in data warehouses.
    
    If the join key fits in a long, we use a special API, `get(Long)`, to get the rows from the HashedRelation.
    
    If the HashedRelation has only unique keys, we use a special API, `getValue(Long)` or `getValue(InternalRow)`.
    
    If the keys fit in a long and are also dense, we use an array of UnsafeRow instead of a hash map.
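
The trade-off between a hash map and an array for dense, unique long keys can be sketched as follows. This is only an illustration under stated assumptions, not the code in the PR: the class `DenseKeyLookup`, the `DENSE_FACTOR` threshold, and the use of `String` as a stand-in for `UnsafeRow` are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: unique long keys, String rows standing in for UnsafeRow.
final class DenseKeyLookup {
    // Assumed threshold (not taken from the PR): use an array when at least
    // half of the slots in [minKey, maxKey] would be occupied.
    static final double DENSE_FACTOR = 0.5;

    private final long minKey;
    private final String[] dense;            // non-null when the keys are dense
    private final Map<Long, String> sparse;  // fallback hash map otherwise

    DenseKeyLookup(long[] keys, String[] rows) {
        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;
        for (long k : keys) {
            min = Math.min(min, k);
            max = Math.max(max, k);
        }
        long diff = max - min; // negative if the subtraction overflowed
        boolean isDense = keys.length > 0
            && diff >= 0 && diff < Integer.MAX_VALUE - 8
            && keys.length >= (diff + 1) * DENSE_FACTOR;
        if (isDense) {
            minKey = min;
            dense = new String[(int) (diff + 1)];
            sparse = null;
            for (int i = 0; i < keys.length; i++) {
                dense[(int) (keys[i] - min)] = rows[i];
            }
        } else {
            minKey = 0L;
            dense = null;
            sparse = new HashMap<>();
            for (int i = 0; i < keys.length; i++) {
                sparse.put(keys[i], rows[i]);
            }
        }
    }

    boolean usesArray() {
        return dense != null;
    }

    // Returns the single matching row, or null when the key has no match.
    String getValue(long key) {
        if (dense != null) {
            long idx = key - minKey;
            return (idx >= 0 && idx < dense.length) ? dense[(int) idx] : null;
        }
        return sparse.get(key);
    }
}
```

With the array layout, a probe is a subtraction, a bounds check, and an index, with no hashing or boxing of the key; the hash map remains the fallback when the key range is too sparse for an array to be worthwhile.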

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/davies/spark gen_dim

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/11065.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #11065
    
----
commit c0ca36a4b2b3a638e3cbcd59edf0e75609b97777
Author: Davies Liu <da...@databricks.com>
Date:   2016-02-03T18:45:00Z

    improve performance for broadcast join with dimension table

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11065#issuecomment-179594090
  
    **[Test build #50717 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50717/consoleFull)** for PR 11065 at commit [`cdb5509`](https://github.com/apache/spark/commit/cdb5509408191709a02a68a5d5fe74446837e309).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11065#issuecomment-179533022
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50699/
    Test FAILed.


Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/11065#issuecomment-181590398
  
    Merging this into master, thanks!


Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11065#issuecomment-180126340
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50777/
    Test PASSed.


Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11065#issuecomment-179594167
  
    Merged build finished. Test FAILed.


Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on the pull request:

    https://github.com/apache/spark/pull/11065#issuecomment-181589916
  
    LGTM


Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11065#issuecomment-179546199
  
    **[Test build #50711 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50711/consoleFull)** for PR 11065 at commit [`e630af8`](https://github.com/apache/spark/commit/e630af8f828a42b2c6428e748ab261fae494cfa1).


Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11065#issuecomment-179748519
  
    Merged build finished. Test PASSed.


Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11065#issuecomment-179549650
  
    **[Test build #50712 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50712/consoleFull)** for PR 11065 at commit [`a2b0aaf`](https://github.com/apache/spark/commit/a2b0aaf809c727f0cb9c92c0f35584207e85f3c2).


Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11065#issuecomment-180137767
  
    Merged build finished. Test PASSed.


Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11065#issuecomment-179564816
  
    **[Test build #50717 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50717/consoleFull)** for PR 11065 at commit [`cdb5509`](https://github.com/apache/spark/commit/cdb5509408191709a02a68a5d5fe74446837e309).


Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11065#issuecomment-179747960
  
    **[Test build #50742 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50742/consoleFull)** for PR 11065 at commit [`a680f25`](https://github.com/apache/spark/commit/a680f25141f3e1b4f8d91fc8d03569388b0a1cd3).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/11065#issuecomment-180075423
  
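    For a join with two ints:
    ```
    import org.apache.spark.sql.functions.{broadcast, col}
    import org.apache.spark.sql.types.IntegerType

    // N is the number of rows on the probe side; it is defined outside this snippet.
    val dim2 = broadcast(sqlContext.range(1 << 16)
      .selectExpr("cast(id as int) as k1", "cast(id as int) as k2", "cast(id as string) as v"))

    sqlContext.range(N).join(dim2,
      (col("id") bitwiseAND 60000).cast(IntegerType) === col("k1")
        && (col("id") bitwiseAND 50000).cast(IntegerType) === col("k2")).count()
    ```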
    
    ```
    public Object generate(Object[] references) {
      return new GeneratedIterator(references);
    }

    class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {

      private Object[] references;
      private boolean agg_initAgg;
      private boolean agg_bufIsNull;
      private long agg_bufValue;
      private org.apache.spark.broadcast.TorrentBroadcast bhj_broadcast;
      private org.apache.spark.sql.execution.joins.UniqueLongHashedRelation bhj_relation;
      private boolean range_initRange;
      private long range_partitionEnd;
      private long range_number;
      private boolean range_overflow;
      private UnsafeRow agg_result;
      private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
      private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;

      public GeneratedIterator(Object[] references) {
        this.references = references;
        agg_initAgg = false;

        this.bhj_broadcast = (org.apache.spark.broadcast.TorrentBroadcast) references[0];

        bhj_relation = (org.apache.spark.sql.execution.joins.UniqueLongHashedRelation) bhj_broadcast.value();
        incPeakExecutionMemory(bhj_relation.getMemorySize());

        range_initRange = false;
        range_partitionEnd = 0L;
        range_number = 0L;
        range_overflow = false;
        agg_result = new UnsafeRow(1);
        this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0);
        this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1);
      }

      private void agg_doAggregateWithoutKey() throws java.io.IOException {
        // initialize aggregation buffer

        agg_bufIsNull = false;
        agg_bufValue = 0L;

        // initialize Range
        if (!range_initRange) {
          range_initRange = true;
          if (input.hasNext()) {
            initRange(((InternalRow) input.next()).getInt(0));
          } else {
            return;
          }
        }

        while (!range_overflow && range_number < range_partitionEnd) {
          long range_value = range_number;
          range_number += 1L;
          if (range_number < range_value ^ 1L < 0) {
            range_overflow = true;
          }

          // generate join key
          /* (shiftleft(cast(cast((input[0, bigint] & 60000) as int) as bigint),32) | (cast(cast((input[0, bigint] & 50000) as int) as bigint) & 0)) */
          /* shiftleft(cast(cast((input[0, bigint] & 60000) as int) as bigint),32) */
          /* cast(cast((input[0, bigint] & 60000) as int) as bigint) */
          /* cast((input[0, bigint] & 60000) as int) */
          /* (input[0, bigint] & 60000) */
          long bhj_value4 = -1L;
          bhj_value4 = range_value & 60000L;
          boolean bhj_isNull3 = false;
          int bhj_value3 = -1;
          if (!false) {
            bhj_value3 = (int) bhj_value4;
          }
          boolean bhj_isNull2 = bhj_isNull3;
          long bhj_value2 = -1L;
          if (!bhj_isNull3) {
            bhj_value2 = (long) bhj_value3;
          }

          long bhj_value1 = -1L;
          bhj_value1 = bhj_value2 << 32;
          /* (cast(cast((input[0, bigint] & 50000) as int) as bigint) & 0) */
          /* cast(cast((input[0, bigint] & 50000) as int) as bigint) */
          /* cast((input[0, bigint] & 50000) as int) */
          /* (input[0, bigint] & 50000) */
          long bhj_value11 = -1L;
          bhj_value11 = range_value & 50000L;
          boolean bhj_isNull10 = false;
          int bhj_value10 = -1;
          if (!false) {
            bhj_value10 = (int) bhj_value11;
          }
          boolean bhj_isNull9 = bhj_isNull10;
          long bhj_value9 = -1L;
          if (!bhj_isNull10) {
            bhj_value9 = (long) bhj_value10;
          }

          long bhj_value8 = -1L;
          bhj_value8 = bhj_value9 & 0;
          long bhj_value = -1L;
          bhj_value = bhj_value1 | bhj_value8;
          // find matches from HashedRelation
          UnsafeRow bhj_matched = false ? null: (UnsafeRow)bhj_relation.getValue(bhj_value);
          if (bhj_matched != null) {
            /* input[0, int] */
            int bhj_value15 = bhj_matched.getInt(0);
            /* input[1, int] */
            int bhj_value16 = bhj_matched.getInt(1);

            // do aggregate
            /* (input[0, bigint] + 1) */
            long agg_value1 = -1L;
            agg_value1 = agg_bufValue + 1L;
            // update aggregation buffer
            agg_bufIsNull = false;
            agg_bufValue = agg_value1;

          }

          if (shouldStop()) return;
        }

      }

      private void initRange(int idx) {
        java.math.BigInteger index = java.math.BigInteger.valueOf(idx);
        java.math.BigInteger numSlice = java.math.BigInteger.valueOf(1L);
        java.math.BigInteger numElement = java.math.BigInteger.valueOf(104857600L);
        java.math.BigInteger step = java.math.BigInteger.valueOf(1L);
        java.math.BigInteger start = java.math.BigInteger.valueOf(0L);

        java.math.BigInteger st = index.multiply(numElement).divide(numSlice).multiply(step).add(start);
        if (st.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
          range_number = Long.MAX_VALUE;
        } else if (st.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
          range_number = Long.MIN_VALUE;
        } else {
          range_number = st.longValue();
        }

        java.math.BigInteger end = index.add(java.math.BigInteger.ONE).multiply(numElement).divide(numSlice)
        .multiply(step).add(start);
        if (end.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
          range_partitionEnd = Long.MAX_VALUE;
        } else if (end.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
          range_partitionEnd = Long.MIN_VALUE;
        } else {
          range_partitionEnd = end.longValue();
        }
      }

      protected void processNext() throws java.io.IOException {
        if (!agg_initAgg) {
          agg_initAgg = true;
          agg_doAggregateWithoutKey();

          // output the result

          agg_rowWriter.zeroOutNullBytes();

          if (agg_bufIsNull) {
            agg_rowWriter.setNullAt(0);
          } else {
            agg_rowWriter.write(0, agg_bufValue);
          }
          currentRows.add(agg_result.copy());
        }
      }
    }
    ```
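
The "generate join key" step in the dump above shifts the first int key into the high 32 bits of a long and ORs in the second. A minimal standalone sketch of that kind of packing is given here; the class `PackedJoinKey` and its methods are hypothetical names for illustration. One assumption is labeled loudly: the dump prints the mask on the second key as `& 0`, whereas this sketch masks with `0xFFFFFFFFL` so that the low 32 bits are kept, which is what packing two ints into one long would normally require. The thread does not say which mask was intended.

```java
// Hypothetical helper, not part of Spark: packs two int join keys into one long.
final class PackedJoinKey {
    // k1 goes in the high 32 bits, k2 in the low 32 bits.
    static long pack(int k1, int k2) {
        // Assumed mask: 0xFFFFFFFFL keeps the low half and stops a negative k2
        // from sign-extending into the bits that hold k1.
        return ((long) k1 << 32) | ((long) k2 & 0xFFFFFFFFL);
    }

    // Recovers the first key with an arithmetic shift.
    static int high(long key) {
        return (int) (key >> 32);
    }

    // Recovers the second key by truncating to the low 32 bits.
    static int low(long key) {
        return (int) key;
    }
}
```

For the benchmark query, the probe-side key for a given `id` would correspond to `PackedJoinKey.pack((int) (id & 60000L), (int) (id & 50000L))`, and the result could then be passed to a `getValue(long)` style lookup.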


Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11065#issuecomment-179550415
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50712/
    Test FAILed.


Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11065#issuecomment-180137771
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50786/
    Test PASSed.


Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11065#issuecomment-179698198
  
    **[Test build #50742 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50742/consoleFull)** for PR 11065 at commit [`a680f25`](https://github.com/apache/spark/commit/a680f25141f3e1b4f8d91fc8d03569388b0a1cd3).


Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11065#issuecomment-180136959
  
    **[Test build #50786 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50786/consoleFull)** for PR 11065 at commit [`50181da`](https://github.com/apache/spark/commit/50181dab88e86d883be61e69e47b0a459f2fcfbe).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11065#issuecomment-180083649
  
    **[Test build #50777 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50777/consoleFull)** for PR 11065 at commit [`319e993`](https://github.com/apache/spark/commit/319e99315ed3799069e135be7aad5d114f77f970).


Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11065#discussion_r51954518
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala ---
    @@ -408,6 +453,210 @@ private[joins] object UnsafeHashedRelation {
           }
         }
     
    +    // TODO: create UniqueUnsafeRelation
         new UnsafeHashedRelation(hashTable)
       }
     }
    +
    +/**
    +  * An interface for a hashed relation that the key is a Long.
    +  */
    +private[joins] trait LongHashedRelation extends HashedRelation {
    +  override def get(key: InternalRow): Seq[InternalRow] = {
    +    get(key.getLong(0))
    +  }
    +}
    +
    +private[joins] final class GeneralLongHashedRelation(
    +  private var hashTable: JavaHashMap[Long, CompactBuffer[UnsafeRow]])
    +  extends LongHashedRelation with Externalizable {
    +
    +  // Needed for serialization (it is public to make Java serialization work)
    +  def this() = this(null)
    +
    +  override def get(key: Long): Seq[InternalRow] = hashTable.get(key)
    +
    +  override def writeExternal(out: ObjectOutput): Unit = {
    +    writeBytes(out, SparkSqlSerializer.serialize(hashTable))
    +  }
    +
    +  override def readExternal(in: ObjectInput): Unit = {
    +    hashTable = SparkSqlSerializer.deserialize(readBytes(in))
    +  }
    +}
    +
    +private[joins] final class UniqueLongHashedRelation(
    +  private var hashTable: JavaHashMap[Long, UnsafeRow])
    +  extends UniqueHashedRelation with LongHashedRelation with Externalizable {
    +
    +  // Needed for serialization (it is public to make Java serialization work)
    +  def this() = this(null)
    +
    +  override def getValue(key: InternalRow): InternalRow = {
    +    getValue(key.getLong(0))
    +  }
    +
    +  override def getValue(key: Long): InternalRow = {
    +    hashTable.get(key)
    +  }
    +
    +  override def writeExternal(out: ObjectOutput): Unit = {
    +    writeBytes(out, SparkSqlSerializer.serialize(hashTable))
    +  }
    +
    +  override def readExternal(in: ObjectInput): Unit = {
    +    hashTable = SparkSqlSerializer.deserialize(readBytes(in))
    +  }
    +}
    +
    +/**
    +  * A relation that pack all the rows into a byte array, together with offsets and sizes.
    --- End diff --
    
    This should describe a bit more about it. The input arguments are not obvious. It should say it is used for dense keys and how it stores them.
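
To make the request above concrete, one possible reading of "pack all the rows into a byte array, together with offsets and sizes" for dense keys is sketched here. The rest of the class is cut off in the diff, so everything in this block is an assumption for illustration: the class name `PackedDenseRelation`, its fields, and the use of raw `byte[]` rows in place of `UnsafeRow` are hypothetical and not taken from the PR.

```java
import java.util.Arrays;

// Hypothetical sketch: all rows live back to back in one byte array, and the
// slot for key k is (k - minKey). Keys are assumed unique and dense.
final class PackedDenseRelation {
    private final long minKey;
    private final int[] offsets; // start of each row in bytes, -1 if the slot is empty
    private final int[] sizes;   // length of each row in bytes
    private final byte[] bytes;  // concatenated row payloads

    private PackedDenseRelation(long minKey, int[] offsets, int[] sizes, byte[] bytes) {
        this.minKey = minKey;
        this.offsets = offsets;
        this.sizes = sizes;
        this.bytes = bytes;
    }

    static PackedDenseRelation build(long[] keys, byte[][] rows) {
        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;
        int total = 0;
        for (int i = 0; i < keys.length; i++) {
            min = Math.min(min, keys[i]);
            max = Math.max(max, keys[i]);
            total += rows[i].length;
        }
        // The caller is assumed to have checked that the key range is small.
        int slots = keys.length == 0 ? 0 : (int) (max - min + 1);
        int[] offsets = new int[slots];
        Arrays.fill(offsets, -1);
        int[] sizes = new int[slots];
        byte[] bytes = new byte[total];
        int pos = 0;
        for (int i = 0; i < keys.length; i++) {
            int idx = (int) (keys[i] - min);
            offsets[idx] = pos;
            sizes[idx] = rows[i].length;
            System.arraycopy(rows[i], 0, bytes, pos, rows[i].length);
            pos += rows[i].length;
        }
        return new PackedDenseRelation(min, offsets, sizes, bytes);
    }

    // Returns a copy of the row stored for key, or null when there is no match.
    byte[] getValue(long key) {
        long idx = key - minKey;
        if (idx < 0 || idx >= offsets.length || offsets[(int) idx] < 0) {
            return null;
        }
        int off = offsets[(int) idx];
        return Arrays.copyOfRange(bytes, off, off + sizes[(int) idx]);
    }
}
```

A layout like this keeps the whole relation in three flat arrays, which is cheap to serialize for a broadcast; a doc comment on the real class could name each constructor argument in the same way the field comments do here.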


Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11065#issuecomment-180109155
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50784/
    Test FAILed.


Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11065#issuecomment-179533018
  
    **[Test build #50699 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50699/consoleFull)** for PR 11065 at commit [`c0ca36a`](https://github.com/apache/spark/commit/c0ca36a4b2b3a638e3cbcd59edf0e75609b97777).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-13095] [SQL] improve performance for br...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on the pull request:

    https://github.com/apache/spark/pull/11065#issuecomment-180005247
  
    Can you include the most relevant generated code when the join key is a single long and when the join key is two ints?


[GitHub] spark pull request: [SPARK-13095] [SQL] improve performance for br...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11065#discussion_r51954890
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala ---
    @@ -408,6 +453,210 @@ private[joins] object UnsafeHashedRelation {
           }
         }
     
    +    // TODO: create UniqueUnsafeRelation
         new UnsafeHashedRelation(hashTable)
       }
     }
    +
    +/**
    +  * An interface for a hashed relation that the key is a Long.
    +  */
    +private[joins] trait LongHashedRelation extends HashedRelation {
    +  override def get(key: InternalRow): Seq[InternalRow] = {
    +    get(key.getLong(0))
    +  }
    +}
    +
    +private[joins] final class GeneralLongHashedRelation(
    +  private var hashTable: JavaHashMap[Long, CompactBuffer[UnsafeRow]])
    +  extends LongHashedRelation with Externalizable {
    +
    +  // Needed for serialization (it is public to make Java serialization work)
    +  def this() = this(null)
    +
    +  override def get(key: Long): Seq[InternalRow] = hashTable.get(key)
    +
    +  override def writeExternal(out: ObjectOutput): Unit = {
    +    writeBytes(out, SparkSqlSerializer.serialize(hashTable))
    +  }
    +
    +  override def readExternal(in: ObjectInput): Unit = {
    +    hashTable = SparkSqlSerializer.deserialize(readBytes(in))
    +  }
    +}
    +
    +private[joins] final class UniqueLongHashedRelation(
    +  private var hashTable: JavaHashMap[Long, UnsafeRow])
    +  extends UniqueHashedRelation with LongHashedRelation with Externalizable {
    +
    +  // Needed for serialization (it is public to make Java serialization work)
    +  def this() = this(null)
    +
    +  override def getValue(key: InternalRow): InternalRow = {
    +    getValue(key.getLong(0))
    +  }
    +
    +  override def getValue(key: Long): InternalRow = {
    +    hashTable.get(key)
    +  }
    +
    +  override def writeExternal(out: ObjectOutput): Unit = {
    +    writeBytes(out, SparkSqlSerializer.serialize(hashTable))
    +  }
    +
    +  override def readExternal(in: ObjectInput): Unit = {
    +    hashTable = SparkSqlSerializer.deserialize(readBytes(in))
    +  }
    +}
    +
    +/**
    +  * A relation that pack all the rows into a byte array, together with offsets and sizes.
    +  */
    +private[joins] final class LongArrayRelation(
    +    private var numFields: Int,
    +    private var start: Long,
    +    private var offsets: Array[Int],
    +    private var sizes: Array[Int],
    +    private var bytes: Array[Byte]
    +  ) extends UniqueHashedRelation with LongHashedRelation with Externalizable {
    +
    +  // Needed for serialization (it is public to make Java serialization work)
    +  def this() = this(0, 0L, null, null, null)
    +
    +  override def getValue(key: InternalRow): InternalRow = {
    +    getValue(key.getLong(0))
    +  }
    +
    +  override def getMemorySize: Long = {
    +    offsets.length * 4 + sizes.length * 4 + bytes.length
    +  }
    +
    +  override def getValue(key: Long): InternalRow = {
    +    val idx = (key - start).toInt
    +    if (idx >= 0 && idx < sizes.length && sizes(idx) > 0) {
    +      val result = new UnsafeRow(numFields)
    +      result.pointTo(bytes, Platform.BYTE_ARRAY_OFFSET + offsets(idx), sizes(idx))
    +      result
    +    } else {
    +      null
    +    }
    +  }
    +
    +  override def writeExternal(out: ObjectOutput): Unit = {
    +    out.writeInt(numFields)
    +    out.writeLong(start)
    +    out.writeInt(sizes.length)
    +    var i = 0
    +    while (i < sizes.length) {
    +      out.writeInt(sizes(i))
    +      i += 1
    +    }
    +    out.writeInt(bytes.length)
    +    out.write(bytes)
    +  }
    +
    +  override def readExternal(in: ObjectInput): Unit = {
    +    numFields = in.readInt()
    +    start = in.readLong()
    +    val length = in.readInt()
    +    // read sizes of rows
    +    sizes = new Array[Int](length)
    +    offsets = new Array[Int](length)
    +    var i = 0
    +    var offset = 0
    +    while (i < length) {
    +      offsets(i) = offset
    +      sizes(i) = in.readInt()
    +      offset += sizes(i)
    +      i += 1
    +    }
    +    // read all the bytes
    +    val total = in.readInt()
    +    assert(total == offset)
    +    bytes = new Array[Byte](total)
    +    in.readFully(bytes)
    +  }
    +}
    +
    +/**
    +  * Create hashed relation with key that is long.
    +  */
    +private[joins] object LongHashedRelation {
    +  def apply(
    +    input: Iterator[InternalRow],
    +    numInputRows: LongSQLMetric,
    +    keyGenerator: Projection,
    +    sizeEstimate: Int): HashedRelation = {
    +
    +    // Use a Java hash table here because unsafe maps expect fixed size records
    +    val hashTable = new JavaHashMap[Long, CompactBuffer[UnsafeRow]](sizeEstimate)
    +
    +    // Create a mapping of buildKeys -> rows
    +    var numFields = 0
    +    var keyIsUnique = true
    +    var minKey = Long.MaxValue
    +    var maxKey = Long.MinValue
    +    while (input.hasNext) {
    +      val unsafeRow = input.next().asInstanceOf[UnsafeRow]
    +      numFields = unsafeRow.numFields()
    +      numInputRows += 1
    +      val rowKey = keyGenerator(unsafeRow)
    +      if (!rowKey.anyNull) {
    +        val key = rowKey.getLong(0)
    +        minKey = math.min(minKey, key)
    +        maxKey = math.max(maxKey, key)
    +        val existingMatchList = hashTable.get(key)
    +        val matchList = if (existingMatchList == null) {
    +          val newMatchList = new CompactBuffer[UnsafeRow]()
    +          hashTable.put(key, newMatchList)
    +          newMatchList
    +        } else {
    +          keyIsUnique = false
    +          existingMatchList
    +        }
    +        matchList += unsafeRow.copy()
    +      }
    +    }
    +
    +    if (keyIsUnique) {
    +      if (maxKey - minKey <= hashTable.size() * 5) {
    --- End diff --
    
    How did you pick this? Can we make this a named constant at the top of this class?
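
A minimal sketch of what the named constant could look like, assuming hypothetical names (`DenseKeyCheck`, `DenseFactor`) that do not appear in the patch. It also uses `BigInt` for the comparison, because in the quoted line `maxKey - minKey` is `Long` arithmetic and `hashTable.size() * 5` is `Int` arithmetic, and both can wrap around for extreme inputs.

```scala
object DenseKeyCheck {
  // Hypothetical name; 5 is the factor used in the quoted diff line.
  val DenseFactor: Long = 5L

  /** True when the key range maxKey - minKey is at most DenseFactor times the number of keys. */
  def isDense(minKey: Long, maxKey: Long, numKeys: Int): Boolean = {
    // BigInt avoids wrap-around when the range does not fit in a Long.
    val range = BigInt(maxKey) - BigInt(minKey)
    range <= BigInt(numKeys) * BigInt(DenseFactor)
  }
}
```

With the factor named, the trade-off is visible at the call site: a larger factor accepts sparser key ranges at the cost of more empty array slots.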


[GitHub] spark pull request: [SPARK-13095] [SQL] improve performance for br...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11065#issuecomment-179546207
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50711/
    Test FAILed.


[GitHub] spark pull request: [SPARK-13095] [SQL] improve performance for br...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/11065#issuecomment-180076144
  
    @nongli Posted the generated code, and also added a benchmark for a join with two int keys.


[GitHub] spark pull request: [SPARK-13095] [SQL] improve performance for br...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11065#issuecomment-179692688
  
    **[Test build #2513 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2513/consoleFull)** for PR 11065 at commit [`f3d008d`](https://github.com/apache/spark/commit/f3d008dbad341fb7accff1a37ec2423cb10fd19c).


[GitHub] spark pull request: [SPARK-13095] [SQL] improve performance for br...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11065#issuecomment-179692897
  
    **[Test build #2513 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2513/consoleFull)** for PR 11065 at commit [`f3d008d`](https://github.com/apache/spark/commit/f3d008dbad341fb7accff1a37ec2423cb10fd19c).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-13095] [SQL] improve performance for br...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11065#issuecomment-179550406
  
    **[Test build #50712 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50712/consoleFull)** for PR 11065 at commit [`a2b0aaf`](https://github.com/apache/spark/commit/a2b0aaf809c727f0cb9c92c0f35584207e85f3c2).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-13095] [SQL] improve performance for br...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11065#issuecomment-179690496
  
    Merged build finished. Test FAILed.


[GitHub] spark pull request: [SPARK-13095] [SQL] improve performance for br...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11065#issuecomment-179594170
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50717/
    Test FAILed.


[GitHub] spark pull request: [SPARK-13095] [SQL] improve performance for br...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on the pull request:

    https://github.com/apache/spark/pull/11065#issuecomment-180096071
  
    @davies in the two int case, line 107
    
    /* 107 */       bhj_value8 = bhj_value9 & 0;
    Is that right?
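
For context on why the quoted line looks wrong: `x & 0` is 0 for every `x`, so that statement discards its input. The thread does not show the corrected code, so the following is only a sketch of one common way to fit two int keys into a single long, not necessarily what the patch generates. `IntPairKey`, `pack`, and `unpack` are hypothetical names.

```scala
object IntPairKey {
  /** Packs two ints into one long: `hi` in the upper 32 bits, `lo` in the lower 32 bits. */
  def pack(hi: Int, lo: Int): Long =
    (hi.toLong << 32) | (lo.toLong & 0xFFFFFFFFL) // the mask stops sign extension of `lo` from clobbering `hi`

  /** Recovers the two ints from a packed key. */
  def unpack(key: Long): (Int, Int) =
    ((key >>> 32).toInt, key.toInt)
}
```

The mask on the low half is the step that matters: without it, a negative `lo` would sign-extend and overwrite the upper 32 bits.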


[GitHub] spark pull request: [SPARK-13095] [SQL] improve performance for br...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/11065#issuecomment-180108062
  
    @nongli Good catch, fixed.


[GitHub] spark pull request: [SPARK-13095] [SQL] improve performance for br...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11065#issuecomment-180109895
  
    **[Test build #50786 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50786/consoleFull)** for PR 11065 at commit [`50181da`](https://github.com/apache/spark/commit/50181dab88e86d883be61e69e47b0a459f2fcfbe).


[GitHub] spark pull request: [SPARK-13095] [SQL] improve performance for br...

Posted by nongli <gi...@git.apache.org>.
Github user nongli commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11065#discussion_r51954333
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala ---
    @@ -408,6 +453,210 @@ private[joins] object UnsafeHashedRelation {
    +/**
    +  * Create hashed relation with key that is long.
    --- End diff --
    
    Document this better. Explain at a high level the strategy this is using: how it looks for unique, dense keys, etc.
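
The high-level strategy being asked about can be sketched as a single pass over the build-side keys followed by a three-way choice. This is an illustration under assumptions: the mapping of the three cases onto the relation classes in the diff is inferred from the PR description and the class names, and `RelationChooser`, `Kind`, and `denseFactor` are hypothetical.

```scala
object RelationChooser {
  sealed trait Kind
  case object DenseArray extends Kind     // unique keys in a narrow range: index into an array
  case object UniqueHashMap extends Kind  // unique keys, sparse range: map from key to one row
  case object GeneralHashMap extends Kind // some key repeats: map from key to a list of rows

  /** Scans the keys once, tracking uniqueness and min/max, then picks a storage kind. */
  def choose(keys: Iterator[Long], denseFactor: Long = 5L): Kind = {
    val seen = scala.collection.mutable.HashSet.empty[Long]
    var unique = true
    var minKey = Long.MaxValue
    var maxKey = Long.MinValue
    keys.foreach { k =>
      if (!seen.add(k)) unique = false // add returns false when k was already present
      minKey = math.min(minKey, k)
      maxKey = math.max(maxKey, k)
    }
    // BigInt keeps the range computation from wrapping around.
    def dense: Boolean =
      seen.nonEmpty && BigInt(maxKey) - BigInt(minKey) <= BigInt(seen.size) * BigInt(denseFactor)

    if (!unique) GeneralHashMap
    else if (dense) DenseArray
    else UniqueHashMap
  }
}
```

For example, keys 1, 2, 3, 5 are unique and span a range of 4, so they qualify for the array layout, while keys 1 and 1000000 are unique but far too sparse for it.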


[GitHub] spark pull request: [SPARK-13095] [SQL] improve performance for br...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11065#issuecomment-179690500
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50740/
    Test FAILed.


[GitHub] spark pull request: [SPARK-13095] [SQL] improve performance for br...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11065#issuecomment-179550412
  
    Merged build finished. Test FAILed.


[GitHub] spark pull request: [SPARK-13095] [SQL] improve performance for br...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11065#issuecomment-180109151
  
    Merged build finished. Test FAILed.


[GitHub] spark pull request: [SPARK-13095] [SQL] improve performance for br...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11065#issuecomment-179748524
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/50742/
    Test PASSed.


[GitHub] spark pull request: [SPARK-13095] [SQL] improve performance for br...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/11065


[GitHub] spark pull request: [SPARK-13095] [SQL] improve performance for br...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11065#issuecomment-179533020
  
    Merged build finished. Test FAILed.


[GitHub] spark pull request: [SPARK-13095] [SQL] improve performance for br...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11065#issuecomment-180126057
  
    **[Test build #50777 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50777/consoleFull)** for PR 11065 at commit [`319e993`](https://github.com/apache/spark/commit/319e99315ed3799069e135be7aad5d114f77f970).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-13095] [SQL] improve performance for br...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11065#issuecomment-179546206
  
    Merged build finished. Test FAILed.


[GitHub] spark pull request: [SPARK-13095] [SQL] improve performance for br...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/11065#issuecomment-180065661
  
    For a join with a single Long key:
    ```
        val dim = broadcast(sqlContext.range(1 << 16).selectExpr("id as k", "cast(id as string) as v"))
        sqlContext.range(N).join(dim, (col("id") % 60000) === col("k")).count()
    ```
    Will generate:
    ```
    public Object generate(Object[] references) {
      return new GeneratedIterator(references);
    }

    class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {

      private Object[] references;
      private boolean agg_initAgg;
      private boolean agg_bufIsNull;
      private long agg_bufValue;
      private org.apache.spark.broadcast.TorrentBroadcast bhj_broadcast;
      private org.apache.spark.sql.execution.joins.LongArrayRelation bhj_relation;
      private boolean range_initRange;
      private long range_partitionEnd;
      private long range_number;
      private boolean range_overflow;
      private UnsafeRow agg_result;
      private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
      private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;

      public GeneratedIterator(Object[] references) {
        this.references = references;
        agg_initAgg = false;

        this.bhj_broadcast = (org.apache.spark.broadcast.TorrentBroadcast) references[0];

        bhj_relation = (org.apache.spark.sql.execution.joins.LongArrayRelation) bhj_broadcast.value();
        incPeakExecutionMemory(bhj_relation.getMemorySize());

        range_initRange = false;
        range_partitionEnd = 0L;
        range_number = 0L;
        range_overflow = false;
        agg_result = new UnsafeRow(1);
        this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0);
        this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1);
      }

      private void agg_doAggregateWithoutKey() throws java.io.IOException {
        // initialize aggregation buffer
        agg_bufIsNull = false;
        agg_bufValue = 0L;
        // initialize Range
        if (!range_initRange) {
          range_initRange = true;
          if (input.hasNext()) {
            initRange(((InternalRow) input.next()).getInt(0));
          } else {
            return;
          }
        }

        while (!range_overflow && range_number < range_partitionEnd) {
          long range_value = range_number;
          range_number += 1L;
          if (range_number < range_value ^ 1L < 0) {
            range_overflow = true;
          }

          // generate join key
          /* cast((input[0, bigint] % 60000) as bigint) */
          /* (input[0, bigint] % 60000) */
          boolean bhj_isNull1 = false;
          long bhj_value1 = -1L;
          if (false || 60000L == 0) {
            bhj_isNull1 = true;
          } else {

            if (false) {
              bhj_isNull1 = true;
            } else {
              bhj_value1 = (long)(range_value % 60000L);
            }
          }
          boolean bhj_isNull = bhj_isNull1;
          long bhj_value = -1L;
          if (!bhj_isNull1) {
            bhj_value = bhj_value1;
          }
          // find matches from HashedRelation
          UnsafeRow bhj_matched = bhj_isNull ? null:(UnsafeRow)bhj_relation.getValue(bhj_value);
          if (bhj_matched != null) {
            /* input[0, bigint] */
            long bhj_value4 = bhj_matched.getLong(0);
            // do aggregate
            /* (input[0, bigint] + 1) */
            long agg_value1 = -1L;
            agg_value1 = agg_bufValue + 1L;
            // update aggregation buffer
            agg_bufIsNull = false;
            agg_bufValue = agg_value1;
          }
          if (shouldStop()) return;
        }
      }
      private void initRange(int idx) {
        // ...
      }

      protected void processNext() throws java.io.IOException {
        if (!agg_initAgg) {
          agg_initAgg = true;
          agg_doAggregateWithoutKey();

          // output the result

          agg_rowWriter.zeroOutNullBytes();

          if (agg_bufIsNull) {
            agg_rowWriter.setNullAt(0);
          } else {
            agg_rowWriter.write(0, agg_bufValue);
          }
          currentRows.add(agg_result.copy());
        }
      }
    }
    ```
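
The generated code above probes the relation with one `getValue(long)` call per row and treats a `null` result as "no match". As a rough illustration of why that can be cheap for dense, unique long keys, here is a minimal Java sketch of an array-backed lookup. The names `DenseLongLookup` and `tryBuild`, the `String` value type, and the 2x density threshold are assumptions made for this example; this is not Spark's `LongArrayRelation` implementation.

```java
import java.util.Map;

/** Hypothetical sketch of an array-backed lookup for dense, unique long keys. */
final class DenseLongLookup {
    private final long start;      // smallest key present
    private final String[] values; // values[key - start]; null where no row exists

    private DenseLongLookup(long start, String[] values) {
        this.start = start;
        this.values = values;
    }

    /**
     * Builds the lookup when the key range is dense enough, otherwise returns null
     * so the caller can fall back to a hash map. The 2x threshold is an assumption.
     */
    static DenseLongLookup tryBuild(Map<Long, String> rows) {
        if (rows.isEmpty()) {
            return null;
        }
        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;
        for (long key : rows.keySet()) {
            min = Math.min(min, key);
            max = Math.max(max, key);
        }
        long span = max - min + 1; // wraps to a value <= 0 if the true range exceeds a long
        if (span <= 0 || span > Integer.MAX_VALUE || span > 2L * rows.size()) {
            return null; // too sparse (or too wide) for an array
        }
        String[] values = new String[(int) span];
        for (Map.Entry<Long, String> e : rows.entrySet()) {
            values[(int) (e.getKey() - min)] = e.getValue();
        }
        return new DenseLongLookup(min, values);
    }

    /** Returns the value for the key, or null when the key has no row. */
    String getValue(long key) {
        long idx = key - start; // a wrapped difference still falls outside [0, length)
        if (idx < 0 || idx >= values.length) {
            return null;
        }
        return values[(int) idx];
    }
}
```

A probe is then a subtraction, a bounds check, and an array read, with no hashing or key comparison; a caller would check `tryBuild` for `null` and keep a hash map for sparse key sets.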




[GitHub] spark pull request: [SPARK-13095] [SQL] improve performance for br...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/11065#issuecomment-179532458
  
    **[Test build #50699 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/50699/consoleFull)** for PR 11065 at commit [`c0ca36a`](https://github.com/apache/spark/commit/c0ca36a4b2b3a638e3cbcd59edf0e75609b97777).




[GitHub] spark pull request: [SPARK-13095] [SQL] improve performance for br...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11065#issuecomment-180126336
  
    Merged build finished. Test PASSed.

