Posted to reviews@spark.apache.org by kiszk <gi...@git.apache.org> on 2017/05/23 05:18:49 UTC

[GitHub] spark pull request #18066: [SPARK-20822][SQL] Generate code to build table c...

GitHub user kiszk opened a pull request:

    https://github.com/apache/spark/pull/18066

    [SPARK-20822][SQL] Generate code to build table cache using ColumnarBatch and to get value from ColumnVector

    ## What changes were proposed in this pull request?
    
    This PR generates the following Java code:
    1. Build each in-memory table cache using `ColumnarBatch` with `ColumnVector`, instead of using `CachedBatch` with `Array[Byte]`.
    2. Get a value for a column directly from a `ColumnVector`, without using an iterator.
    
    As the first step, for ease of review, I support only the integer and double data types with whole-stage codegen. Another PR will address the execution path without whole-stage codegen.
    
    This PR implements the following:
    1. Keep an in-memory table cache using `ColumnarBatch` with `ColumnVector`. To support both the new and the conventional cache data structures, this PR declares `CachedBatch` as a trait, with `CachedColumnarBatch` and `CachedBatchBytes` as its concrete implementations (see the sketch after this list).
    2. Generate Java code to build an in-memory table cache.
    3. Generate Java code to get a value directly from a `ColumnVector`.
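    
    The new cache hierarchy, roughly, looks like the sketch below. It is based on this description and the generated code further down; the exact member signatures in the PR may differ.
    ```scala
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.execution.vectorized.ColumnarBatch
    
    // Sketch only: member names follow this description and the generated
    // code below; the details are assumptions, not the PR's exact definitions.
    trait CachedBatch {
      def getNumRows: Int
    }
    
    // New representation: columnar data kept as ColumnVectors in a ColumnarBatch.
    case class CachedColumnarBatch(columnarBatch: ColumnarBatch, stats: InternalRow)
      extends CachedBatch {
      override def getNumRows: Int = columnarBatch.numRows
    }
    
    // Conventional representation: each column serialized into a byte array.
    case class CachedBatchBytes(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
      extends CachedBatch {
      override def getNumRows: Int = numRows
    }
    ```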
    
    This PR improves runtime performance by
    1. building the in-memory table cache while eliminating many virtual calls and a complicated data path, and
    2. eliminating the data copy from column-oriented storage to `InternalRow` in the `SpecificColumnarIterator`.
    
    
    **Options**
    A `ColumnVector` for any primitive data type in a `ColumnarBatch` can be compressed. Currently, there are two ways to enable compression:
    
    1. Set the property `spark.sql.inMemoryColumnarStorage.compressed` to true (the default), or
    2. Call `DataFrame.persist(st)`, where `st` is one of `MEMORY_ONLY_SER`, `MEMORY_ONLY_SER_2`, `MEMORY_AND_DISK_SER`, or `MEMORY_AND_DISK_SER_2`.
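    
    For example, either route below should yield a compressed cache (a sketch; `spark` is an assumed `SparkSession` and the column name is arbitrary):
    ```scala
    import org.apache.spark.storage.StorageLevel
    
    // 1. Rely on the property, which is true by default:
    spark.conf.set("spark.sql.inMemoryColumnarStorage.compressed", "true")
    val df1 = spark.range(10).toDF("i").cache()
    
    // 2. Ask for a serialized storage level explicitly:
    val df2 = spark.range(10).toDF("i").persist(StorageLevel.MEMORY_ONLY_SER)
    ```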
    
    
    **An example program**
    ```scala
    val df = sparkContext.parallelize((1 to 10), 1).map(i => (i, i.toDouble)).toDF("i", "d").cache
    df.filter("i < 8 and 4.0 < d").show
    ```
    
    **Generated code for building an in-memory table cache**
    ```
    /* 001 */ import scala.collection.Iterator;
    /* 002 */ import org.apache.spark.sql.types.DataType;
    /* 003 */ import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder;
    /* 004 */ import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter;
    /* 005 */ import org.apache.spark.sql.execution.columnar.MutableUnsafeRow;
    /* 006 */ import org.apache.spark.sql.execution.vectorized.ColumnVector;
    /* 007 */
    /* 008 */ public SpecificColumnarIterator generate(Object[] references) {
    /* 009 */   return new SpecificColumnarIterator(references);
    /* 010 */ }
    /* 011 */
    /* 012 */ class SpecificColumnarIterator extends org.apache.spark.sql.execution.columnar.ColumnarIterator {
    /* 013 */   private ColumnVector[] colInstances;
    /* 014 */   private UnsafeRow unsafeRow = new UnsafeRow(0);
    /* 015 */   private BufferHolder bufferHolder = new BufferHolder(unsafeRow);
    /* 016 */   private UnsafeRowWriter rowWriter = new UnsafeRowWriter(bufferHolder, 0);
    /* 017 */   private MutableUnsafeRow mutableRow = null;
    /* 018 */
    /* 019 */   private int rowIdx = 0;
    /* 020 */   private int numRowsInBatch = 0;
    /* 021 */
    /* 022 */   private scala.collection.Iterator input = null;
    /* 023 */   private DataType[] columnTypes = null;
    /* 024 */   private int[] columnIndexes = null;
    /* 025 */
    /* 026 */
    /* 027 */
    /* 028 */   public SpecificColumnarIterator(Object[] references) {
    /* 029 */
    /* 030 */     this.mutableRow = new MutableUnsafeRow(rowWriter);
    /* 031 */   }
    /* 032 */
    /* 033 */   public void initialize(Iterator input, DataType[] columnTypes, int[] columnIndexes) {
    /* 034 */     this.input = input;
    /* 035 */     this.columnTypes = columnTypes;
    /* 036 */     this.columnIndexes = columnIndexes;
    /* 037 */   }
    /* 038 */
    /* 039 */
    /* 040 */
    /* 041 */   public boolean hasNext() {
    /* 042 */     if (rowIdx < numRowsInBatch) {
    /* 043 */       return true;
    /* 044 */     }
    /* 045 */     if (!input.hasNext()) {
    /* 046 */       return false;
    /* 047 */     }
    /* 048 */
    /* 049 */     org.apache.spark.sql.execution.columnar.CachedColumnarBatch cachedBatch =
    /* 050 */     (org.apache.spark.sql.execution.columnar.CachedColumnarBatch) input.next();
    /* 051 */     org.apache.spark.sql.execution.vectorized.ColumnarBatch batch = cachedBatch.columnarBatch();
    /* 052 */     rowIdx = 0;
    /* 053 */     numRowsInBatch = cachedBatch.getNumRows();
    /* 054 */     colInstances = new ColumnVector[columnIndexes.length];
    /* 055 */     for (int i = 0; i < columnIndexes.length; i ++) {
    /* 056 */       colInstances[i] = batch.column(columnIndexes[i]);
    /* 057 */     }
    /* 058 */
    /* 059 */     return hasNext();
    /* 060 */   }
    /* 061 */
    /* 062 */   public InternalRow next() {
    /* 063 */     bufferHolder.reset();
    /* 064 */     rowWriter.zeroOutNullBytes();
    /* 065 */
    /* 066 */     unsafeRow.setTotalSize(bufferHolder.totalSize());
    /* 067 */     rowIdx += 1;
    /* 068 */     return unsafeRow;
    /* 069 */   }
    /* 070 */ }
    ```
    
    **Generated code by whole-stage codegen (lines 75-78 contain the major changes)**
    ```
    /* 001 */ public Object generate(Object[] references) {
    /* 002 */   return new GeneratedIterator(references);
    /* 003 */ }
    /* 004 */
    /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
    /* 006 */   private Object[] references;
    /* 007 */   private scala.collection.Iterator[] inputs;
    /* 008 */   private boolean agg_initAgg;
    /* 009 */   private boolean agg_bufIsNull;
    /* 010 */   private long agg_bufValue;
    /* 011 */   private scala.collection.Iterator inmemorytablescan_input;
    /* 012 */   private org.apache.spark.sql.execution.metric.SQLMetric inmemorytablescan_numOutputRows;
    /* 013 */   private org.apache.spark.sql.execution.metric.SQLMetric inmemorytablescan_scanTime;
    /* 014 */   private long inmemorytablescan_scanTime1;
    /* 015 */   private org.apache.spark.sql.execution.vectorized.ColumnarBatch inmemorytablescan_batch;
    /* 016 */   private int inmemorytablescan_batchIdx;
    /* 017 */   private org.apache.spark.sql.execution.vectorized.ColumnVector inmemorytablescan_colInstance0;
    /* 018 */   private org.apache.spark.sql.execution.vectorized.ColumnVector inmemorytablescan_colInstance1;
    /* 019 */   private UnsafeRow inmemorytablescan_result;
    /* 020 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder inmemorytablescan_holder;
    /* 021 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter inmemorytablescan_rowWriter;
    /* 022 */   private org.apache.spark.sql.execution.metric.SQLMetric filter_numOutputRows;
    /* 023 */   private UnsafeRow filter_result;
    /* 024 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder filter_holder;
    /* 025 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter filter_rowWriter;
    /* 026 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_numOutputRows;
    /* 027 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_aggTime;
    /* 028 */   private UnsafeRow agg_result;
    /* 029 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
    /* 030 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
    /* 031 */
    /* 032 */   public GeneratedIterator(Object[] references) {
    /* 033 */     this.references = references;
    /* 034 */   }
    /* 035 */
    /* 036 */   public void init(int index, scala.collection.Iterator[] inputs) {
    /* 037 */     partitionIndex = index;
    /* 038 */     this.inputs = inputs;
    /* 039 */     wholestagecodegen_init_0();
    /* 040 */     wholestagecodegen_init_1();
    /* 041 */
    /* 042 */   }
    /* 043 */
    /* 044 */   private void wholestagecodegen_init_0() {
    /* 045 */     agg_initAgg = false;
    /* 046 */
    /* 047 */     inmemorytablescan_input = inputs[0];
    /* 048 */     this.inmemorytablescan_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
    /* 049 */     this.inmemorytablescan_scanTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
    /* 050 */     inmemorytablescan_scanTime1 = 0;
    /* 051 */     inmemorytablescan_batch = null;
    /* 052 */     inmemorytablescan_batchIdx = 0;
    /* 053 */     inmemorytablescan_colInstance0 = null;
    /* 054 */     inmemorytablescan_colInstance1 = null;
    /* 055 */     inmemorytablescan_result = new UnsafeRow(2);
    /* 056 */     this.inmemorytablescan_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(inmemorytablescan_result, 0);
    /* 057 */     this.inmemorytablescan_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(inmemorytablescan_holder, 2);
    /* 058 */     this.filter_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[2];
    /* 059 */     filter_result = new UnsafeRow(2);
    /* 060 */     this.filter_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(filter_result, 0);
    /* 061 */     this.filter_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(filter_holder, 2);
    /* 062 */
    /* 063 */   }
    /* 064 */
    /* 065 */   private void agg_doAggregateWithoutKey() throws java.io.IOException {
    /* 066 */     // initialize aggregation buffer
    /* 067 */     agg_bufIsNull = false;
    /* 068 */     agg_bufValue = 0L;
    /* 069 */
    /* 070 */     if (inmemorytablescan_batch == null) {
    /* 071 */       inmemorytablescan_nextBatch();
    /* 072 */     }
    /* 073 */     while (inmemorytablescan_batch != null) {
    /* 074 */       int inmemorytablescan_numRows = inmemorytablescan_batch.numRows();
    /* 075 */       int inmemorytablescan_localEnd = inmemorytablescan_numRows - inmemorytablescan_batchIdx;
    /* 076 */       for (int inmemorytablescan_localIdx = 0; inmemorytablescan_localIdx < inmemorytablescan_localEnd; inmemorytablescan_localIdx++) {
    /* 077 */         int inmemorytablescan_rowIdx = inmemorytablescan_batchIdx + inmemorytablescan_localIdx;
    /* 078 */         int inmemorytablescan_value = inmemorytablescan_colInstance0.getInt(inmemorytablescan_rowIdx);
    /* 079 */
    /* 080 */         boolean filter_isNull = false;
    /* 081 */
    /* 082 */         boolean filter_value = false;
    /* 083 */         filter_value = inmemorytablescan_value > 4;
    /* 084 */         if (!filter_value) continue;
    /* 085 */         double inmemorytablescan_value1 = inmemorytablescan_colInstance1.getDouble(inmemorytablescan_rowIdx);
    /* 086 */
    /* 087 */         boolean filter_isNull3 = false;
    /* 088 */
    /* 089 */         boolean filter_value3 = false;
    /* 090 */         filter_value3 = org.apache.spark.util.Utils.nanSafeCompareDoubles(inmemorytablescan_value1, 10.0D) > 0;
    /* 091 */         if (!filter_value3) continue;
    /* 092 */
    /* 093 */         filter_numOutputRows.add(1);
    /* 094 */
    /* 095 */         // do aggregate
    /* 096 */         // common sub-expressions
    /* 097 */
    /* 098 */         // evaluate aggregate function
    /* 099 */         boolean agg_isNull1 = false;
    /* 100 */
    /* 101 */         long agg_value1 = -1L;
    /* 102 */         agg_value1 = agg_bufValue + 1L;
    /* 103 */         // update aggregation buffer
    /* 104 */         agg_bufIsNull = false;
    /* 105 */         agg_bufValue = agg_value1;
    /* 106 */         // shouldStop check is eliminated
    /* 107 */       }
    /* 108 */       inmemorytablescan_batchIdx = inmemorytablescan_numRows;
    /* 109 */       inmemorytablescan_batch = null;
    /* 110 */       inmemorytablescan_nextBatch();
    /* 111 */     }
    /* 112 */     inmemorytablescan_scanTime.add(inmemorytablescan_scanTime1 / (1000 * 1000));
    /* 113 */     inmemorytablescan_scanTime1 = 0;
    /* 114 */
    /* 115 */   }
    /* 116 */
    /* 117 */   private void inmemorytablescan_nextBatch() throws java.io.IOException {
    /* 118 */     long getBatchStart = System.nanoTime();
    /* 119 */     if (inmemorytablescan_input.hasNext()) {
    /* 120 */       inmemorytablescan_batch = (org.apache.spark.sql.execution.vectorized.ColumnarBatch)inmemorytablescan_input.next();
    /* 121 */       inmemorytablescan_numOutputRows.add(inmemorytablescan_batch.numRows());
    /* 122 */       inmemorytablescan_batchIdx = 0;
    /* 123 */       inmemorytablescan_colInstance0 = inmemorytablescan_batch.column(0);
    /* 124 */       inmemorytablescan_colInstance1 = inmemorytablescan_batch.column(1);
    /* 125 */
    /* 126 */     }
    /* 127 */     inmemorytablescan_scanTime1 += System.nanoTime() - getBatchStart;
    /* 128 */   }
    /* 129 */
    /* 130 */   private void wholestagecodegen_init_1() {
    /* 131 */     this.agg_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[3];
    /* 132 */     this.agg_aggTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[4];
    /* 133 */     agg_result = new UnsafeRow(1);
    /* 134 */     this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0);
    /* 135 */     this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1);
    /* 136 */
    /* 137 */   }
    /* 138 */
    /* 139 */   protected void processNext() throws java.io.IOException {
    /* 140 */     while (!agg_initAgg) {
    /* 141 */       agg_initAgg = true;
    /* 142 */       long agg_beforeAgg = System.nanoTime();
    /* 143 */       agg_doAggregateWithoutKey();
    /* 144 */       agg_aggTime.add((System.nanoTime() - agg_beforeAgg) / 1000000);
    /* 145 */
    /* 146 */       // output the result
    /* 147 */
    /* 148 */       agg_numOutputRows.add(1);
    /* 149 */       agg_rowWriter.zeroOutNullBytes();
    /* 150 */
    /* 151 */       if (agg_bufIsNull) {
    /* 152 */         agg_rowWriter.setNullAt(0);
    /* 153 */       } else {
    /* 154 */         agg_rowWriter.write(0, agg_bufValue);
    /* 155 */       }
    /* 156 */       append(agg_result);
    /* 157 */     }
    /* 158 */   }
    /* 159 */ }
    ```
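    
    The effect of lines 75-78 above is that the scan loop reads values straight out of each `ColumnVector` by row index, instead of materializing an `InternalRow` per row through an iterator. A simplified, self-contained model of the two access patterns (plain Scala stand-ins, not the PR's classes):
    ```scala
    // Stand-ins for an int ColumnVector and a double ColumnVector.
    val col0 = Array(1, 2, 3, 4, 5)
    val col1 = Array(1.0, 2.0, 3.0, 4.0, 5.0)
    final case class Row(i: Int, d: Double)
    
    // Before: an iterator copies every row out of columnar storage first.
    val iter: Iterator[Row] = col0.indices.iterator.map(r => Row(col0(r), col1(r)))
    var sumBefore = 0.0
    while (iter.hasNext) { val row = iter.next(); sumBefore += row.i * row.d }
    
    // After: the generated loop indexes the vectors directly, with no Row copy.
    var sumAfter = 0.0
    var r = 0
    while (r < col0.length) { sumAfter += col0(r) * col1(r); r += 1 }
    ```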
    
    ## How was this patch tested?
    
    Added test suites for wider columns.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kiszk/spark SPARK-20822

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/18066.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #18066
    
----
commit 6ed3d3fa51cd9b09e2f137bda87dcb16e5a9fb1a
Author: Kazuaki Ishizaki <is...@jp.ibm.com>
Date:   2017-05-23T04:54:25Z

    initial commit

----




[GitHub] spark issue #18066: [SPARK-20822][SQL] Generate code to build table cache us...

Posted by kiszk <gi...@git.apache.org>.
Github user kiszk commented on the issue:

    https://github.com/apache/spark/pull/18066
  
    #18747 is another PR for this JIRA entry




[GitHub] spark issue #18066: [SPARK-20822][SQL] Generate code to build table cache us...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18066
  
    Merged build finished. Test FAILed.




[GitHub] spark issue #18066: [SPARK-20822][SQL] Generate code to build table cache us...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18066
  
    **[Test build #77222 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77222/testReport)** for PR 18066 at commit [`6ed3d3f`](https://github.com/apache/spark/commit/6ed3d3fa51cd9b09e2f137bda87dcb16e5a9fb1a).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class GenerateColumnAccessor(useColumnarBatch: Boolean)`
      * `class GenerateColumnarBatch(`
      * `class GeneratedColumnarBatchIterator extends $`




[GitHub] spark issue #18066: [SPARK-20822][SQL] Generate code to build table cache us...

Posted by kiszk <gi...@git.apache.org>.
Github user kiszk commented on the issue:

    https://github.com/apache/spark/pull/18066
  
    @hvanhovell @sameeragarwal Would you please review this?




[GitHub] spark issue #18066: [SPARK-20822][SQL] Generate code to build table cache us...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18066
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/77222/
    Test FAILed.




[GitHub] spark issue #18066: [SPARK-20822][SQL] Generate code to build table cache us...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18066
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/77702/
    Test PASSed.




[GitHub] spark issue #18066: [SPARK-20822][SQL] Generate code to build table cache us...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18066
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/77235/
    Test PASSed.




[GitHub] spark issue #18066: [SPARK-20822][SQL] Generate code to build table cache us...

Posted by kiszk <gi...@git.apache.org>.
Github user kiszk commented on the issue:

    https://github.com/apache/spark/pull/18066
  
    ping @hvanhovell @sameeragarwal




[GitHub] spark pull request #18066: [SPARK-20822][SQL] Generate code to build table c...

Posted by dongjoon-hyun <gi...@git.apache.org>.
Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18066#discussion_r120038491
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GeneratedColumnarBatch.scala ---
    @@ -0,0 +1,220 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.columnar
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.Attribute
    +import org.apache.spark.sql.catalyst.expressions.codegen._
    +import org.apache.spark.sql.execution.vectorized.ColumnarBatch
    +import org.apache.spark.sql.types._
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.storage.StorageLevel._
    +
    +
    +/**
    + * A helper class to expose the scala iterator to Java.
    + */
    +abstract class ColumnarBatchIterator extends Iterator[ColumnarBatch]
    +
    +
    +/**
    + * Generate code to batch [[InternalRow]]s into [[ColumnarBatch]]es.
    + */
    +class GenerateColumnarBatch(
    +    schema: StructType,
    +    batchSize: Int,
    +    storageLevel: StorageLevel)
    +  extends CodeGenerator[Iterator[InternalRow], Iterator[CachedColumnarBatch]] {
    +
    +  protected def canonicalize(in: Iterator[InternalRow]): Iterator[InternalRow] = in
    +
    +  protected def bind(
    +    in: Iterator[InternalRow], inputSchema: Seq[Attribute]): Iterator[InternalRow] = {
    +    in
    +  }
    +
    +  protected def create(rowIterator: Iterator[InternalRow]): Iterator[CachedColumnarBatch] = {
    +    import scala.collection.JavaConverters._
    +    val ctx = newCodeGenContext()
    +    val columnStatsCls = classOf[ColumnStats].getName
    +    val rowVar = ctx.freshName("row")
    +    val batchVar = ctx.freshName("columnarBatch")
    +    val rowNumVar = ctx.freshName("rowNum")
    +    val numBytesVar = ctx.freshName("bytesInBatch")
    +    ctx.addMutableState("long", numBytesVar, s"$numBytesVar = 0;")
    +    val rowIterVar = ctx.addReferenceObj(
    +      "rowIterator", rowIterator.asJava, classOf[java.util.Iterator[_]].getName)
    +    val schemas = StructType(
    +      schema.fields.map(s => StructField(s.name,
    +        s.dataType match {
    +          case udt: UserDefinedType[_] => udt.sqlType
    +          case other => other
    +        }, s.nullable))
    +    )
    +    val schemaVar = ctx.addReferenceObj("schema", schemas, classOf[StructType].getName)
    +    val maxNumBytes = ColumnBuilder.MAX_BATCH_SIZE_IN_BYTE
    +    val numColumns = schema.fields.length
    +
    +    val colStatVars = (0 to numColumns - 1).map(i => ctx.freshName("colStat" + i))
    +    val colStatCode = ctx.splitExpressions(
    +      (schemas.fields zip colStatVars).zipWithIndex.map {
    +        case ((field, varName), i) =>
    +          val columnStatsCls = field.dataType match {
    +            case IntegerType => classOf[IntColumnStats].getName
    +            case DoubleType => classOf[DoubleColumnStats].getName
    +            case others => throw new UnsupportedOperationException(s"$others is not supported yet")
    +          }
    +          ctx.addMutableState(columnStatsCls, varName, "")
    +          s"$varName = new $columnStatsCls(); statsArray[$i] = $varName;\n"
    +      },
    +      "apply",
    +      Seq.empty
    +    )
    +
    +    val populateColumnVectorsCode = ctx.splitExpressions(
    +      (schemas.fields zip colStatVars).zipWithIndex.map {
    +        case ((field, colStatVar), i) =>
    +          GenerateColumnarBatch.putColumnCode(ctx, field.dataType, field.nullable,
    +            batchVar, rowVar, rowNumVar, colStatVar, i, numBytesVar).trim + "\n"
    +      },
    +      "apply",
    +      Seq(("InternalRow", rowVar), ("ColumnarBatch", batchVar), ("int", rowNumVar))
    +    )
    +
    +    val code = s"""
    +      import org.apache.spark.memory.MemoryMode;
    +      import org.apache.spark.sql.catalyst.InternalRow;
    +      import org.apache.spark.sql.execution.columnar.CachedColumnarBatch;
    +      import org.apache.spark.sql.execution.columnar.GenerateColumnarBatch;
    +      import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
    +      import org.apache.spark.sql.execution.vectorized.ColumnVector;
    +
    +      public GeneratedColumnarBatchIterator generate(Object[] references) {
    +        return new GeneratedColumnarBatchIterator(references);
    +      }
    +
    +      class GeneratedColumnarBatchIterator extends ${classOf[ColumnarBatchIterator].getName} {
    +        private Object[] references;
    +        ${ctx.declareMutableStates()}
    +
    +        public GeneratedColumnarBatchIterator(Object[] references) {
    +          this.references = references;
    +          ${ctx.initMutableStates()}
    +        }
    +
    +        ${ctx.declareAddedFunctions()}
    +
    +        $columnStatsCls[] statsArray = new $columnStatsCls[$numColumns];
    +        private void allocateColumnStats() {
    +          ${colStatCode.trim}
    +        }
    +
    +        @Override
    +        public boolean hasNext() {
    +          return $rowIterVar.hasNext();
    +        }
    +
    +        @Override
    +        public CachedColumnarBatch next() {
    +          ColumnarBatch $batchVar =
    +          ColumnarBatch.allocate($schemaVar, MemoryMode.ON_HEAP, $batchSize);
    +          allocateColumnStats();
    +          int $rowNumVar = 0;
    +          $numBytesVar = 0;
    +          while ($rowIterVar.hasNext() && $rowNumVar < $batchSize && $numBytesVar < $maxNumBytes) {
    +            InternalRow $rowVar = (InternalRow) $rowIterVar.next();
    +            $populateColumnVectorsCode
    +            $rowNumVar += 1;
    +          }
    +          $batchVar.setNumRows($rowNumVar);
    +          return CachedColumnarBatch.apply(
    +            $batchVar, GenerateColumnarBatch.generateStats(statsArray));
    +        }
    +      }
    +      """
    +    val formattedCode = CodeFormatter.stripOverlappingComments(
    +      new CodeAndComment(code, ctx.getPlaceHolderToComments()))
    +    CodeGenerator.compile(formattedCode).generate(ctx.references.toArray)
    +      .asInstanceOf[Iterator[CachedColumnarBatch]]
    +  }
    +}
    +
    +
    +private[sql] object GenerateColumnarBatch {
    +  def compressStorageLevel(storageLevel: StorageLevel, useCompression: Boolean): StorageLevel = {
    +    if (!useCompression) return storageLevel
    +    storageLevel match {
    +      case MEMORY_ONLY => MEMORY_ONLY_SER
    +      case MEMORY_ONLY_2 => MEMORY_ONLY_SER_2
    +      case MEMORY_AND_DISK => MEMORY_AND_DISK_SER
    +      case MEMORY_AND_DISK_2 => MEMORY_AND_DISK_SER_2
    +      case sl => sl
    +    }
    +  }
    +
    +  def isCompress(storageLevel: StorageLevel) : Boolean = {
    +    (storageLevel == MEMORY_ONLY_SER || storageLevel == MEMORY_ONLY_SER_2 ||
    +      storageLevel == MEMORY_AND_DISK_SER || storageLevel == MEMORY_AND_DISK_SER_2)
    +  }
    +
    +  private val typeToName = Map[AbstractDataType, String](
    --- End diff --
    
    Hi, @kiszk .
    Is there any reason having only two types, `int` and `double`?
    The PR looks more general to me.




[GitHub] spark issue #18066: [SPARK-20822][SQL] Generate code to build table cache us...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18066
  
    **[Test build #77702 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77702/testReport)** for PR 18066 at commit [`c183032`](https://github.com/apache/spark/commit/c183032a0edc3837bd0e697a15b298b47a2ab5ab).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request #18066: [SPARK-20822][SQL] Generate code to build table c...

Posted by kiszk <gi...@git.apache.org>.
Github user kiszk closed the pull request at:

    https://github.com/apache/spark/pull/18066




[GitHub] spark issue #18066: [SPARK-20822][SQL] Generate code to build table cache us...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18066
  
    **[Test build #77222 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77222/testReport)** for PR 18066 at commit [`6ed3d3f`](https://github.com/apache/spark/commit/6ed3d3fa51cd9b09e2f137bda87dcb16e5a9fb1a).




[GitHub] spark issue #18066: [SPARK-20822][SQL] Generate code to build table cache us...

Posted by kiszk <gi...@git.apache.org>.
Github user kiszk commented on the issue:

    https://github.com/apache/spark/pull/18066
  
    ping @hvanhovell




[GitHub] spark issue #18066: [SPARK-20822][SQL] Generate code to build table cache us...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18066
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #18066: [SPARK-20822][SQL] Generate code to build table cache us...

Posted by kiszk <gi...@git.apache.org>.
Github user kiszk commented on the issue:

    https://github.com/apache/spark/pull/18066
  
    ping @hvanhovell @sameeragarwal




[GitHub] spark issue #18066: [SPARK-20822][SQL] Generate code to build table cache us...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18066
  
    **[Test build #77702 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77702/testReport)** for PR 18066 at commit [`c183032`](https://github.com/apache/spark/commit/c183032a0edc3837bd0e697a15b298b47a2ab5ab).




[GitHub] spark issue #18066: [SPARK-20822][SQL] Generate code to build table cache us...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18066
  
    **[Test build #77235 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77235/testReport)** for PR 18066 at commit [`513acfb`](https://github.com/apache/spark/commit/513acfbf746e0299ceb1174e0ed76fe3dadad8a3).




[GitHub] spark issue #18066: [SPARK-20822][SQL] Generate code to build table cache us...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18066
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #18066: [SPARK-20822][SQL] Generate code to build table cache us...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18066
  
    **[Test build #77235 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77235/testReport)** for PR 18066 at commit [`513acfb`](https://github.com/apache/spark/commit/513acfbf746e0299ceb1174e0ed76fe3dadad8a3).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request #18066: [SPARK-20822][SQL] Generate code to build table c...

Posted by kiszk <gi...@git.apache.org>.
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18066#discussion_r120042739
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GeneratedColumnarBatch.scala ---
    @@ -0,0 +1,220 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.columnar
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.Attribute
    +import org.apache.spark.sql.catalyst.expressions.codegen._
    +import org.apache.spark.sql.execution.vectorized.ColumnarBatch
    +import org.apache.spark.sql.types._
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.storage.StorageLevel._
    +
    +
    +/**
    + * A helper class to expose the scala iterator to Java.
    + */
    +abstract class ColumnarBatchIterator extends Iterator[ColumnarBatch]
    +
    +
    +/**
    + * Generate code to batch [[InternalRow]]s into [[ColumnarBatch]]es.
    + */
    +class GenerateColumnarBatch(
    +    schema: StructType,
    +    batchSize: Int,
    +    storageLevel: StorageLevel)
    +  extends CodeGenerator[Iterator[InternalRow], Iterator[CachedColumnarBatch]] {
    +
    +  protected def canonicalize(in: Iterator[InternalRow]): Iterator[InternalRow] = in
    +
    +  protected def bind(
    +    in: Iterator[InternalRow], inputSchema: Seq[Attribute]): Iterator[InternalRow] = {
    +    in
    +  }
    +
    +  protected def create(rowIterator: Iterator[InternalRow]): Iterator[CachedColumnarBatch] = {
    +    import scala.collection.JavaConverters._
    +    val ctx = newCodeGenContext()
    +    val columnStatsCls = classOf[ColumnStats].getName
    +    val rowVar = ctx.freshName("row")
    +    val batchVar = ctx.freshName("columnarBatch")
    +    val rowNumVar = ctx.freshName("rowNum")
    +    val numBytesVar = ctx.freshName("bytesInBatch")
    +    ctx.addMutableState("long", numBytesVar, s"$numBytesVar = 0;")
    +    val rowIterVar = ctx.addReferenceObj(
    +      "rowIterator", rowIterator.asJava, classOf[java.util.Iterator[_]].getName)
    +    val schemas = StructType(
    +      schema.fields.map(s => StructField(s.name,
    +        s.dataType match {
    +          case udt: UserDefinedType[_] => udt.sqlType
    +          case other => other
    +        }, s.nullable))
    +    )
    +    val schemaVar = ctx.addReferenceObj("schema", schemas, classOf[StructType].getName)
    +    val maxNumBytes = ColumnBuilder.MAX_BATCH_SIZE_IN_BYTE
    +    val numColumns = schema.fields.length
    +
    +    val colStatVars = (0 to numColumns - 1).map(i => ctx.freshName("colStat" + i))
    +    val colStatCode = ctx.splitExpressions(
    +      (schemas.fields zip colStatVars).zipWithIndex.map {
    +        case ((field, varName), i) =>
    +          val columnStatsCls = field.dataType match {
    +            case IntegerType => classOf[IntColumnStats].getName
    +            case DoubleType => classOf[DoubleColumnStats].getName
    +            case others => throw new UnsupportedOperationException(s"$others is not supported yet")
    +          }
    +          ctx.addMutableState(columnStatsCls, varName, "")
    +          s"$varName = new $columnStatsCls(); statsArray[$i] = $varName;\n"
    +      },
    +      "apply",
    +      Seq.empty
    +    )
    +
    +    val populateColumnVectorsCode = ctx.splitExpressions(
    +      (schemas.fields zip colStatVars).zipWithIndex.map {
    +        case ((field, colStatVar), i) =>
    +          GenerateColumnarBatch.putColumnCode(ctx, field.dataType, field.nullable,
    +            batchVar, rowVar, rowNumVar, colStatVar, i, numBytesVar).trim + "\n"
    +      },
    +      "apply",
    +      Seq(("InternalRow", rowVar), ("ColumnarBatch", batchVar), ("int", rowNumVar))
    +    )
    +
    +    val code = s"""
    +      import org.apache.spark.memory.MemoryMode;
    +      import org.apache.spark.sql.catalyst.InternalRow;
    +      import org.apache.spark.sql.execution.columnar.CachedColumnarBatch;
    +      import org.apache.spark.sql.execution.columnar.GenerateColumnarBatch;
    +      import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
    +      import org.apache.spark.sql.execution.vectorized.ColumnVector;
    +
    +      public GeneratedColumnarBatchIterator generate(Object[] references) {
    +        return new GeneratedColumnarBatchIterator(references);
    +      }
    +
    +      class GeneratedColumnarBatchIterator extends ${classOf[ColumnarBatchIterator].getName} {
    +        private Object[] references;
    +        ${ctx.declareMutableStates()}
    +
    +        public GeneratedColumnarBatchIterator(Object[] references) {
    +          this.references = references;
    +          ${ctx.initMutableStates()}
    +        }
    +
    +        ${ctx.declareAddedFunctions()}
    +
    +        $columnStatsCls[] statsArray = new $columnStatsCls[$numColumns];
    +        private void allocateColumnStats() {
    +          ${colStatCode.trim}
    +        }
    +
    +        @Override
    +        public boolean hasNext() {
    +          return $rowIterVar.hasNext();
    +        }
    +
    +        @Override
    +        public CachedColumnarBatch next() {
    +          ColumnarBatch $batchVar =
    +          ColumnarBatch.allocate($schemaVar, MemoryMode.ON_HEAP, $batchSize);
    +          allocateColumnStats();
    +          int $rowNumVar = 0;
    +          $numBytesVar = 0;
    +          while ($rowIterVar.hasNext() && $rowNumVar < $batchSize && $numBytesVar < $maxNumBytes) {
    +            InternalRow $rowVar = (InternalRow) $rowIterVar.next();
    +            $populateColumnVectorsCode
    +            $rowNumVar += 1;
    +          }
    +          $batchVar.setNumRows($rowNumVar);
    +          return CachedColumnarBatch.apply(
    +            $batchVar, GenerateColumnarBatch.generateStats(statsArray));
    +        }
    +      }
    +      """
    +    val formattedCode = CodeFormatter.stripOverlappingComments(
    +      new CodeAndComment(code, ctx.getPlaceHolderToComments()))
    +    CodeGenerator.compile(formattedCode).generate(ctx.references.toArray)
    +      .asInstanceOf[Iterator[CachedColumnarBatch]]
    +  }
    +}
    +
    +
    +private[sql] object GenerateColumnarBatch {
    +  def compressStorageLevel(storageLevel: StorageLevel, useCompression: Boolean): StorageLevel = {
    +    if (!useCompression) return storageLevel
    +    storageLevel match {
    +      case MEMORY_ONLY => MEMORY_ONLY_SER
    +      case MEMORY_ONLY_2 => MEMORY_ONLY_SER_2
    +      case MEMORY_AND_DISK => MEMORY_AND_DISK_SER
    +      case MEMORY_AND_DISK_2 => MEMORY_AND_DISK_SER_2
    +      case sl => sl
    +    }
    +  }
    +
    +  def isCompress(storageLevel: StorageLevel) : Boolean = {
    +    (storageLevel == MEMORY_ONLY_SER || storageLevel == MEMORY_ONLY_SER_2 ||
    +      storageLevel == MEMORY_AND_DISK_SER || storageLevel == MEMORY_AND_DISK_SER_2)
    +  }
    +
    +  private val typeToName = Map[AbstractDataType, String](
    --- End diff --
    
    As I described in the description, this is for ease of review.
    
    >As the first step, for ease of review, I supported only integer and double data types with whole-stage codegen. Another PR will address an execution path without whole-stage codegen





[GitHub] spark pull request #18066: [SPARK-20822][SQL] Generate code to build table c...

Posted by dongjoon-hyun <gi...@git.apache.org>.
Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18066#discussion_r120038532
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
    @@ -64,6 +88,32 @@ case class InMemoryRelation(
         val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator)
       extends logical.LeafNode with MultiInstanceRelation {
     
    +  /**
    +   * If true, store the input rows using [[CachedColumnarBatch]]es, which are generally faster.
    +   * If false, store the input rows using [[CachedBatchBytes]].
    +   */
    +  private def numOfNestedFields(dataType: DataType): Int = dataType match {
    +    case dt: StructType => dt.fields.map(f => numOfNestedFields(f.dataType)).sum
    +    case m: MapType => numOfNestedFields(m.keyType) + numOfNestedFields(m.valueType)
    +    case a: ArrayType => numOfNestedFields(a.elementType)
    +    case u: UserDefinedType[_] => numOfNestedFields(u.sqlType)
    +    case _ => 1
    +  }
    +
    +  private[columnar] val useColumnarBatches: Boolean = {
    +    // In the initial implementation, for ease of review
    +    // support only integer and double and # of fields is less than wholeStageMaxNumFields
    --- End diff --
    
    Oh, I see. Here is the comment about the reason.

