Posted to commits@spark.apache.org by da...@apache.org on 2016/03/21 22:36:59 UTC

spark git commit: [SPARK-13805] [SQL] Generate code that gets a value in each column from ColumnVector when ColumnarBatch is used

Repository: spark
Updated Branches:
  refs/heads/master 9b4e15ba1 -> f35df7d18


[SPARK-13805] [SQL] Generate code that gets a value in each column from ColumnVector when ColumnarBatch is used

## What changes were proposed in this pull request?

This PR generates code that gets a value from each column's ```ColumnVector``` instead of creating an ```InternalRow``` when a ```ColumnarBatch``` is accessed. This improves the benchmark program by up to 15%.
This PR consists of two parts:

1. Get a ```ColumnVector``` by using the ```ColumnarBatch.column()``` method
2. Get a value for each column by using ```rdd_col${COLIDX}.getInt(ROWIDX)``` instead of ```rdd_row.getInt(COLIDX)```

Here is a motivating example: it writes a small partitioned Parquet table, then scans it with the vectorized Parquet reader and whole-stage code generation enabled.
````scala
    sqlContext.conf.setConfString(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, "true")
    sqlContext.conf.setConfString(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true")
    val values = 10
    withTempPath { dir =>
      withTempTable("t1", "tempTable") {
        sqlContext.range(values).registerTempTable("t1")
        sqlContext.sql("select id % 2 as p, cast(id as INT) as id from t1")
          .write.partitionBy("p").parquet(dir.getCanonicalPath)
        sqlContext.read.parquet(dir.getCanonicalPath).registerTempTable("tempTable")
        sqlContext.sql("select sum(p) from tempTable").collect
      }
    }
````

The original generated code:
````java
    ...
    /* 072 */       while (!shouldStop() && rdd_batchIdx < numRows) {
    /* 073 */         InternalRow rdd_row = rdd_batch.getRow(rdd_batchIdx++);
    /* 074 */         /*** CONSUME: TungstenAggregate(key=[], functions=[(sum(cast(p#4 as bigint)),mode=Partial,isDistinct=false)], output=[sum#10L]) */
    /* 075 */         /* input[0, int] */
    /* 076 */         boolean rdd_isNull = rdd_row.isNullAt(0);
    /* 077 */         int rdd_value = rdd_isNull ? -1 : (rdd_row.getInt(0));
    ...
````

The code generated by this PR:
````java
    /* 072 */       while (!shouldStop() && rdd_batchIdx < numRows) {
    /* 073 */         org.apache.spark.sql.execution.vectorized.ColumnVector rdd_col0 = rdd_batch.column(0);
    /* 074 */         /*** CONSUME: TungstenAggregate(key=[], functions=[(sum(cast(p#4 as bigint)),mode=Partial,isDistinct=false)], output=[sum#10L]) */
    /* 075 */         /* input[0, int] */
    /* 076 */         boolean rdd_isNull = rdd_col0.isNullAt(rdd_batchIdx);
    /* 077 */         int rdd_value = rdd_isNull ? -1 : (rdd_col0.getInt(rdd_batchIdx));
    ...
    /* 128 */         rdd_batchIdx++;
    /* 129 */       }
    /* 130 */       if (shouldStop()) return;

````
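Stripped of the codegen scaffolding, the change boils down to hoisting the per-column ```ColumnVector``` lookup out of the row loop and indexing it by row instead of materializing a row view. A hand-written sketch of the two patterns (the class and method names here are hypothetical, for illustration only; the ```ColumnarBatch```/```ColumnVector``` calls mirror the generated code above):

````java
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.vectorized.ColumnVector;
import org.apache.spark.sql.execution.vectorized.ColumnarBatch;

// Illustrative sketch only, not part of this commit.
class AccessPatternSketch {
  // Before: materialize an InternalRow view for every row.
  static long sumRowWise(ColumnarBatch batch) {
    long sum = 0;
    int idx = 0;
    while (idx < batch.numRows()) {
      InternalRow row = batch.getRow(idx++);
      if (!row.isNullAt(0)) sum += row.getInt(0);
    }
    return sum;
  }

  // After (this PR): fetch the ColumnVector once, then index by row.
  static long sumColumnWise(ColumnarBatch batch) {
    ColumnVector col = batch.column(0);  // hoisted out of the per-row loop
    long sum = 0;
    for (int idx = 0; idx < batch.numRows(); idx++) {
      if (!col.isNullAt(idx)) sum += col.getInt(idx);
    }
    return sum;
  }
}
````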
Performance

Without this PR:
````
model name	: Intel(R) Xeon(R) CPU E5-2667 v2 @ 3.30GHz
Partitioned Table:                  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------
Read data column                          434 /  488         36.3          27.6       1.0X
Read partition column                     302 /  346         52.1          19.2       1.4X
Read both columns                         588 /  643         26.8          37.4       0.7X
````
With this PR:
````
model name	: Intel(R) Xeon(R) CPU E5-2667 v2 @ 3.30GHz
Partitioned Table:                  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------
Read data column                          392 /  516         40.1          24.9       1.0X
Read partition column                     256 /  318         61.4          16.3       1.5X
Read both columns                         523 /  539         30.1          33.3       0.7X
````
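For reference against the ```Per Row(ns)``` column, this works out to roughly a 10% improvement for the data column ((27.6 - 24.9) / 27.6), about 15% for the partition column ((19.2 - 16.3) / 19.2), and about 11% when reading both columns, which is where the "up to 15%" figure above comes from.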

## How was this patch tested?
Tested with the existing test suites and the benchmark shown above.

Author: Kazuaki Ishizaki <is...@jp.ibm.com>

Closes #11636 from kiszk/SPARK-13805.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f35df7d1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f35df7d1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f35df7d1

Branch: refs/heads/master
Commit: f35df7d1820738cc1dac81271041707010e2f08f
Parents: 9b4e15b
Author: Kazuaki Ishizaki <is...@jp.ibm.com>
Authored: Mon Mar 21 14:36:51 2016 -0700
Committer: Davies Liu <da...@gmail.com>
Committed: Mon Mar 21 14:36:51 2016 -0700

----------------------------------------------------------------------
 .../sql/execution/vectorized/ColumnVector.java  | 40 +++++++++++++------
 .../execution/vectorized/ColumnVectorUtils.java |  2 +-
 .../sql/execution/vectorized/ColumnarBatch.java |  4 +-
 .../vectorized/OffHeapColumnVector.java         |  2 +-
 .../vectorized/OnHeapColumnVector.java          |  2 +-
 .../spark/sql/execution/ExistingRDD.scala       | 42 ++++++++++++++++----
 .../parquet/ParquetEncodingSuite.scala          |  8 ++--
 .../parquet/ParquetReadBenchmark.scala          |  2 +-
 .../vectorized/ColumnarBatchSuite.scala         |  8 ++--
 9 files changed, 77 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f35df7d1/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
index 04adf1f..13bf4c5 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
@@ -109,62 +109,62 @@ public abstract class ColumnVector {
 
       if (dt instanceof BooleanType) {
         for (int i = 0; i < length; i++) {
-          if (!data.getIsNull(offset + i)) {
+          if (!data.isNullAt(offset + i)) {
             list[i] = data.getBoolean(offset + i);
           }
         }
       } else if (dt instanceof ByteType) {
         for (int i = 0; i < length; i++) {
-          if (!data.getIsNull(offset + i)) {
+          if (!data.isNullAt(offset + i)) {
             list[i] = data.getByte(offset + i);
           }
         }
       } else if (dt instanceof ShortType) {
         for (int i = 0; i < length; i++) {
-          if (!data.getIsNull(offset + i)) {
+          if (!data.isNullAt(offset + i)) {
             list[i] = data.getShort(offset + i);
           }
         }
       } else if (dt instanceof IntegerType) {
         for (int i = 0; i < length; i++) {
-          if (!data.getIsNull(offset + i)) {
+          if (!data.isNullAt(offset + i)) {
             list[i] = data.getInt(offset + i);
           }
         }
       } else if (dt instanceof FloatType) {
         for (int i = 0; i < length; i++) {
-          if (!data.getIsNull(offset + i)) {
+          if (!data.isNullAt(offset + i)) {
             list[i] = data.getFloat(offset + i);
           }
         }
       } else if (dt instanceof DoubleType) {
         for (int i = 0; i < length; i++) {
-          if (!data.getIsNull(offset + i)) {
+          if (!data.isNullAt(offset + i)) {
             list[i] = data.getDouble(offset + i);
           }
         }
       } else if (dt instanceof LongType) {
         for (int i = 0; i < length; i++) {
-          if (!data.getIsNull(offset + i)) {
+          if (!data.isNullAt(offset + i)) {
             list[i] = data.getLong(offset + i);
           }
         }
       } else if (dt instanceof DecimalType) {
         DecimalType decType = (DecimalType)dt;
         for (int i = 0; i < length; i++) {
-          if (!data.getIsNull(offset + i)) {
+          if (!data.isNullAt(offset + i)) {
             list[i] = getDecimal(i, decType.precision(), decType.scale());
           }
         }
       } else if (dt instanceof StringType) {
         for (int i = 0; i < length; i++) {
-          if (!data.getIsNull(offset + i)) {
+          if (!data.isNullAt(offset + i)) {
             list[i] = getUTF8String(i).toString();
           }
         }
       } else if (dt instanceof CalendarIntervalType) {
         for (int i = 0; i < length; i++) {
-          if (!data.getIsNull(offset + i)) {
+          if (!data.isNullAt(offset + i)) {
             list[i] = getInterval(i);
           }
         }
@@ -175,7 +175,7 @@ public abstract class ColumnVector {
     }
 
     @Override
-    public boolean isNullAt(int ordinal) { return data.getIsNull(offset + ordinal); }
+    public boolean isNullAt(int ordinal) { return data.isNullAt(offset + ordinal); }
 
     @Override
     public boolean getBoolean(int ordinal) {
@@ -314,7 +314,7 @@ public abstract class ColumnVector {
   /**
    * Returns whether the value at rowId is NULL.
    */
-  public abstract boolean getIsNull(int rowId);
+  public abstract boolean isNullAt(int rowId);
 
   /**
    * Sets the value at rowId to `value`.
@@ -501,6 +501,15 @@ public abstract class ColumnVector {
   }
 
   /**
+   * Returns a utility object to get structs.
+   * Provided to keep API compatibility with InternalRow for code generation.
+   */
+  public ColumnarBatch.Row getStruct(int rowId, int size) {
+    resultStruct.rowId = rowId;
+    return resultStruct;
+  }
+
+  /**
    * Returns the array at rowid.
    */
   public final Array getArray(int rowId) {
@@ -532,6 +541,13 @@ public abstract class ColumnVector {
   }
 
   /**
+   * Returns the map for rowId.
+   */
+  public MapData getMap(int ordinal) {
+    throw new NotImplementedException();
+  }
+
+  /**
    * Returns the decimal for rowId.
    */
   public final Decimal getDecimal(int rowId, int precision, int scale) {

http://git-wip-us.apache.org/repos/asf/spark/blob/f35df7d1/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
index b084eda..2dc57dc 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
@@ -105,7 +105,7 @@ public class ColumnVectorUtils {
       int[] result = new int[array.length];
       ColumnVector data = array.data;
       for (int i = 0; i < result.length; i++) {
-        if (data.getIsNull(array.offset + i)) {
+        if (data.isNullAt(array.offset + i)) {
           throw new RuntimeException("Cannot handle NULL values.");
         }
         result[i] = data.getInt(array.offset + i);

http://git-wip-us.apache.org/repos/asf/spark/blob/f35df7d1/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
index b6fa9a0..7ab4cda 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
@@ -168,7 +168,7 @@ public final class ColumnarBatch {
     }
 
     @Override
-    public boolean isNullAt(int ordinal) { return columns[ordinal].getIsNull(rowId); }
+    public boolean isNullAt(int ordinal) { return columns[ordinal].isNullAt(rowId); }
 
     @Override
     public boolean getBoolean(int ordinal) { return columns[ordinal].getBoolean(rowId); }
@@ -295,7 +295,7 @@ public final class ColumnarBatch {
     for (int ordinal : nullFilteredColumns) {
       if (columns[ordinal].numNulls != 0) {
         for (int rowId = 0; rowId < numRows; rowId++) {
-          if (!filteredRows[rowId] && columns[ordinal].getIsNull(rowId)) {
+          if (!filteredRows[rowId] && columns[ordinal].isNullAt(rowId)) {
             filteredRows[rowId] = true;
             ++numRowsFiltered;
           }

http://git-wip-us.apache.org/repos/asf/spark/blob/f35df7d1/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index d5a9163..689e6a2 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -109,7 +109,7 @@ public final class OffHeapColumnVector extends ColumnVector {
   }
 
   @Override
-  public boolean getIsNull(int rowId) {
+  public boolean isNullAt(int rowId) {
     return Platform.getByte(null, nulls + rowId) == 1;
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f35df7d1/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index 5b671a7..f332e87 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -98,7 +98,7 @@ public final class OnHeapColumnVector extends ColumnVector {
   }
 
   @Override
-  public boolean getIsNull(int rowId) {
+  public boolean isNullAt(int rowId) {
     return nulls[rowId] == 1;
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f35df7d1/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index e97c6be..b4348d3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -22,9 +22,10 @@ import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.util.toCommentSafeString
 import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetSource}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.internal.SQLConf
@@ -195,19 +196,42 @@ private[sql] case class DataSourceScan(
     rdd :: Nil
   }
 
+  private def genCodeColumnVector(ctx: CodegenContext, columnVar: String, ordinal: String,
+    dataType: DataType, nullable: Boolean): ExprCode = {
+    val javaType = ctx.javaType(dataType)
+    val value = ctx.getValue(columnVar, dataType, ordinal)
+    val isNullVar = if (nullable) { ctx.freshName("isNull") } else { "false" }
+    val valueVar = ctx.freshName("value")
+    val str = s"columnVector[$columnVar, $ordinal, ${dataType.simpleString}]"
+    val code = s"/* ${toCommentSafeString(str)} */\n" + (if (nullable) {
+      s"""
+        boolean ${isNullVar} = ${columnVar}.isNullAt($ordinal);
+        $javaType ${valueVar} = ${isNullVar} ? ${ctx.defaultValue(dataType)} : ($value);
+      """
+    } else {
+      s"$javaType ${valueVar} = $value;"
+    }).trim
+    ExprCode(code, isNullVar, valueVar)
+  }
+
   // Support codegen so that we can avoid the UnsafeRow conversion in all cases. Codegen
   // never requires UnsafeRow as input.
   override protected def doProduce(ctx: CodegenContext): String = {
     val columnarBatchClz = "org.apache.spark.sql.execution.vectorized.ColumnarBatch"
+    val columnVectorClz = "org.apache.spark.sql.execution.vectorized.ColumnVector"
     val input = ctx.freshName("input")
     val idx = ctx.freshName("batchIdx")
+    val rowidx = ctx.freshName("rowIdx")
     val batch = ctx.freshName("batch")
     // PhysicalRDD always just has one input
     ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];")
     ctx.addMutableState(columnarBatchClz, batch, s"$batch = null;")
     ctx.addMutableState("int", idx, s"$idx = 0;")
+    val colVars = output.indices.map(i => ctx.freshName("colInstance" + i))
+    val columnAssigns = colVars.zipWithIndex.map { case (name, i) =>
+      ctx.addMutableState(columnVectorClz, name, s"$name = null;")
+      s"$name = ${batch}.column($i);" }
 
-    val exprs = output.zipWithIndex.map(x => new BoundReference(x._2, x._1.dataType, true))
     val row = ctx.freshName("row")
     val numOutputRows = metricTerm(ctx, "numOutputRows")
 
@@ -217,19 +241,22 @@ private[sql] case class DataSourceScan(
     // TODO: The abstractions between this class and SqlNewHadoopRDD makes it difficult to know
     // here which path to use. Fix this.
 
-    ctx.INPUT_ROW = row
     ctx.currentVars = null
-    val columns1 = exprs.map(_.gen(ctx))
+    val columns1 = (output zip colVars).map { case (attr, colVar) =>
+      genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable) }
     val scanBatches = ctx.freshName("processBatches")
     ctx.addNewFunction(scanBatches,
       s"""
       | private void $scanBatches() throws java.io.IOException {
       |  while (true) {
       |     int numRows = $batch.numRows();
-      |     if ($idx == 0) $numOutputRows.add(numRows);
+      |     if ($idx == 0) {
+      |       ${columnAssigns.mkString("", "\n", "\n")}
+      |       $numOutputRows.add(numRows);
+      |     }
       |
       |     while (!shouldStop() && $idx < numRows) {
-      |       InternalRow $row = $batch.getRow($idx++);
+      |       int $rowidx = $idx++;
       |       ${consume(ctx, columns1).trim}
       |     }
       |     if (shouldStop()) return;
@@ -243,9 +270,10 @@ private[sql] case class DataSourceScan(
       |   }
       | }""".stripMargin)
 
+    val exprRows = output.zipWithIndex.map(x => new BoundReference(x._2, x._1.dataType, true))
     ctx.INPUT_ROW = row
     ctx.currentVars = null
-    val columns2 = exprs.map(_.gen(ctx))
+    val columns2 = exprRows.map(_.gen(ctx))
     val inputRow = if (outputUnsafeRows) row else null
     val scanRows = ctx.freshName("processRows")
     ctx.addNewFunction(scanRows,

http://git-wip-us.apache.org/repos/asf/spark/blob/f35df7d1/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
index f42c754..88fcfce 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
@@ -68,10 +68,10 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSQLContex
         assert(batch.numRows() == n)
         var i = 0
         while (i < n) {
-          assert(batch.column(0).getIsNull(i))
-          assert(batch.column(1).getIsNull(i))
-          assert(batch.column(2).getIsNull(i))
-          assert(batch.column(3).getIsNull(i))
+          assert(batch.column(0).isNullAt(i))
+          assert(batch.column(1).isNullAt(i))
+          assert(batch.column(2).isNullAt(i))
+          assert(batch.column(3).isNullAt(i))
           i += 1
         }
         reader.close()

http://git-wip-us.apache.org/repos/asf/spark/blob/f35df7d1/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
index 070c400..cc0cc65 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
@@ -101,7 +101,7 @@ object ParquetReadBenchmark {
                 val numRows = batch.numRows()
                 var i = 0
                 while (i < numRows) {
-                  if (!col.getIsNull(i)) sum += col.getInt(i)
+                  if (!col.isNullAt(i)) sum += col.getInt(i)
                   i += 1
                 }
               }

http://git-wip-us.apache.org/repos/asf/spark/blob/f35df7d1/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index fa2c744..4262097 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -68,7 +68,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
       assert(column.numNulls() == 4)
 
       reference.zipWithIndex.foreach { v =>
-        assert(v._1 == column.getIsNull(v._2))
+        assert(v._1 == column.isNullAt(v._2))
         if (memMode == MemoryMode.OFF_HEAP) {
           val addr = column.nullsNativeAddress()
           assert(v._1 == (Platform.getByte(null, addr + v._2) == 1), "index=" + v._2)
@@ -489,10 +489,10 @@ class ColumnarBatchSuite extends SparkFunSuite {
       assert(batch.rowIterator().hasNext == true)
 
       assert(batch.column(0).getInt(0) == 1)
-      assert(batch.column(0).getIsNull(0) == false)
+      assert(batch.column(0).isNullAt(0) == false)
       assert(batch.column(1).getDouble(0) == 1.1)
-      assert(batch.column(1).getIsNull(0) == false)
-      assert(batch.column(2).getIsNull(0) == true)
+      assert(batch.column(1).isNullAt(0) == false)
+      assert(batch.column(2).isNullAt(0) == true)
       assert(batch.column(3).getUTF8String(0).toString == "Hello")
 
       // Verify the iterator works correctly.

