You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2016/10/07 21:03:53 UTC
[3/3] spark git commit: [SPARK-17761][SQL] Remove MutableRow
[SPARK-17761][SQL] Remove MutableRow
## What changes were proposed in this pull request?
In practice we cannot guarantee that an `InternalRow` is immutable. This makes the `MutableRow` almost redundant. This PR folds `MutableRow` into `InternalRow`.
The code below illustrates the immutability issue with InternalRow:
```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
val struct = new GenericMutableRow(1)
val row = InternalRow(struct, 1)
println(row)
scala> [[null], 1]
struct.setInt(0, 42)
println(row)
scala> [[42], 1]
```
This might be somewhat controversial, so feedback is appreciated.
## How was this patch tested?
Existing tests.
Author: Herman van Hovell <hv...@databricks.com>
Closes #15333 from hvanhovell/SPARK-17761.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/97594c29
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/97594c29
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/97594c29
Branch: refs/heads/master
Commit: 97594c29b723f372a5c4c061760015bd78d01f50
Parents: 2badb58
Author: Herman van Hovell <hv...@databricks.com>
Authored: Fri Oct 7 14:03:45 2016 -0700
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Fri Oct 7 14:03:45 2016 -0700
----------------------------------------------------------------------
.../org/apache/spark/ml/linalg/MatrixUDT.scala | 4 +-
.../org/apache/spark/ml/linalg/VectorUDT.scala | 6 +-
.../apache/spark/mllib/linalg/Matrices.scala | 4 +-
.../org/apache/spark/mllib/linalg/Vectors.scala | 6 +-
.../sql/catalyst/expressions/UnsafeRow.java | 2 +-
.../apache/spark/sql/catalyst/InternalRow.scala | 23 +-
.../catalyst/encoders/ExpressionEncoder.scala | 2 +-
.../spark/sql/catalyst/expressions/Cast.scala | 4 +-
.../sql/catalyst/expressions/JoinedRow.scala | 16 +
.../sql/catalyst/expressions/Projection.scala | 4 +-
.../expressions/SpecificInternalRow.scala | 313 ++++++++++++++++++
.../expressions/SpecificMutableRow.scala | 314 -------------------
.../aggregate/HyperLogLogPlusPlus.scala | 6 +-
.../expressions/aggregate/PivotFirst.scala | 10 +-
.../expressions/aggregate/collect.scala | 6 +-
.../expressions/aggregate/interfaces.scala | 14 +-
.../expressions/codegen/CodeGenerator.scala | 3 +-
.../codegen/GenerateMutableProjection.scala | 8 +-
.../codegen/GenerateSafeProjection.scala | 8 +-
.../sql/catalyst/expressions/package.scala | 2 +-
.../spark/sql/catalyst/expressions/rows.scala | 44 +--
.../spark/sql/catalyst/json/JacksonParser.scala | 4 +-
.../sql/catalyst/ScalaReflectionSuite.scala | 4 +-
.../expressions/CodeGenerationSuite.scala | 16 +-
.../sql/catalyst/expressions/MapDataSuite.scala | 2 +-
.../expressions/UnsafeRowConverterSuite.scala | 26 +-
.../aggregate/ApproximatePercentileSuite.scala | 9 +-
.../aggregate/HyperLogLogPlusPlusSuite.scala | 13 +-
.../sql/execution/vectorized/ColumnarBatch.java | 7 +-
.../spark/sql/execution/ExistingRDD.scala | 4 +-
.../aggregate/AggregationIterator.scala | 26 +-
.../SortBasedAggregationIterator.scala | 6 +-
.../aggregate/TungstenAggregationIterator.scala | 8 +-
.../spark/sql/execution/aggregate/udaf.scala | 38 +--
.../sql/execution/columnar/ColumnAccessor.scala | 13 +-
.../sql/execution/columnar/ColumnType.scala | 72 ++---
.../columnar/GenerateColumnAccessor.scala | 6 +-
.../columnar/NullableColumnAccessor.scala | 4 +-
.../CompressibleColumnAccessor.scala | 4 +-
.../compression/CompressionScheme.scala | 3 +-
.../compression/compressionSchemes.scala | 20 +-
.../datasources/DataSourceStrategy.scala | 2 +-
.../execution/datasources/csv/CSVRelation.scala | 4 +-
.../execution/datasources/jdbc/JdbcUtils.scala | 34 +-
.../parquet/ParquetRowConverter.scala | 6 +-
.../joins/BroadcastNestedLoopJoinExec.scala | 10 +-
.../spark/sql/execution/joins/HashJoin.scala | 2 +-
.../sql/execution/joins/SortMergeJoinExec.scala | 2 +-
.../apache/spark/sql/execution/objects.scala | 4 +-
.../execution/python/BatchEvalPythonExec.scala | 2 +-
.../sql/execution/stat/StatFunctions.scala | 4 +-
.../execution/window/AggregateProcessor.scala | 4 +-
.../spark/sql/execution/window/WindowExec.scala | 12 +-
.../execution/window/WindowFunctionFrame.scala | 10 +-
.../scala/org/apache/spark/sql/RowSuite.scala | 6 +-
.../sql/TypedImperativeAggregateSuite.scala | 6 +-
.../execution/columnar/ColumnTypeSuite.scala | 4 +-
.../execution/columnar/ColumnarTestUtils.scala | 12 +-
.../columnar/NullableColumnAccessorSuite.scala | 4 +-
.../columnar/NullableColumnBuilderSuite.scala | 4 +-
.../compression/BooleanBitSetSuite.scala | 4 +-
.../CompressionSchemeBenchmark.scala | 4 +-
.../compression/DictionaryEncodingSuite.scala | 4 +-
.../compression/IntegralDeltaSuite.scala | 6 +-
.../compression/RunLengthEncodingSuite.scala | 4 +-
.../datasources/parquet/ParquetIOSuite.scala | 4 +-
.../datasources/parquet/ParquetQuerySuite.scala | 4 +-
.../apache/spark/sql/hive/HiveInspectors.scala | 18 +-
.../org/apache/spark/sql/hive/TableReader.scala | 38 +--
.../hive/execution/ScriptTransformation.scala | 2 +-
.../org/apache/spark/sql/hive/hiveUDFs.scala | 6 +-
.../spark/sql/hive/orc/OrcFileFormat.scala | 2 +-
72 files changed, 654 insertions(+), 658 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/mllib/src/main/scala/org/apache/spark/ml/linalg/MatrixUDT.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/linalg/MatrixUDT.scala b/mllib/src/main/scala/org/apache/spark/ml/linalg/MatrixUDT.scala
index a1e5366..f4a8556 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/linalg/MatrixUDT.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/linalg/MatrixUDT.scala
@@ -18,7 +18,7 @@
package org.apache.spark.ml.linalg
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, UnsafeArrayData}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeArrayData}
import org.apache.spark.sql.types._
/**
@@ -46,7 +46,7 @@ private[spark] class MatrixUDT extends UserDefinedType[Matrix] {
}
override def serialize(obj: Matrix): InternalRow = {
- val row = new GenericMutableRow(7)
+ val row = new GenericInternalRow(7)
obj match {
case sm: SparseMatrix =>
row.setByte(0, 0)
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/mllib/src/main/scala/org/apache/spark/ml/linalg/VectorUDT.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/linalg/VectorUDT.scala b/mllib/src/main/scala/org/apache/spark/ml/linalg/VectorUDT.scala
index 0b9b2ff..9178613 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/linalg/VectorUDT.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/linalg/VectorUDT.scala
@@ -18,7 +18,7 @@
package org.apache.spark.ml.linalg
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, UnsafeArrayData}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeArrayData}
import org.apache.spark.sql.types._
/**
@@ -42,14 +42,14 @@ private[spark] class VectorUDT extends UserDefinedType[Vector] {
override def serialize(obj: Vector): InternalRow = {
obj match {
case SparseVector(size, indices, values) =>
- val row = new GenericMutableRow(4)
+ val row = new GenericInternalRow(4)
row.setByte(0, 0)
row.setInt(1, size)
row.update(2, UnsafeArrayData.fromPrimitiveArray(indices))
row.update(3, UnsafeArrayData.fromPrimitiveArray(values))
row
case DenseVector(values) =>
- val row = new GenericMutableRow(4)
+ val row = new GenericInternalRow(4)
row.setByte(0, 1)
row.setNullAt(1)
row.setNullAt(2)
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
index 6642999..542a69b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
@@ -28,7 +28,7 @@ import com.github.fommil.netlib.BLAS.{getInstance => blas}
import org.apache.spark.annotation.Since
import org.apache.spark.ml.{linalg => newlinalg}
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, UnsafeArrayData}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeArrayData}
import org.apache.spark.sql.types._
/**
@@ -189,7 +189,7 @@ private[spark] class MatrixUDT extends UserDefinedType[Matrix] {
}
override def serialize(obj: Matrix): InternalRow = {
- val row = new GenericMutableRow(7)
+ val row = new GenericInternalRow(7)
obj match {
case sm: SparseMatrix =>
row.setByte(0, 0)
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
index 91f0658..fbd217a 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
@@ -34,7 +34,7 @@ import org.apache.spark.annotation.{AlphaComponent, Since}
import org.apache.spark.ml.{linalg => newlinalg}
import org.apache.spark.mllib.util.NumericParser
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, UnsafeArrayData}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeArrayData}
import org.apache.spark.sql.types._
/**
@@ -214,14 +214,14 @@ class VectorUDT extends UserDefinedType[Vector] {
override def serialize(obj: Vector): InternalRow = {
obj match {
case SparseVector(size, indices, values) =>
- val row = new GenericMutableRow(4)
+ val row = new GenericInternalRow(4)
row.setByte(0, 0)
row.setInt(1, size)
row.update(2, UnsafeArrayData.fromPrimitiveArray(indices))
row.update(3, UnsafeArrayData.fromPrimitiveArray(values))
row
case DenseVector(values) =>
- val row = new GenericMutableRow(4)
+ val row = new GenericInternalRow(4)
row.setByte(0, 1)
row.setNullAt(1)
row.setNullAt(2)
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index 9027652..c3f0aba 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -59,7 +59,7 @@ import static org.apache.spark.unsafe.Platform.BYTE_ARRAY_OFFSET;
*
* Instances of `UnsafeRow` act as pointers to row data stored in this format.
*/
-public final class UnsafeRow extends MutableRow implements Externalizable, KryoSerializable {
+public final class UnsafeRow extends InternalRow implements Externalizable, KryoSerializable {
//////////////////////////////////////////////////////////////////////////////
// Static methods
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala
index eba95c5..f498e07 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.types.{DataType, Decimal, StructType}
/**
* An abstract class for row used internal in Spark SQL, which only contain the columns as
@@ -31,6 +31,27 @@ abstract class InternalRow extends SpecializedGetters with Serializable {
// This is only use for test and will throw a null pointer exception if the position is null.
def getString(ordinal: Int): String = getUTF8String(ordinal).toString
+ def setNullAt(i: Int): Unit
+
+ def update(i: Int, value: Any): Unit
+
+ // default implementation (slow)
+ def setBoolean(i: Int, value: Boolean): Unit = update(i, value)
+ def setByte(i: Int, value: Byte): Unit = update(i, value)
+ def setShort(i: Int, value: Short): Unit = update(i, value)
+ def setInt(i: Int, value: Int): Unit = update(i, value)
+ def setLong(i: Int, value: Long): Unit = update(i, value)
+ def setFloat(i: Int, value: Float): Unit = update(i, value)
+ def setDouble(i: Int, value: Double): Unit = update(i, value)
+
+ /**
+ * Update the decimal column at `i`.
+ *
+ * Note: In order to support updating a decimal with precision > 18 in UnsafeRow, do NOT call
+ * setNullAt() for a decimal column on UnsafeRow; call setDecimal(i, null, precision) instead.
+ */
+ def setDecimal(i: Int, value: Decimal, precision: Int) { update(i, value) }
+
/**
* Make a copy of the current [[InternalRow]] object.
*/
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
index b96b744..82e1a8a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
@@ -256,7 +256,7 @@ case class ExpressionEncoder[T](
private lazy val extractProjection = GenerateUnsafeProjection.generate(serializer)
@transient
- private lazy val inputRow = new GenericMutableRow(1)
+ private lazy val inputRow = new GenericInternalRow(1)
@transient
private lazy val constructProjection = GenerateSafeProjection.generate(deserializer :: Nil)
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index 70fff51..1314c41 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -403,7 +403,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
case (fromField, toField) => cast(fromField.dataType, toField.dataType)
}
// TODO: Could be faster?
- val newRow = new GenericMutableRow(from.fields.length)
+ val newRow = new GenericInternalRow(from.fields.length)
buildCast[InternalRow](_, row => {
var i = 0
while (i < row.numFields) {
@@ -892,7 +892,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
val fieldsCasts = from.fields.zip(to.fields).map {
case (fromField, toField) => nullSafeCastFunction(fromField.dataType, toField.dataType, ctx)
}
- val rowClass = classOf[GenericMutableRow].getName
+ val rowClass = classOf[GenericInternalRow].getName
val result = ctx.freshName("result")
val tmpRow = ctx.freshName("tmpRow")
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/JoinedRow.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/JoinedRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/JoinedRow.scala
index ed894f6..7770684 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/JoinedRow.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/JoinedRow.scala
@@ -123,6 +123,22 @@ class JoinedRow extends InternalRow {
override def anyNull: Boolean = row1.anyNull || row2.anyNull
+ override def setNullAt(i: Int): Unit = {
+ if (i < row1.numFields) {
+ row1.setNullAt(i)
+ } else {
+ row2.setNullAt(i - row1.numFields)
+ }
+ }
+
+ override def update(i: Int, value: Any): Unit = {
+ if (i < row1.numFields) {
+ row1.update(i, value)
+ } else {
+ row2.update(i - row1.numFields, value)
+ }
+ }
+
override def copy(): InternalRow = {
val copy1 = row1.copy()
val copy2 = row2.copy()
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
index c8d1866..a81fa1c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
@@ -69,10 +69,10 @@ case class InterpretedMutableProjection(expressions: Seq[Expression]) extends Mu
})
private[this] val exprArray = expressions.toArray
- private[this] var mutableRow: MutableRow = new GenericMutableRow(exprArray.length)
+ private[this] var mutableRow: InternalRow = new GenericInternalRow(exprArray.length)
def currentValue: InternalRow = mutableRow
- override def target(row: MutableRow): MutableProjection = {
+ override def target(row: InternalRow): MutableProjection = {
mutableRow = row
this
}
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificInternalRow.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificInternalRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificInternalRow.scala
new file mode 100644
index 0000000..74e0b46
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificInternalRow.scala
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types._
+
+/**
+ * A parent class for mutable container objects that are reused when the values are changed,
+ * resulting in less garbage. These values are held by a [[SpecificInternalRow]].
+ *
+ * The following code was roughly used to generate these objects:
+ * {{{
+ * val types = "Int,Float,Boolean,Double,Short,Long,Byte,Any".split(",")
+ * types.map {tpe =>
+ * s"""
+ * final class Mutable$tpe extends MutableValue {
+ * var value: $tpe = 0
+ * def boxed = if (isNull) null else value
+ * def update(v: Any) = value = {
+ * isNull = false
+ * v.asInstanceOf[$tpe]
+ * }
+ * def copy() = {
+ * val newCopy = new Mutable$tpe
+ * newCopy.isNull = isNull
+ * newCopy.value = value
+ * newCopy
+ * }
+ * }"""
+ * }.foreach(println)
+ *
+ * types.map { tpe =>
+ * s"""
+ * override def set$tpe(ordinal: Int, value: $tpe): Unit = {
+ * val currentValue = values(ordinal).asInstanceOf[Mutable$tpe]
+ * currentValue.isNull = false
+ * currentValue.value = value
+ * }
+ *
+ * override def get$tpe(i: Int): $tpe = {
+ * values(i).asInstanceOf[Mutable$tpe].value
+ * }"""
+ * }.foreach(println)
+ * }}}
+ */
+abstract class MutableValue extends Serializable {
+ var isNull: Boolean = true
+ def boxed: Any
+ def update(v: Any): Unit
+ def copy(): MutableValue
+}
+
+final class MutableInt extends MutableValue {
+ var value: Int = 0
+ override def boxed: Any = if (isNull) null else value
+ override def update(v: Any): Unit = {
+ isNull = false
+ value = v.asInstanceOf[Int]
+ }
+ override def copy(): MutableInt = {
+ val newCopy = new MutableInt
+ newCopy.isNull = isNull
+ newCopy.value = value
+ newCopy
+ }
+}
+
+final class MutableFloat extends MutableValue {
+ var value: Float = 0
+ override def boxed: Any = if (isNull) null else value
+ override def update(v: Any): Unit = {
+ isNull = false
+ value = v.asInstanceOf[Float]
+ }
+ override def copy(): MutableFloat = {
+ val newCopy = new MutableFloat
+ newCopy.isNull = isNull
+ newCopy.value = value
+ newCopy
+ }
+}
+
+final class MutableBoolean extends MutableValue {
+ var value: Boolean = false
+ override def boxed: Any = if (isNull) null else value
+ override def update(v: Any): Unit = {
+ isNull = false
+ value = v.asInstanceOf[Boolean]
+ }
+ override def copy(): MutableBoolean = {
+ val newCopy = new MutableBoolean
+ newCopy.isNull = isNull
+ newCopy.value = value
+ newCopy
+ }
+}
+
+final class MutableDouble extends MutableValue {
+ var value: Double = 0
+ override def boxed: Any = if (isNull) null else value
+ override def update(v: Any): Unit = {
+ isNull = false
+ value = v.asInstanceOf[Double]
+ }
+ override def copy(): MutableDouble = {
+ val newCopy = new MutableDouble
+ newCopy.isNull = isNull
+ newCopy.value = value
+ newCopy
+ }
+}
+
+final class MutableShort extends MutableValue {
+ var value: Short = 0
+ override def boxed: Any = if (isNull) null else value
+ override def update(v: Any): Unit = value = {
+ isNull = false
+ v.asInstanceOf[Short]
+ }
+ override def copy(): MutableShort = {
+ val newCopy = new MutableShort
+ newCopy.isNull = isNull
+ newCopy.value = value
+ newCopy
+ }
+}
+
+final class MutableLong extends MutableValue {
+ var value: Long = 0
+ override def boxed: Any = if (isNull) null else value
+ override def update(v: Any): Unit = value = {
+ isNull = false
+ v.asInstanceOf[Long]
+ }
+ override def copy(): MutableLong = {
+ val newCopy = new MutableLong
+ newCopy.isNull = isNull
+ newCopy.value = value
+ newCopy
+ }
+}
+
+final class MutableByte extends MutableValue {
+ var value: Byte = 0
+ override def boxed: Any = if (isNull) null else value
+ override def update(v: Any): Unit = value = {
+ isNull = false
+ v.asInstanceOf[Byte]
+ }
+ override def copy(): MutableByte = {
+ val newCopy = new MutableByte
+ newCopy.isNull = isNull
+ newCopy.value = value
+ newCopy
+ }
+}
+
+final class MutableAny extends MutableValue {
+ var value: Any = _
+ override def boxed: Any = if (isNull) null else value
+ override def update(v: Any): Unit = {
+ isNull = false
+ value = v.asInstanceOf[Any]
+ }
+ override def copy(): MutableAny = {
+ val newCopy = new MutableAny
+ newCopy.isNull = isNull
+ newCopy.value = value
+ newCopy
+ }
+}
+
+/**
+ * A row type that holds an array of specialized container objects, of type [[MutableValue]],
+ * chosen based on the dataTypes of each column. The intent is to decrease garbage when modifying
+ * the values of primitive columns.
+ */
+final class SpecificInternalRow(val values: Array[MutableValue]) extends BaseGenericInternalRow {
+
+ def this(dataTypes: Seq[DataType]) =
+ this(
+ dataTypes.map {
+ case BooleanType => new MutableBoolean
+ case ByteType => new MutableByte
+ case ShortType => new MutableShort
+ // We use INT for DATE internally
+ case IntegerType | DateType => new MutableInt
+ // We use Long for Timestamp internally
+ case LongType | TimestampType => new MutableLong
+ case FloatType => new MutableFloat
+ case DoubleType => new MutableDouble
+ case _ => new MutableAny
+ }.toArray)
+
+ def this() = this(Seq.empty)
+
+ def this(schema: StructType) = this(schema.fields.map(_.dataType))
+
+ override def numFields: Int = values.length
+
+ override def setNullAt(i: Int): Unit = {
+ values(i).isNull = true
+ }
+
+ override def isNullAt(i: Int): Boolean = values(i).isNull
+
+ override def copy(): InternalRow = {
+ val newValues = new Array[Any](values.length)
+ var i = 0
+ while (i < values.length) {
+ newValues(i) = values(i).boxed
+ i += 1
+ }
+
+ new GenericInternalRow(newValues)
+ }
+
+ override protected def genericGet(i: Int): Any = values(i).boxed
+
+ override def update(ordinal: Int, value: Any) {
+ if (value == null) {
+ setNullAt(ordinal)
+ } else {
+ values(ordinal).update(value)
+ }
+ }
+
+ override def setInt(ordinal: Int, value: Int): Unit = {
+ val currentValue = values(ordinal).asInstanceOf[MutableInt]
+ currentValue.isNull = false
+ currentValue.value = value
+ }
+
+ override def getInt(i: Int): Int = {
+ values(i).asInstanceOf[MutableInt].value
+ }
+
+ override def setFloat(ordinal: Int, value: Float): Unit = {
+ val currentValue = values(ordinal).asInstanceOf[MutableFloat]
+ currentValue.isNull = false
+ currentValue.value = value
+ }
+
+ override def getFloat(i: Int): Float = {
+ values(i).asInstanceOf[MutableFloat].value
+ }
+
+ override def setBoolean(ordinal: Int, value: Boolean): Unit = {
+ val currentValue = values(ordinal).asInstanceOf[MutableBoolean]
+ currentValue.isNull = false
+ currentValue.value = value
+ }
+
+ override def getBoolean(i: Int): Boolean = {
+ values(i).asInstanceOf[MutableBoolean].value
+ }
+
+ override def setDouble(ordinal: Int, value: Double): Unit = {
+ val currentValue = values(ordinal).asInstanceOf[MutableDouble]
+ currentValue.isNull = false
+ currentValue.value = value
+ }
+
+ override def getDouble(i: Int): Double = {
+ values(i).asInstanceOf[MutableDouble].value
+ }
+
+ override def setShort(ordinal: Int, value: Short): Unit = {
+ val currentValue = values(ordinal).asInstanceOf[MutableShort]
+ currentValue.isNull = false
+ currentValue.value = value
+ }
+
+ override def getShort(i: Int): Short = {
+ values(i).asInstanceOf[MutableShort].value
+ }
+
+ override def setLong(ordinal: Int, value: Long): Unit = {
+ val currentValue = values(ordinal).asInstanceOf[MutableLong]
+ currentValue.isNull = false
+ currentValue.value = value
+ }
+
+ override def getLong(i: Int): Long = {
+ values(i).asInstanceOf[MutableLong].value
+ }
+
+ override def setByte(ordinal: Int, value: Byte): Unit = {
+ val currentValue = values(ordinal).asInstanceOf[MutableByte]
+ currentValue.isNull = false
+ currentValue.value = value
+ }
+
+ override def getByte(i: Int): Byte = {
+ values(i).asInstanceOf[MutableByte].value
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
deleted file mode 100644
index 61ca727..0000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
+++ /dev/null
@@ -1,314 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.expressions
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.types._
-
-/**
- * A parent class for mutable container objects that are reused when the values are changed,
- * resulting in less garbage. These values are held by a [[SpecificMutableRow]].
- *
- * The following code was roughly used to generate these objects:
- * {{{
- * val types = "Int,Float,Boolean,Double,Short,Long,Byte,Any".split(",")
- * types.map {tpe =>
- * s"""
- * final class Mutable$tpe extends MutableValue {
- * var value: $tpe = 0
- * def boxed = if (isNull) null else value
- * def update(v: Any) = value = {
- * isNull = false
- * v.asInstanceOf[$tpe]
- * }
- * def copy() = {
- * val newCopy = new Mutable$tpe
- * newCopy.isNull = isNull
- * newCopy.value = value
- * newCopy
- * }
- * }"""
- * }.foreach(println)
- *
- * types.map { tpe =>
- * s"""
- * override def set$tpe(ordinal: Int, value: $tpe): Unit = {
- * val currentValue = values(ordinal).asInstanceOf[Mutable$tpe]
- * currentValue.isNull = false
- * currentValue.value = value
- * }
- *
- * override def get$tpe(i: Int): $tpe = {
- * values(i).asInstanceOf[Mutable$tpe].value
- * }"""
- * }.foreach(println)
- * }}}
- */
-abstract class MutableValue extends Serializable {
- var isNull: Boolean = true
- def boxed: Any
- def update(v: Any): Unit
- def copy(): MutableValue
-}
-
-final class MutableInt extends MutableValue {
- var value: Int = 0
- override def boxed: Any = if (isNull) null else value
- override def update(v: Any): Unit = {
- isNull = false
- value = v.asInstanceOf[Int]
- }
- override def copy(): MutableInt = {
- val newCopy = new MutableInt
- newCopy.isNull = isNull
- newCopy.value = value
- newCopy
- }
-}
-
-final class MutableFloat extends MutableValue {
- var value: Float = 0
- override def boxed: Any = if (isNull) null else value
- override def update(v: Any): Unit = {
- isNull = false
- value = v.asInstanceOf[Float]
- }
- override def copy(): MutableFloat = {
- val newCopy = new MutableFloat
- newCopy.isNull = isNull
- newCopy.value = value
- newCopy
- }
-}
-
-final class MutableBoolean extends MutableValue {
- var value: Boolean = false
- override def boxed: Any = if (isNull) null else value
- override def update(v: Any): Unit = {
- isNull = false
- value = v.asInstanceOf[Boolean]
- }
- override def copy(): MutableBoolean = {
- val newCopy = new MutableBoolean
- newCopy.isNull = isNull
- newCopy.value = value
- newCopy
- }
-}
-
-final class MutableDouble extends MutableValue {
- var value: Double = 0
- override def boxed: Any = if (isNull) null else value
- override def update(v: Any): Unit = {
- isNull = false
- value = v.asInstanceOf[Double]
- }
- override def copy(): MutableDouble = {
- val newCopy = new MutableDouble
- newCopy.isNull = isNull
- newCopy.value = value
- newCopy
- }
-}
-
-final class MutableShort extends MutableValue {
- var value: Short = 0
- override def boxed: Any = if (isNull) null else value
- override def update(v: Any): Unit = value = {
- isNull = false
- v.asInstanceOf[Short]
- }
- override def copy(): MutableShort = {
- val newCopy = new MutableShort
- newCopy.isNull = isNull
- newCopy.value = value
- newCopy
- }
-}
-
-final class MutableLong extends MutableValue {
- var value: Long = 0
- override def boxed: Any = if (isNull) null else value
- override def update(v: Any): Unit = value = {
- isNull = false
- v.asInstanceOf[Long]
- }
- override def copy(): MutableLong = {
- val newCopy = new MutableLong
- newCopy.isNull = isNull
- newCopy.value = value
- newCopy
- }
-}
-
-final class MutableByte extends MutableValue {
- var value: Byte = 0
- override def boxed: Any = if (isNull) null else value
- override def update(v: Any): Unit = value = {
- isNull = false
- v.asInstanceOf[Byte]
- }
- override def copy(): MutableByte = {
- val newCopy = new MutableByte
- newCopy.isNull = isNull
- newCopy.value = value
- newCopy
- }
-}
-
-final class MutableAny extends MutableValue {
- var value: Any = _
- override def boxed: Any = if (isNull) null else value
- override def update(v: Any): Unit = {
- isNull = false
- value = v.asInstanceOf[Any]
- }
- override def copy(): MutableAny = {
- val newCopy = new MutableAny
- newCopy.isNull = isNull
- newCopy.value = value
- newCopy
- }
-}
-
-/**
- * A row type that holds an array specialized container objects, of type [[MutableValue]], chosen
- * based on the dataTypes of each column. The intent is to decrease garbage when modifying the
- * values of primitive columns.
- */
-final class SpecificMutableRow(val values: Array[MutableValue])
- extends MutableRow with BaseGenericInternalRow {
-
- def this(dataTypes: Seq[DataType]) =
- this(
- dataTypes.map {
- case BooleanType => new MutableBoolean
- case ByteType => new MutableByte
- case ShortType => new MutableShort
- // We use INT for DATE internally
- case IntegerType | DateType => new MutableInt
- // We use Long for Timestamp internally
- case LongType | TimestampType => new MutableLong
- case FloatType => new MutableFloat
- case DoubleType => new MutableDouble
- case _ => new MutableAny
- }.toArray)
-
- def this() = this(Seq.empty)
-
- def this(schema: StructType) = this(schema.fields.map(_.dataType))
-
- override def numFields: Int = values.length
-
- override def setNullAt(i: Int): Unit = {
- values(i).isNull = true
- }
-
- override def isNullAt(i: Int): Boolean = values(i).isNull
-
- override def copy(): InternalRow = {
- val newValues = new Array[Any](values.length)
- var i = 0
- while (i < values.length) {
- newValues(i) = values(i).boxed
- i += 1
- }
-
- new GenericInternalRow(newValues)
- }
-
- override protected def genericGet(i: Int): Any = values(i).boxed
-
- override def update(ordinal: Int, value: Any) {
- if (value == null) {
- setNullAt(ordinal)
- } else {
- values(ordinal).update(value)
- }
- }
-
- override def setInt(ordinal: Int, value: Int): Unit = {
- val currentValue = values(ordinal).asInstanceOf[MutableInt]
- currentValue.isNull = false
- currentValue.value = value
- }
-
- override def getInt(i: Int): Int = {
- values(i).asInstanceOf[MutableInt].value
- }
-
- override def setFloat(ordinal: Int, value: Float): Unit = {
- val currentValue = values(ordinal).asInstanceOf[MutableFloat]
- currentValue.isNull = false
- currentValue.value = value
- }
-
- override def getFloat(i: Int): Float = {
- values(i).asInstanceOf[MutableFloat].value
- }
-
- override def setBoolean(ordinal: Int, value: Boolean): Unit = {
- val currentValue = values(ordinal).asInstanceOf[MutableBoolean]
- currentValue.isNull = false
- currentValue.value = value
- }
-
- override def getBoolean(i: Int): Boolean = {
- values(i).asInstanceOf[MutableBoolean].value
- }
-
- override def setDouble(ordinal: Int, value: Double): Unit = {
- val currentValue = values(ordinal).asInstanceOf[MutableDouble]
- currentValue.isNull = false
- currentValue.value = value
- }
-
- override def getDouble(i: Int): Double = {
- values(i).asInstanceOf[MutableDouble].value
- }
-
- override def setShort(ordinal: Int, value: Short): Unit = {
- val currentValue = values(ordinal).asInstanceOf[MutableShort]
- currentValue.isNull = false
- currentValue.value = value
- }
-
- override def getShort(i: Int): Short = {
- values(i).asInstanceOf[MutableShort].value
- }
-
- override def setLong(ordinal: Int, value: Long): Unit = {
- val currentValue = values(ordinal).asInstanceOf[MutableLong]
- currentValue.isNull = false
- currentValue.value = value
- }
-
- override def getLong(i: Int): Long = {
- values(i).asInstanceOf[MutableLong].value
- }
-
- override def setByte(ordinal: Int, value: Byte): Unit = {
- val currentValue = values(ordinal).asInstanceOf[MutableByte]
- currentValue.isNull = false
- currentValue.value = value
- }
-
- override def getByte(i: Int): Byte = {
- values(i).asInstanceOf[MutableByte].value
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlus.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlus.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlus.scala
index 1d218da..83c8d40 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlus.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlus.scala
@@ -155,7 +155,7 @@ case class HyperLogLogPlusPlus(
aggBufferAttributes.map(_.newInstance())
/** Fill all words with zeros. */
- override def initialize(buffer: MutableRow): Unit = {
+ override def initialize(buffer: InternalRow): Unit = {
var word = 0
while (word < numWords) {
buffer.setLong(mutableAggBufferOffset + word, 0)
@@ -168,7 +168,7 @@ case class HyperLogLogPlusPlus(
*
* Variable names in the HLL++ paper match variable names in the code.
*/
- override def update(buffer: MutableRow, input: InternalRow): Unit = {
+ override def update(buffer: InternalRow, input: InternalRow): Unit = {
val v = child.eval(input)
if (v != null) {
// Create the hashed value 'x'.
@@ -200,7 +200,7 @@ case class HyperLogLogPlusPlus(
* Merge the HLL buffers by iterating through the registers in both buffers and select the
* maximum number of leading zeros for each register.
*/
- override def merge(buffer1: MutableRow, buffer2: InternalRow): Unit = {
+ override def merge(buffer1: InternalRow, buffer2: InternalRow): Unit = {
var idx = 0
var wordOffset = 0
while (wordOffset < numWords) {
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PivotFirst.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PivotFirst.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PivotFirst.scala
index 16c03c5..0876060 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PivotFirst.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PivotFirst.scala
@@ -30,7 +30,7 @@ object PivotFirst {
// Currently UnsafeRow does not support the generic update method (throws
// UnsupportedOperationException), so we need to explicitly support each DataType.
- private val updateFunction: PartialFunction[DataType, (MutableRow, Int, Any) => Unit] = {
+ private val updateFunction: PartialFunction[DataType, (InternalRow, Int, Any) => Unit] = {
case DoubleType =>
(row, offset, value) => row.setDouble(offset, value.asInstanceOf[Double])
case IntegerType =>
@@ -89,9 +89,9 @@ case class PivotFirst(
val indexSize = pivotIndex.size
- private val updateRow: (MutableRow, Int, Any) => Unit = PivotFirst.updateFunction(valueDataType)
+ private val updateRow: (InternalRow, Int, Any) => Unit = PivotFirst.updateFunction(valueDataType)
- override def update(mutableAggBuffer: MutableRow, inputRow: InternalRow): Unit = {
+ override def update(mutableAggBuffer: InternalRow, inputRow: InternalRow): Unit = {
val pivotColValue = pivotColumn.eval(inputRow)
if (pivotColValue != null) {
// We ignore rows whose pivot column value is not in the list of pivot column values.
@@ -105,7 +105,7 @@ case class PivotFirst(
}
}
- override def merge(mutableAggBuffer: MutableRow, inputAggBuffer: InternalRow): Unit = {
+ override def merge(mutableAggBuffer: InternalRow, inputAggBuffer: InternalRow): Unit = {
for (i <- 0 until indexSize) {
if (!inputAggBuffer.isNullAt(inputAggBufferOffset + i)) {
val value = inputAggBuffer.get(inputAggBufferOffset + i, valueDataType)
@@ -114,7 +114,7 @@ case class PivotFirst(
}
}
- override def initialize(mutableAggBuffer: MutableRow): Unit = valueDataType match {
+ override def initialize(mutableAggBuffer: InternalRow): Unit = valueDataType match {
case d: DecimalType =>
// Per doc of setDecimal we need to do this instead of setNullAt for DecimalType.
for (i <- 0 until indexSize) {
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
index 78a388d..89eb864 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
@@ -60,11 +60,11 @@ abstract class Collect extends ImperativeAggregate {
protected[this] val buffer: Growable[Any] with Iterable[Any]
- override def initialize(b: MutableRow): Unit = {
+ override def initialize(b: InternalRow): Unit = {
buffer.clear()
}
- override def update(b: MutableRow, input: InternalRow): Unit = {
+ override def update(b: InternalRow, input: InternalRow): Unit = {
// Do not allow null values. We follow the semantics of Hive's collect_list/collect_set here.
// See: org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMkCollectionEvaluator
val value = child.eval(input)
@@ -73,7 +73,7 @@ abstract class Collect extends ImperativeAggregate {
}
}
- override def merge(buffer: MutableRow, input: InternalRow): Unit = {
+ override def merge(buffer: InternalRow, input: InternalRow): Unit = {
sys.error("Collect cannot be used in partial aggregations.")
}
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
index b5c0844..f3fd58b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
@@ -307,14 +307,14 @@ abstract class ImperativeAggregate extends AggregateFunction with CodegenFallbac
*
* Use `fieldNumber + mutableAggBufferOffset` to access fields of `mutableAggBuffer`.
*/
- def initialize(mutableAggBuffer: MutableRow): Unit
+ def initialize(mutableAggBuffer: InternalRow): Unit
/**
* Updates its aggregation buffer, located in `mutableAggBuffer`, based on the given `inputRow`.
*
* Use `fieldNumber + mutableAggBufferOffset` to access fields of `mutableAggBuffer`.
*/
- def update(mutableAggBuffer: MutableRow, inputRow: InternalRow): Unit
+ def update(mutableAggBuffer: InternalRow, inputRow: InternalRow): Unit
/**
* Combines new intermediate results from the `inputAggBuffer` with the existing intermediate
@@ -323,7 +323,7 @@ abstract class ImperativeAggregate extends AggregateFunction with CodegenFallbac
* Use `fieldNumber + mutableAggBufferOffset` to access fields of `mutableAggBuffer`.
* Use `fieldNumber + inputAggBufferOffset` to access fields of `inputAggBuffer`.
*/
- def merge(mutableAggBuffer: MutableRow, inputAggBuffer: InternalRow): Unit
+ def merge(mutableAggBuffer: InternalRow, inputAggBuffer: InternalRow): Unit
}
/**
@@ -504,16 +504,16 @@ abstract class TypedImperativeAggregate[T] extends ImperativeAggregate {
/** De-serializes the serialized format Array[Byte], and produces aggregation buffer object T */
def deserialize(storageFormat: Array[Byte]): T
- final override def initialize(buffer: MutableRow): Unit = {
+ final override def initialize(buffer: InternalRow): Unit = {
val bufferObject = createAggregationBuffer()
buffer.update(mutableAggBufferOffset, bufferObject)
}
- final override def update(buffer: MutableRow, input: InternalRow): Unit = {
+ final override def update(buffer: InternalRow, input: InternalRow): Unit = {
update(getBufferObject(buffer), input)
}
- final override def merge(buffer: MutableRow, inputBuffer: InternalRow): Unit = {
+ final override def merge(buffer: InternalRow, inputBuffer: InternalRow): Unit = {
val bufferObject = getBufferObject(buffer)
// The inputBuffer stores serialized aggregation buffer object produced by partial aggregate
val inputObject = deserialize(inputBuffer.getBinary(inputAggBufferOffset))
@@ -547,7 +547,7 @@ abstract class TypedImperativeAggregate[T] extends ImperativeAggregate {
* This is only called when doing Partial or PartialMerge mode aggregation, before the framework
* shuffle out aggregate buffers.
*/
- final def serializeAggregateBufferInPlace(buffer: MutableRow): Unit = {
+ final def serializeAggregateBufferInPlace(buffer: InternalRow): Unit = {
buffer(mutableAggBufferOffset) = serialize(getBufferObject(buffer))
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 574943d..6cab50a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -819,7 +819,7 @@ class CodeAndComment(val body: String, val comment: collection.Map[String, Strin
*/
abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Logging {
- protected val genericMutableRowType: String = classOf[GenericMutableRow].getName
+ protected val genericMutableRowType: String = classOf[GenericInternalRow].getName
/**
* Generates a class for a given input expression. Called when there is not cached code
@@ -889,7 +889,6 @@ object CodeGenerator extends Logging {
classOf[UnsafeArrayData].getName,
classOf[MapData].getName,
classOf[UnsafeMapData].getName,
- classOf[MutableRow].getName,
classOf[Expression].getName
))
evaluator.setExtendedClass(classOf[GeneratedClass])
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
index 13d61af..5c4b56b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
@@ -24,10 +24,10 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp
abstract class BaseMutableProjection extends MutableProjection
/**
- * Generates byte code that produces a [[MutableRow]] object that can update itself based on a new
+ * Generates byte code that produces an [[InternalRow]] object that can update itself based on a new
* input [[InternalRow]] for a fixed set of [[Expression Expressions]].
* It exposes a `target` method, which is used to set the row that will be updated.
- * The internal [[MutableRow]] object created internally is used only when `target` is not used.
+ * The [[InternalRow]] object created internally is used only when `target` is not used.
*/
object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableProjection] {
@@ -102,7 +102,7 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
class SpecificMutableProjection extends ${classOf[BaseMutableProjection].getName} {
private Object[] references;
- private MutableRow mutableRow;
+ private InternalRow mutableRow;
${ctx.declareMutableStates()}
public SpecificMutableProjection(Object[] references) {
@@ -113,7 +113,7 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
${ctx.declareAddedFunctions()}
- public ${classOf[BaseMutableProjection].getName} target(MutableRow row) {
+ public ${classOf[BaseMutableProjection].getName} target(InternalRow row) {
mutableRow = row;
return this;
}
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
index 1c98c9e..2773e1a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.types._
abstract class BaseProjection extends Projection {}
/**
- * Generates byte code that produces a [[MutableRow]] object (not an [[UnsafeRow]]) that can update
+ * Generates byte code that produces a [[InternalRow]] object (not an [[UnsafeRow]]) that can update
* itself based on a new input [[InternalRow]] for a fixed set of [[Expression Expressions]].
*/
object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection] {
@@ -164,12 +164,12 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
class SpecificSafeProjection extends ${classOf[BaseProjection].getName} {
private Object[] references;
- private MutableRow mutableRow;
+ private InternalRow mutableRow;
${ctx.declareMutableStates()}
public SpecificSafeProjection(Object[] references) {
this.references = references;
- mutableRow = (MutableRow) references[references.length - 1];
+ mutableRow = (InternalRow) references[references.length - 1];
${ctx.initMutableStates()}
}
@@ -188,7 +188,7 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
logDebug(s"code for ${expressions.mkString(",")}:\n${CodeFormatter.format(code)}")
val c = CodeGenerator.compile(code)
- val resultRow = new SpecificMutableRow(expressions.map(_.dataType))
+ val resultRow = new SpecificInternalRow(expressions.map(_.dataType))
c.generate(ctx.references.toArray :+ resultRow).asInstanceOf[Projection]
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
index a6125c6..1510a47 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
@@ -81,7 +81,7 @@ package object expressions {
def currentValue: InternalRow
/** Uses the given row to store the output of the projection. */
- def target(row: MutableRow): MutableProjection
+ def target(row: InternalRow): MutableProjection
}
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
index 73dceb3..751b821 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
@@ -158,33 +158,6 @@ trait BaseGenericInternalRow extends InternalRow {
}
/**
- * An extended interface to [[InternalRow]] that allows the values for each column to be updated.
- * Setting a value through a primitive function implicitly marks that column as not null.
- */
-abstract class MutableRow extends InternalRow {
- def setNullAt(i: Int): Unit
-
- def update(i: Int, value: Any): Unit
-
- // default implementation (slow)
- def setBoolean(i: Int, value: Boolean): Unit = { update(i, value) }
- def setByte(i: Int, value: Byte): Unit = { update(i, value) }
- def setShort(i: Int, value: Short): Unit = { update(i, value) }
- def setInt(i: Int, value: Int): Unit = { update(i, value) }
- def setLong(i: Int, value: Long): Unit = { update(i, value) }
- def setFloat(i: Int, value: Float): Unit = { update(i, value) }
- def setDouble(i: Int, value: Double): Unit = { update(i, value) }
-
- /**
- * Update the decimal column at `i`.
- *
- * Note: In order to support update decimal with precision > 18 in UnsafeRow,
- * CAN NOT call setNullAt() for decimal column on UnsafeRow, call setDecimal(i, null, precision).
- */
- def setDecimal(i: Int, value: Decimal, precision: Int) { update(i, value) }
-}
-
-/**
* A row implementation that uses an array of objects as the underlying storage. Note that, while
* the array is not copied, and thus could technically be mutated after creation, this is not
* allowed.
@@ -230,24 +203,9 @@ class GenericInternalRow(val values: Array[Any]) extends BaseGenericInternalRow
override def numFields: Int = values.length
- override def copy(): GenericInternalRow = this
-}
-
-class GenericMutableRow(values: Array[Any]) extends MutableRow with BaseGenericInternalRow {
- /** No-arg constructor for serialization. */
- protected def this() = this(null)
-
- def this(size: Int) = this(new Array[Any](size))
-
- override protected def genericGet(ordinal: Int) = values(ordinal)
-
- override def toSeq(fieldTypes: Seq[DataType]): Seq[Any] = values
-
- override def numFields: Int = values.length
-
override def setNullAt(i: Int): Unit = { values(i) = null}
override def update(i: Int, value: Any): Unit = { values(i) = value }
- override def copy(): InternalRow = new GenericInternalRow(values.clone())
+ override def copy(): GenericInternalRow = this
}
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index f80e637..e476cb1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -105,7 +105,7 @@ class JacksonParser(
}
emptyRow
} else {
- val row = new GenericMutableRow(schema.length)
+ val row = new GenericInternalRow(schema.length)
for (corruptIndex <- schema.getFieldIndex(columnNameOfCorruptRecord)) {
require(schema(corruptIndex).dataType == StringType)
row.update(corruptIndex, UTF8String.fromString(record))
@@ -363,7 +363,7 @@ class JacksonParser(
parser: JsonParser,
schema: StructType,
fieldConverters: Seq[ValueConverter]): InternalRow = {
- val row = new GenericMutableRow(schema.length)
+ val row = new GenericInternalRow(schema.length)
while (nextUntil(parser, JsonToken.END_OBJECT)) {
schema.getFieldIndex(parser.getCurrentName) match {
case Some(index) =>
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
index 85563dd..43b6afd 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
@@ -23,7 +23,7 @@ import java.sql.{Date, Timestamp}
import scala.reflect.runtime.universe.typeOf
import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.expressions.{BoundReference, Literal, SpecificMutableRow}
+import org.apache.spark.sql.catalyst.expressions.{BoundReference, Literal, SpecificInternalRow}
import org.apache.spark.sql.catalyst.expressions.objects.NewInstance
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -94,7 +94,7 @@ object TestingUDT {
.add("c", DoubleType, nullable = false)
override def serialize(n: NestedStruct): Any = {
- val row = new SpecificMutableRow(sqlType.asInstanceOf[StructType].map(_.dataType))
+ val row = new SpecificInternalRow(sqlType.asInstanceOf[StructType].map(_.dataType))
row.setInt(0, n.a)
row.setLong(1, n.b)
row.setDouble(2, n.c)
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
index 5588b44..0cb201e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
@@ -68,7 +68,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
val length = 5000
val expressions = List.fill(length)(EqualTo(Literal(1), Literal(1)))
val plan = GenerateMutableProjection.generate(expressions)
- val actual = plan(new GenericMutableRow(length)).toSeq(expressions.map(_.dataType))
+ val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType))
val expected = Seq.fill(length)(true)
if (!checkResult(actual, expected)) {
@@ -91,7 +91,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
val expression = CaseWhen((1 to cases).map(generateCase(_)))
val plan = GenerateMutableProjection.generate(Seq(expression))
- val input = new GenericMutableRow(Array[Any](UTF8String.fromString(s"${clauses}:${cases}")))
+ val input = new GenericInternalRow(Array[Any](UTF8String.fromString(s"${clauses}:${cases}")))
val actual = plan(input).toSeq(Seq(expression.dataType))
assert(actual(0) == cases)
@@ -101,7 +101,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
val length = 5000
val expressions = Seq(CreateArray(List.fill(length)(EqualTo(Literal(1), Literal(1)))))
val plan = GenerateMutableProjection.generate(expressions)
- val actual = plan(new GenericMutableRow(length)).toSeq(expressions.map(_.dataType))
+ val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType))
val expected = Seq(new GenericArrayData(Seq.fill(length)(true)))
if (!checkResult(actual, expected)) {
@@ -116,7 +116,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
case (expr, i) => Seq(Literal(i), expr)
}))
val plan = GenerateMutableProjection.generate(expressions)
- val actual = plan(new GenericMutableRow(length)).toSeq(expressions.map(_.dataType)).map {
+ val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType)).map {
case m: ArrayBasedMapData => ArrayBasedMapData.toScalaMap(m)
}
val expected = (0 until length).map((_, true)).toMap :: Nil
@@ -130,7 +130,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
val length = 5000
val expressions = Seq(CreateStruct(List.fill(length)(EqualTo(Literal(1), Literal(1)))))
val plan = GenerateMutableProjection.generate(expressions)
- val actual = plan(new GenericMutableRow(length)).toSeq(expressions.map(_.dataType))
+ val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType))
val expected = Seq(InternalRow(Seq.fill(length)(true): _*))
if (!checkResult(actual, expected)) {
@@ -145,7 +145,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
expr => Seq(Literal(expr.toString), expr)
}))
val plan = GenerateMutableProjection.generate(expressions)
- val actual = plan(new GenericMutableRow(length)).toSeq(expressions.map(_.dataType))
+ val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType))
val expected = Seq(InternalRow(Seq.fill(length)(true): _*))
if (!checkResult(actual, expected)) {
@@ -158,7 +158,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
val schema = StructType(Seq.fill(length)(StructField("int", IntegerType)))
val expressions = Seq(CreateExternalRow(Seq.fill(length)(Literal(1)), schema))
val plan = GenerateMutableProjection.generate(expressions)
- val actual = plan(new GenericMutableRow(length)).toSeq(expressions.map(_.dataType))
+ val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType))
val expected = Seq(Row.fromSeq(Seq.fill(length)(1)))
if (!checkResult(actual, expected)) {
@@ -174,7 +174,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
Literal.create("PST", StringType))
}
val plan = GenerateMutableProjection.generate(expressions)
- val actual = plan(new GenericMutableRow(length)).toSeq(expressions.map(_.dataType))
+ val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType))
val expected = Seq.fill(length)(
DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2015-07-24 07:00:00")))
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MapDataSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MapDataSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MapDataSuite.scala
index 0f1264c..25a675a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MapDataSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MapDataSuite.scala
@@ -45,7 +45,7 @@ class MapDataSuite extends SparkFunSuite {
// UnsafeMapData
val unsafeConverter = UnsafeProjection.create(Array[DataType](MapType(StringType, IntegerType)))
- val row = new GenericMutableRow(1)
+ val row = new GenericInternalRow(1)
def toUnsafeMap(map: ArrayBasedMapData): UnsafeMapData = {
row.update(0, map)
val unsafeRow = unsafeConverter.apply(row)
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
index 90790dd..cf3cbe2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
@@ -37,7 +37,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
val fieldTypes: Array[DataType] = Array(LongType, LongType, IntegerType)
val converter = UnsafeProjection.create(fieldTypes)
- val row = new SpecificMutableRow(fieldTypes)
+ val row = new SpecificInternalRow(fieldTypes)
row.setLong(0, 0)
row.setLong(1, 1)
row.setInt(2, 2)
@@ -75,7 +75,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
val fieldTypes: Array[DataType] = Array(LongType, StringType, BinaryType)
val converter = UnsafeProjection.create(fieldTypes)
- val row = new SpecificMutableRow(fieldTypes)
+ val row = new SpecificInternalRow(fieldTypes)
row.setLong(0, 0)
row.update(1, UTF8String.fromString("Hello"))
row.update(2, "World".getBytes(StandardCharsets.UTF_8))
@@ -94,7 +94,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
val fieldTypes: Array[DataType] = Array(LongType, StringType, DateType, TimestampType)
val converter = UnsafeProjection.create(fieldTypes)
- val row = new SpecificMutableRow(fieldTypes)
+ val row = new SpecificInternalRow(fieldTypes)
row.setLong(0, 0)
row.update(1, UTF8String.fromString("Hello"))
row.update(2, DateTimeUtils.fromJavaDate(Date.valueOf("1970-01-01")))
@@ -138,7 +138,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
val converter = UnsafeProjection.create(fieldTypes)
val rowWithAllNullColumns: InternalRow = {
- val r = new SpecificMutableRow(fieldTypes)
+ val r = new SpecificInternalRow(fieldTypes)
for (i <- fieldTypes.indices) {
r.setNullAt(i)
}
@@ -167,7 +167,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
// columns, then the serialized row representation should be identical to what we would get by
// creating an entirely null row via the converter
val rowWithNoNullColumns: InternalRow = {
- val r = new SpecificMutableRow(fieldTypes)
+ val r = new SpecificInternalRow(fieldTypes)
r.setNullAt(0)
r.setBoolean(1, false)
r.setByte(2, 20)
@@ -243,11 +243,11 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
test("NaN canonicalization") {
val fieldTypes: Array[DataType] = Array(FloatType, DoubleType)
- val row1 = new SpecificMutableRow(fieldTypes)
+ val row1 = new SpecificInternalRow(fieldTypes)
row1.setFloat(0, java.lang.Float.intBitsToFloat(0x7f800001))
row1.setDouble(1, java.lang.Double.longBitsToDouble(0x7ff0000000000001L))
- val row2 = new SpecificMutableRow(fieldTypes)
+ val row2 = new SpecificInternalRow(fieldTypes)
row2.setFloat(0, java.lang.Float.intBitsToFloat(0x7fffffff))
row2.setDouble(1, java.lang.Double.longBitsToDouble(0x7fffffffffffffffL))
@@ -263,7 +263,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
val converter = UnsafeProjection.create(fieldTypes)
- val row = new GenericMutableRow(fieldTypes.length)
+ val row = new GenericInternalRow(fieldTypes.length)
row.update(0, InternalRow(1))
row.update(1, InternalRow(InternalRow(2L)))
@@ -324,7 +324,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
)
val converter = UnsafeProjection.create(fieldTypes)
- val row = new GenericMutableRow(fieldTypes.length)
+ val row = new GenericInternalRow(fieldTypes.length)
row.update(0, createArray(1, 2))
row.update(1, createArray(createArray(3, 4)))
@@ -359,7 +359,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
val innerMap = createMap(5, 6)(7, 8)
val map2 = createMap(9)(innerMap)
- val row = new GenericMutableRow(fieldTypes.length)
+ val row = new GenericInternalRow(fieldTypes.length)
row.update(0, map1)
row.update(1, map2)
@@ -400,7 +400,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
)
val converter = UnsafeProjection.create(fieldTypes)
- val row = new GenericMutableRow(fieldTypes.length)
+ val row = new GenericInternalRow(fieldTypes.length)
row.update(0, InternalRow(createArray(1)))
row.update(1, createArray(InternalRow(2L)))
@@ -439,7 +439,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
)
val converter = UnsafeProjection.create(fieldTypes)
- val row = new GenericMutableRow(fieldTypes.length)
+ val row = new GenericInternalRow(fieldTypes.length)
row.update(0, InternalRow(createMap(1)(2)))
row.update(1, createMap(3)(InternalRow(4L)))
@@ -485,7 +485,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
)
val converter = UnsafeProjection.create(fieldTypes)
- val row = new GenericMutableRow(fieldTypes.length)
+ val row = new GenericInternalRow(fieldTypes.length)
row.update(0, createArray(createMap(1)(2)))
row.update(1, createMap(3)(createArray(4)))
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentileSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentileSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentileSuite.scala
index 61298a1..8456e24 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentileSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentileSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.analysis.{SimpleAnalyzer, UnresolvedAttribu
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckFailure
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, BoundReference, Cast, CreateArray, DecimalLiteral, GenericMutableRow, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, BoundReference, Cast, CreateArray, DecimalLiteral, GenericInternalRow, Literal}
import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile.{PercentileDigest, PercentileDigestSerializer}
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.catalyst.util.ArrayData
@@ -144,7 +144,8 @@ class ApproximatePercentileSuite extends SparkFunSuite {
.withNewInputAggBufferOffset(inputAggregationBufferOffset)
.withNewMutableAggBufferOffset(mutableAggregationBufferOffset)
- val mutableAggBuffer = new GenericMutableRow(new Array[Any](mutableAggregationBufferOffset + 1))
+ val mutableAggBuffer = new GenericInternalRow(
+ new Array[Any](mutableAggregationBufferOffset + 1))
agg.initialize(mutableAggBuffer)
val dataCount = 10
(1 to dataCount).foreach { data =>
@@ -154,7 +155,7 @@ class ApproximatePercentileSuite extends SparkFunSuite {
// Serialize the aggregation buffer
val serialized = mutableAggBuffer.getBinary(mutableAggregationBufferOffset)
- val inputAggBuffer = new GenericMutableRow(Array[Any](null, serialized))
+ val inputAggBuffer = new GenericInternalRow(Array[Any](null, serialized))
// Phase 2: final mode aggregation
// Re-initialize the aggregation buffer
@@ -311,7 +312,7 @@ class ApproximatePercentileSuite extends SparkFunSuite {
test("class ApproximatePercentile, null handling") {
val childExpression = Cast(BoundReference(0, IntegerType, nullable = true), DoubleType)
val agg = new ApproximatePercentile(childExpression, Literal(0.5D))
- val buffer = new GenericMutableRow(new Array[Any](1))
+ val buffer = new GenericInternalRow(new Array[Any](1))
agg.initialize(buffer)
// Empty aggregation buffer
assert(agg.eval(buffer) == null)
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlusSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlusSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlusSuite.scala
index f537422..17f6b71 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlusSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlusSuite.scala
@@ -22,28 +22,29 @@ import java.util.Random
import scala.collection.mutable
import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.expressions.{BoundReference, MutableRow, SpecificMutableRow}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{BoundReference, SpecificInternalRow}
import org.apache.spark.sql.types.{DataType, IntegerType}
class HyperLogLogPlusPlusSuite extends SparkFunSuite {
/** Create a HLL++ instance and an input and output buffer. */
def createEstimator(rsd: Double, dt: DataType = IntegerType):
- (HyperLogLogPlusPlus, MutableRow, MutableRow) = {
- val input = new SpecificMutableRow(Seq(dt))
+ (HyperLogLogPlusPlus, InternalRow, InternalRow) = {
+ val input = new SpecificInternalRow(Seq(dt))
val hll = new HyperLogLogPlusPlus(new BoundReference(0, dt, true), rsd)
val buffer = createBuffer(hll)
(hll, input, buffer)
}
- def createBuffer(hll: HyperLogLogPlusPlus): MutableRow = {
- val buffer = new SpecificMutableRow(hll.aggBufferAttributes.map(_.dataType))
+ def createBuffer(hll: HyperLogLogPlusPlus): InternalRow = {
+ val buffer = new SpecificInternalRow(hll.aggBufferAttributes.map(_.dataType))
hll.initialize(buffer)
buffer
}
/** Evaluate the estimate. It should be within 3*SD's of the given true rsd. */
- def evaluateEstimate(hll: HyperLogLogPlusPlus, buffer: MutableRow, cardinality: Int): Unit = {
+ def evaluateEstimate(hll: HyperLogLogPlusPlus, buffer: InternalRow, cardinality: Int): Unit = {
val estimate = hll.eval(buffer).asInstanceOf[Long].toDouble
val error = math.abs((estimate / cardinality.toDouble) - 1.0d)
assert(error < hll.trueRsd * 3.0d, "Error should be within 3 std. errors.")
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
index 62abc2a..a6ce4c2 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
@@ -21,8 +21,7 @@ import java.util.*;
import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.expressions.GenericMutableRow;
-import org.apache.spark.sql.catalyst.expressions.MutableRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.catalyst.util.ArrayData;
import org.apache.spark.sql.catalyst.util.MapData;
@@ -91,7 +90,7 @@ public final class ColumnarBatch {
* Adapter class to interop with existing components that expect internal row. A lot of
* performance is lost with this translation.
*/
- public static final class Row extends MutableRow {
+ public static final class Row extends InternalRow {
protected int rowId;
private final ColumnarBatch parent;
private final int fixedLenRowSize;
@@ -129,7 +128,7 @@ public final class ColumnarBatch {
* Revisit this. This is expensive. This is currently only used in test paths.
*/
public InternalRow copy() {
- GenericMutableRow row = new GenericMutableRow(columns.length);
+ GenericInternalRow row = new GenericInternalRow(columns.length);
for (int i = 0; i < numFields(); i++) {
if (isNullAt(i)) {
row.setNullAt(i);
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 6c4248c..d3a2222 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -32,7 +32,7 @@ object RDDConversions {
def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[InternalRow] = {
data.mapPartitions { iterator =>
val numColumns = outputTypes.length
- val mutableRow = new GenericMutableRow(numColumns)
+ val mutableRow = new GenericInternalRow(numColumns)
val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
iterator.map { r =>
var i = 0
@@ -52,7 +52,7 @@ object RDDConversions {
def rowToRowRdd(data: RDD[Row], outputTypes: Seq[DataType]): RDD[InternalRow] = {
data.mapPartitions { iterator =>
val numColumns = outputTypes.length
- val mutableRow = new GenericMutableRow(numColumns)
+ val mutableRow = new GenericInternalRow(numColumns)
val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
iterator.map { r =>
var i = 0
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala
index f335912..7c11fdb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala
@@ -153,7 +153,7 @@ abstract class AggregationIterator(
protected def generateProcessRow(
expressions: Seq[AggregateExpression],
functions: Seq[AggregateFunction],
- inputAttributes: Seq[Attribute]): (MutableRow, InternalRow) => Unit = {
+ inputAttributes: Seq[Attribute]): (InternalRow, InternalRow) => Unit = {
val joinedRow = new JoinedRow
if (expressions.nonEmpty) {
val mergeExpressions = functions.zipWithIndex.flatMap {
@@ -168,9 +168,9 @@ abstract class AggregationIterator(
case (ae: ImperativeAggregate, i) =>
expressions(i).mode match {
case Partial | Complete =>
- (buffer: MutableRow, row: InternalRow) => ae.update(buffer, row)
+ (buffer: InternalRow, row: InternalRow) => ae.update(buffer, row)
case PartialMerge | Final =>
- (buffer: MutableRow, row: InternalRow) => ae.merge(buffer, row)
+ (buffer: InternalRow, row: InternalRow) => ae.merge(buffer, row)
}
}.toArray
// This projection is used to merge buffer values for all expression-based aggregates.
@@ -178,7 +178,7 @@ abstract class AggregationIterator(
val updateProjection =
newMutableProjection(mergeExpressions, aggregationBufferSchema ++ inputAttributes)
- (currentBuffer: MutableRow, row: InternalRow) => {
+ (currentBuffer: InternalRow, row: InternalRow) => {
// Process all expression-based aggregate functions.
updateProjection.target(currentBuffer)(joinedRow(currentBuffer, row))
// Process all imperative aggregate functions.
@@ -190,11 +190,11 @@ abstract class AggregationIterator(
}
} else {
// Grouping only.
- (currentBuffer: MutableRow, row: InternalRow) => {}
+ (currentBuffer: InternalRow, row: InternalRow) => {}
}
}
- protected val processRow: (MutableRow, InternalRow) => Unit =
+ protected val processRow: (InternalRow, InternalRow) => Unit =
generateProcessRow(aggregateExpressions, aggregateFunctions, inputAttributes)
protected val groupingProjection: UnsafeProjection =
@@ -202,7 +202,7 @@ abstract class AggregationIterator(
protected val groupingAttributes = groupingExpressions.map(_.toAttribute)
// Initializing the function used to generate the output row.
- protected def generateResultProjection(): (UnsafeRow, MutableRow) => UnsafeRow = {
+ protected def generateResultProjection(): (UnsafeRow, InternalRow) => UnsafeRow = {
val joinedRow = new JoinedRow
val modes = aggregateExpressions.map(_.mode).distinct
val bufferAttributes = aggregateFunctions.flatMap(_.aggBufferAttributes)
@@ -211,14 +211,14 @@ abstract class AggregationIterator(
case ae: DeclarativeAggregate => ae.evaluateExpression
case agg: AggregateFunction => NoOp
}
- val aggregateResult = new SpecificMutableRow(aggregateAttributes.map(_.dataType))
+ val aggregateResult = new SpecificInternalRow(aggregateAttributes.map(_.dataType))
val expressionAggEvalProjection = newMutableProjection(evalExpressions, bufferAttributes)
expressionAggEvalProjection.target(aggregateResult)
val resultProjection =
UnsafeProjection.create(resultExpressions, groupingAttributes ++ aggregateAttributes)
- (currentGroupingKey: UnsafeRow, currentBuffer: MutableRow) => {
+ (currentGroupingKey: UnsafeRow, currentBuffer: InternalRow) => {
// Generate results for all expression-based aggregate functions.
expressionAggEvalProjection(currentBuffer)
// Generate results for all imperative aggregate functions.
@@ -244,7 +244,7 @@ abstract class AggregationIterator(
}
}
- (currentGroupingKey: UnsafeRow, currentBuffer: MutableRow) => {
+ (currentGroupingKey: UnsafeRow, currentBuffer: InternalRow) => {
// Serializes the generic object stored in aggregation buffer
var i = 0
while (i < typedImperativeAggregates.length) {
@@ -256,17 +256,17 @@ abstract class AggregationIterator(
} else {
// Grouping-only: we only output values based on grouping expressions.
val resultProjection = UnsafeProjection.create(resultExpressions, groupingAttributes)
- (currentGroupingKey: UnsafeRow, currentBuffer: MutableRow) => {
+ (currentGroupingKey: UnsafeRow, currentBuffer: InternalRow) => {
resultProjection(currentGroupingKey)
}
}
}
- protected val generateOutput: (UnsafeRow, MutableRow) => UnsafeRow =
+ protected val generateOutput: (UnsafeRow, InternalRow) => UnsafeRow =
generateResultProjection()
/** Initializes buffer values for all aggregate functions. */
- protected def initializeBuffer(buffer: MutableRow): Unit = {
+ protected def initializeBuffer(buffer: InternalRow): Unit = {
expressionAggInitialProjection.target(buffer)(EmptyRow)
var i = 0
while (i < allImperativeAggregateFunctions.length) {
http://git-wip-us.apache.org/repos/asf/spark/blob/97594c29/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
index c2b1ef0..bea2dce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
@@ -49,11 +49,11 @@ class SortBasedAggregationIterator(
* Creates a new aggregation buffer and initializes buffer values
* for all aggregate functions.
*/
- private def newBuffer: MutableRow = {
+ private def newBuffer: InternalRow = {
val bufferSchema = aggregateFunctions.flatMap(_.aggBufferAttributes)
val bufferRowSize: Int = bufferSchema.length
- val genericMutableBuffer = new GenericMutableRow(bufferRowSize)
+ val genericMutableBuffer = new GenericInternalRow(bufferRowSize)
val useUnsafeBuffer = bufferSchema.map(_.dataType).forall(UnsafeRow.isMutable)
val buffer = if (useUnsafeBuffer) {
@@ -84,7 +84,7 @@ class SortBasedAggregationIterator(
private[this] var sortedInputHasNewGroup: Boolean = false
// The aggregation buffer used by the sort-based aggregation.
- private[this] val sortBasedAggregationBuffer: MutableRow = newBuffer
+ private[this] val sortBasedAggregationBuffer: InternalRow = newBuffer
// This safe projection is used to turn the input row into safe row. This is necessary
// because the input row may be produced by unsafe projection in child operator and all the
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org