Posted to commits@spark.apache.org by rx...@apache.org on 2015/06/13 08:06:40 UTC
[3/5] spark git commit: [SPARK-7186] [SQL] Decouple internal Row from external Row
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index b44d4c8..1828ed1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -245,7 +245,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) {
JsonRDD.nullTypeToStringType(
JsonRDD.inferSchema(jsonRDD, 1.0, columnNameOfCorruptJsonRecord)))
val rowRDD = JsonRDD.jsonStringToRow(jsonRDD, appliedSchema, columnNameOfCorruptJsonRecord)
- sqlContext.createDataFrame(rowRDD, appliedSchema, needsConversion = false)
+ sqlContext.internalCreateDataFrame(rowRDD, appliedSchema)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 5f758ad..22d0e50 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -31,7 +31,7 @@ import org.apache.spark.SparkContext
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst._
+import org.apache.spark.sql.catalyst.{InternalRow, _}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.errors.DialectException
@@ -486,15 +486,27 @@ class SQLContext(@transient val sparkContext: SparkContext)
// schema differs from the existing schema on any field data type.
val catalystRows = if (needsConversion) {
val converter = CatalystTypeConverters.createToCatalystConverter(schema)
- rowRDD.map(converter(_).asInstanceOf[Row])
+ rowRDD.map(converter(_).asInstanceOf[InternalRow])
} else {
- rowRDD
+ rowRDD.map{r: Row => InternalRow.fromSeq(r.toSeq)}
}
val logicalPlan = LogicalRDD(schema.toAttributes, catalystRows)(self)
DataFrame(this, logicalPlan)
}
/**
+ * Creates a DataFrame from an RDD[Row]. User can specify whether the input rows should be
+ * converted to Catalyst rows.
+ */
+ private[sql]
+ def internalCreateDataFrame(catalystRows: RDD[InternalRow], schema: StructType) = {
+ // TODO: use MutableProjection when rowRDD is another DataFrame and the applied
+ // schema differs from the existing schema on any field data type.
+ val logicalPlan = LogicalRDD(schema.toAttributes, catalystRows)(self)
+ DataFrame(this, logicalPlan)
+ }
+
+ /**
* :: DeveloperApi ::
* Creates a [[DataFrame]] from an [[JavaRDD]] containing [[Row]]s using the given schema.
* It is important to make sure that the structure of every [[Row]] of the provided RDD matches
@@ -531,7 +543,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
extractors.zip(attributeSeq).map { case (e, attr) =>
CatalystTypeConverters.convertToCatalyst(e.invoke(row), attr.dataType)
}.toArray[Any]
- ) : Row
+ ) : InternalRow
}
}
DataFrame(this, LogicalRDD(attributeSeq, rowRdd)(this))
@@ -886,7 +898,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
protected[sql] val planner = new SparkPlanner
@transient
- protected[sql] lazy val emptyResult = sparkContext.parallelize(Seq.empty[Row], 1)
+ protected[sql] lazy val emptyResult = sparkContext.parallelize(Seq.empty[InternalRow], 1)
/**
* Prepares a planned SparkPlan for execution by inserting shuffle operations as needed.
@@ -953,7 +965,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
lazy val executedPlan: SparkPlan = prepareForExecution.execute(sparkPlan)
/** Internal version of the RDD. Avoids copies and has no schema */
- lazy val toRdd: RDD[Row] = executedPlan.execute()
+ lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
protected def stringOrError[A](f: => A): String =
try f.toString catch { case e: Throwable => e.toString }
@@ -1035,7 +1047,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
}
val rowRdd = convertedRdd.mapPartitions { iter =>
- iter.map { m => new GenericRow(m): Row}
+ iter.map { m => new GenericRow(m): InternalRow}
}
DataFrame(this, LogicalRDD(schema.toAttributes, rowRdd)(self))
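To make the new boundary concrete, here is a minimal sketch (with a hypothetical two-column schema) of the conversion the needsConversion branch above performs: external Rows carry plain Scala/Java values, and CatalystTypeConverters maps them into the internal representation before they reach LogicalRDD, which is exactly what internalCreateDataFrame lets callers skip when their rows are already internal.

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    val schema = StructType(Seq(
      StructField("name", StringType),
      StructField("age", IntegerType)))

    // An external row holds plain Scala/Java values.
    val external: Row = Row("alice", 30)

    // The converter produces the internal representation expected by the planner
    // (the same call used in createDataFrame above).
    val converter = CatalystTypeConverters.createToCatalystConverter(schema)
    val internal = converter(external).asInstanceOf[InternalRow]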
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
index aa10af4..cc7506d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
@@ -18,8 +18,7 @@
package org.apache.spark.sql.columnar
import java.nio.{ByteBuffer, ByteOrder}
-
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.columnar.ColumnBuilder._
import org.apache.spark.sql.columnar.compression.{AllCompressionSchemes, CompressibleColumnBuilder}
import org.apache.spark.sql.types._
@@ -33,7 +32,7 @@ private[sql] trait ColumnBuilder {
/**
* Appends `row(ordinal)` to the column builder.
*/
- def appendFrom(row: Row, ordinal: Int)
+ def appendFrom(row: InternalRow, ordinal: Int)
/**
* Column statistics information
@@ -68,7 +67,7 @@ private[sql] class BasicColumnBuilder[T <: DataType, JvmType](
buffer.order(ByteOrder.nativeOrder()).putInt(columnType.typeId)
}
- override def appendFrom(row: Row, ordinal: Int): Unit = {
+ override def appendFrom(row: InternalRow, ordinal: Int): Unit = {
buffer = ensureFreeSpace(buffer, columnType.actualSize(row, ordinal))
columnType.append(row, ordinal, buffer)
}
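The appendFrom contract above is driven one column at a time. A hedged sketch of the caller's loop follows (ColumnBuilder is private[sql], and rows/columnBuilders are assumed names, so this is illustrative only; the real batch-building loop lives in InMemoryRelation below):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.columnar.ColumnBuilder

    def buildBatch(rows: Iterator[InternalRow],
                   columnBuilders: Array[ColumnBuilder]): Array[Array[Byte]] = {
      rows.foreach { row =>
        var i = 0
        while (i < columnBuilders.length) {
          columnBuilders(i).appendFrom(row, i)  // one value per column, by ordinal
          i += 1
        }
      }
      columnBuilders.map(_.build().array())     // materialize each column's byte buffer
    }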
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
index 11c79c8..1bce214 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.columnar
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -53,7 +53,7 @@ private[sql] sealed trait ColumnStats extends Serializable {
/**
* Gathers statistics information from `row(ordinal)`.
*/
- def gatherStats(row: Row, ordinal: Int): Unit = {
+ def gatherStats(row: InternalRow, ordinal: Int): Unit = {
if (row.isNullAt(ordinal)) {
nullCount += 1
// 4 bytes for null position
@@ -66,23 +66,23 @@ private[sql] sealed trait ColumnStats extends Serializable {
* Column statistics represented as a single row, currently including closed lower bound, closed
* upper bound and null count.
*/
- def collectedStatistics: Row
+ def collectedStatistics: InternalRow
}
/**
* A no-op ColumnStats only used for testing purposes.
*/
private[sql] class NoopColumnStats extends ColumnStats {
- override def gatherStats(row: Row, ordinal: Int): Unit = super.gatherStats(row, ordinal)
+ override def gatherStats(row: InternalRow, ordinal: Int): Unit = super.gatherStats(row, ordinal)
- override def collectedStatistics: Row = Row(null, null, nullCount, count, 0L)
+ override def collectedStatistics: InternalRow = InternalRow(null, null, nullCount, count, 0L)
}
private[sql] class BooleanColumnStats extends ColumnStats {
protected var upper = false
protected var lower = true
- override def gatherStats(row: Row, ordinal: Int): Unit = {
+ override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
super.gatherStats(row, ordinal)
if (!row.isNullAt(ordinal)) {
val value = row.getBoolean(ordinal)
@@ -92,14 +92,15 @@ private[sql] class BooleanColumnStats extends ColumnStats {
}
}
- override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes)
+ override def collectedStatistics: InternalRow =
+ InternalRow(lower, upper, nullCount, count, sizeInBytes)
}
private[sql] class ByteColumnStats extends ColumnStats {
protected var upper = Byte.MinValue
protected var lower = Byte.MaxValue
- override def gatherStats(row: Row, ordinal: Int): Unit = {
+ override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
super.gatherStats(row, ordinal)
if (!row.isNullAt(ordinal)) {
val value = row.getByte(ordinal)
@@ -109,14 +110,15 @@ private[sql] class ByteColumnStats extends ColumnStats {
}
}
- override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes)
+ override def collectedStatistics: InternalRow =
+ InternalRow(lower, upper, nullCount, count, sizeInBytes)
}
private[sql] class ShortColumnStats extends ColumnStats {
protected var upper = Short.MinValue
protected var lower = Short.MaxValue
- override def gatherStats(row: Row, ordinal: Int): Unit = {
+ override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
super.gatherStats(row, ordinal)
if (!row.isNullAt(ordinal)) {
val value = row.getShort(ordinal)
@@ -126,14 +128,15 @@ private[sql] class ShortColumnStats extends ColumnStats {
}
}
- override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes)
+ override def collectedStatistics: InternalRow =
+ InternalRow(lower, upper, nullCount, count, sizeInBytes)
}
private[sql] class LongColumnStats extends ColumnStats {
protected var upper = Long.MinValue
protected var lower = Long.MaxValue
- override def gatherStats(row: Row, ordinal: Int): Unit = {
+ override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
super.gatherStats(row, ordinal)
if (!row.isNullAt(ordinal)) {
val value = row.getLong(ordinal)
@@ -143,14 +146,15 @@ private[sql] class LongColumnStats extends ColumnStats {
}
}
- override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes)
+ override def collectedStatistics: InternalRow =
+ InternalRow(lower, upper, nullCount, count, sizeInBytes)
}
private[sql] class DoubleColumnStats extends ColumnStats {
protected var upper = Double.MinValue
protected var lower = Double.MaxValue
- override def gatherStats(row: Row, ordinal: Int): Unit = {
+ override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
super.gatherStats(row, ordinal)
if (!row.isNullAt(ordinal)) {
val value = row.getDouble(ordinal)
@@ -160,14 +164,15 @@ private[sql] class DoubleColumnStats extends ColumnStats {
}
}
- override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes)
+ override def collectedStatistics: InternalRow =
+ InternalRow(lower, upper, nullCount, count, sizeInBytes)
}
private[sql] class FloatColumnStats extends ColumnStats {
protected var upper = Float.MinValue
protected var lower = Float.MaxValue
- override def gatherStats(row: Row, ordinal: Int): Unit = {
+ override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
super.gatherStats(row, ordinal)
if (!row.isNullAt(ordinal)) {
val value = row.getFloat(ordinal)
@@ -177,14 +182,15 @@ private[sql] class FloatColumnStats extends ColumnStats {
}
}
- override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes)
+ override def collectedStatistics: InternalRow =
+ InternalRow(lower, upper, nullCount, count, sizeInBytes)
}
private[sql] class FixedDecimalColumnStats extends ColumnStats {
protected var upper: Decimal = null
protected var lower: Decimal = null
- override def gatherStats(row: Row, ordinal: Int): Unit = {
+ override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
super.gatherStats(row, ordinal)
if (!row.isNullAt(ordinal)) {
val value = row(ordinal).asInstanceOf[Decimal]
@@ -194,14 +200,15 @@ private[sql] class FixedDecimalColumnStats extends ColumnStats {
}
}
- override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes)
+ override def collectedStatistics: InternalRow =
+ InternalRow(lower, upper, nullCount, count, sizeInBytes)
}
private[sql] class IntColumnStats extends ColumnStats {
protected var upper = Int.MinValue
protected var lower = Int.MaxValue
- override def gatherStats(row: Row, ordinal: Int): Unit = {
+ override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
super.gatherStats(row, ordinal)
if (!row.isNullAt(ordinal)) {
val value = row.getInt(ordinal)
@@ -211,14 +218,15 @@ private[sql] class IntColumnStats extends ColumnStats {
}
}
- override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes)
+ override def collectedStatistics: InternalRow =
+ InternalRow(lower, upper, nullCount, count, sizeInBytes)
}
private[sql] class StringColumnStats extends ColumnStats {
protected var upper: UTF8String = null
protected var lower: UTF8String = null
- override def gatherStats(row: Row, ordinal: Int): Unit = {
+ override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
super.gatherStats(row, ordinal)
if (!row.isNullAt(ordinal)) {
val value = row(ordinal).asInstanceOf[UTF8String]
@@ -228,7 +236,8 @@ private[sql] class StringColumnStats extends ColumnStats {
}
}
- override def collectedStatistics: Row = Row(lower, upper, nullCount, count, sizeInBytes)
+ override def collectedStatistics: InternalRow =
+ InternalRow(lower, upper, nullCount, count, sizeInBytes)
}
private[sql] class DateColumnStats extends IntColumnStats
@@ -236,23 +245,25 @@ private[sql] class DateColumnStats extends IntColumnStats
private[sql] class TimestampColumnStats extends LongColumnStats
private[sql] class BinaryColumnStats extends ColumnStats {
- override def gatherStats(row: Row, ordinal: Int): Unit = {
+ override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
super.gatherStats(row, ordinal)
if (!row.isNullAt(ordinal)) {
sizeInBytes += BINARY.actualSize(row, ordinal)
}
}
- override def collectedStatistics: Row = Row(null, null, nullCount, count, sizeInBytes)
+ override def collectedStatistics: InternalRow =
+ InternalRow(null, null, nullCount, count, sizeInBytes)
}
private[sql] class GenericColumnStats extends ColumnStats {
- override def gatherStats(row: Row, ordinal: Int): Unit = {
+ override def gatherStats(row: InternalRow, ordinal: Int): Unit = {
super.gatherStats(row, ordinal)
if (!row.isNullAt(ordinal)) {
sizeInBytes += GENERIC.actualSize(row, ordinal)
}
}
- override def collectedStatistics: Row = Row(null, null, nullCount, count, sizeInBytes)
+ override def collectedStatistics: InternalRow =
+ InternalRow(null, null, nullCount, count, sizeInBytes)
}
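Each concrete stats class above follows the same shape: track lower/upper bounds plus counters while scanning internal rows, then expose them as a five-field InternalRow. A self-contained illustration of that pattern (the real trait is sealed inside org.apache.spark.sql.columnar, so this mirrors IntColumnStats rather than extending it):

    import org.apache.spark.sql.catalyst.InternalRow

    class ExampleIntStats {
      private var upper = Int.MinValue
      private var lower = Int.MaxValue
      private var nullCount = 0
      private var count = 0
      private var sizeInBytes = 0L

      def gatherStats(row: InternalRow, ordinal: Int): Unit = {
        count += 1
        if (row.isNullAt(ordinal)) {
          nullCount += 1
          sizeInBytes += 4                // 4 bytes for the null position
        } else {
          val value = row.getInt(ordinal)
          if (value > upper) upper = value
          if (value < lower) lower = value
          sizeInBytes += 4                // INT.defaultSize in the real code
        }
      }

      // Closed lower bound, closed upper bound, null count, row count, size.
      def collectedStatistics: InternalRow =
        InternalRow(lower, upper, nullCount, count, sizeInBytes)
    }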
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index 3db26fa..761f427 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -19,21 +19,16 @@ package org.apache.spark.sql.columnar
import java.nio.ByteBuffer
-import org.apache.spark.{Accumulable, Accumulator, Accumulators}
-import org.apache.spark.sql.catalyst.expressions
-
import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Row
-import org.apache.spark.SparkContext
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
import org.apache.spark.sql.execution.{LeafNode, SparkPlan}
import org.apache.spark.storage.StorageLevel
+import org.apache.spark.{Accumulable, Accumulator, Accumulators}
private[sql] object InMemoryRelation {
def apply(
@@ -45,7 +40,7 @@ private[sql] object InMemoryRelation {
new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child, tableName)()
}
-private[sql] case class CachedBatch(buffers: Array[Array[Byte]], stats: Row)
+private[sql] case class CachedBatch(buffers: Array[Array[Byte]], stats: InternalRow)
private[sql] case class InMemoryRelation(
output: Seq[Attribute],
@@ -56,12 +51,12 @@ private[sql] case class InMemoryRelation(
tableName: Option[String])(
private var _cachedColumnBuffers: RDD[CachedBatch] = null,
private var _statistics: Statistics = null,
- private var _batchStats: Accumulable[ArrayBuffer[Row], Row] = null)
+ private var _batchStats: Accumulable[ArrayBuffer[InternalRow], InternalRow] = null)
extends LogicalPlan with MultiInstanceRelation {
- private val batchStats: Accumulable[ArrayBuffer[Row], Row] =
+ private val batchStats: Accumulable[ArrayBuffer[InternalRow], InternalRow] =
if (_batchStats == null) {
- child.sqlContext.sparkContext.accumulableCollection(ArrayBuffer.empty[Row])
+ child.sqlContext.sparkContext.accumulableCollection(ArrayBuffer.empty[InternalRow])
} else {
_batchStats
}
@@ -151,7 +146,7 @@ private[sql] case class InMemoryRelation(
rowCount += 1
}
- val stats = Row.merge(columnBuilders.map(_.columnStats.collectedStatistics) : _*)
+ val stats = InternalRow.merge(columnBuilders.map(_.columnStats.collectedStatistics) : _*)
batchStats += stats
CachedBatch(columnBuilders.map(_.build().array()), stats)
@@ -267,7 +262,7 @@ private[sql] case class InMemoryColumnarTableScan(
private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning
- protected override def doExecute(): RDD[Row] = {
+ protected override def doExecute(): RDD[InternalRow] = {
if (enableAccumulators) {
readPartitions.setValue(0)
readBatches.setValue(0)
@@ -296,7 +291,7 @@ private[sql] case class InMemoryColumnarTableScan(
val nextRow = new SpecificMutableRow(requestedColumnDataTypes)
- def cachedBatchesToRows(cacheBatches: Iterator[CachedBatch]): Iterator[Row] = {
+ def cachedBatchesToRows(cacheBatches: Iterator[CachedBatch]): Iterator[InternalRow] = {
val rows = cacheBatches.flatMap { cachedBatch =>
// Build column accessors
val columnAccessors = requestedColumnIndices.map { batchColumnIndex =>
@@ -306,15 +301,15 @@ private[sql] case class InMemoryColumnarTableScan(
}
// Extract rows via column accessors
- new Iterator[Row] {
+ new Iterator[InternalRow] {
private[this] val rowLen = nextRow.length
- override def next(): Row = {
+ override def next(): InternalRow = {
var i = 0
while (i < rowLen) {
columnAccessors(i).extractTo(nextRow, i)
i += 1
}
- if (attributes.isEmpty) Row.empty else nextRow
+ if (attributes.isEmpty) InternalRow.empty else nextRow
}
override def hasNext: Boolean = columnAccessors(0).hasNext
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala
index f1f494a..ba47bc7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.columnar
import java.nio.{ByteBuffer, ByteOrder}
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
/**
* A stackable trait used for building byte buffer for a column containing null values. Memory
@@ -52,7 +52,7 @@ private[sql] trait NullableColumnBuilder extends ColumnBuilder {
super.initialize(initialSize, columnName, useCompression)
}
- abstract override def appendFrom(row: Row, ordinal: Int): Unit = {
+ abstract override def appendFrom(row: InternalRow, ordinal: Int): Unit = {
columnStats.gatherStats(row, ordinal)
if (row.isNullAt(ordinal)) {
nulls = ColumnBuilder.ensureFreeSpace(nulls, 4)
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
index 8e2a1af..39b21dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.columnar.compression
import java.nio.{ByteBuffer, ByteOrder}
import org.apache.spark.Logging
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.columnar.{ColumnBuilder, NativeColumnBuilder}
import org.apache.spark.sql.types.AtomicType
@@ -66,7 +66,7 @@ private[sql] trait CompressibleColumnBuilder[T <: AtomicType]
encoder.compressionRatio < 0.8
}
- private def gatherCompressibilityStats(row: Row, ordinal: Int): Unit = {
+ private def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = {
var i = 0
while (i < compressionEncoders.length) {
compressionEncoders(i).gatherCompressibilityStats(row, ordinal)
@@ -74,7 +74,7 @@ private[sql] trait CompressibleColumnBuilder[T <: AtomicType]
}
}
- abstract override def appendFrom(row: Row, ordinal: Int): Unit = {
+ abstract override def appendFrom(row: InternalRow, ordinal: Int): Unit = {
super.appendFrom(row, ordinal)
if (!row.isNullAt(ordinal)) {
gatherCompressibilityStats(row, ordinal)
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
index 17c2d9b..4eaec6d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
@@ -18,14 +18,13 @@
package org.apache.spark.sql.columnar.compression
import java.nio.{ByteBuffer, ByteOrder}
-
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.MutableRow
import org.apache.spark.sql.columnar.{ColumnType, NativeColumnType}
import org.apache.spark.sql.types.AtomicType
private[sql] trait Encoder[T <: AtomicType] {
- def gatherCompressibilityStats(row: Row, ordinal: Int): Unit = {}
+ def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = {}
def compressedSize: Int
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
index 534ae90..5abc125 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
@@ -22,8 +22,7 @@ import java.nio.ByteBuffer
import scala.collection.mutable
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.runtimeMirror
-
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{MutableRow, SpecificMutableRow}
import org.apache.spark.sql.columnar._
import org.apache.spark.sql.types._
@@ -96,7 +95,7 @@ private[sql] case object RunLengthEncoding extends CompressionScheme {
override def compressedSize: Int = _compressedSize
- override def gatherCompressibilityStats(row: Row, ordinal: Int): Unit = {
+ override def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = {
val value = columnType.getField(row, ordinal)
val actualSize = columnType.actualSize(row, ordinal)
_uncompressedSize += actualSize
@@ -217,7 +216,7 @@ private[sql] case object DictionaryEncoding extends CompressionScheme {
// to store dictionary element count.
private var dictionarySize = 4
- override def gatherCompressibilityStats(row: Row, ordinal: Int): Unit = {
+ override def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = {
val value = columnType.getField(row, ordinal)
if (!overflow) {
@@ -310,7 +309,7 @@ private[sql] case object BooleanBitSet extends CompressionScheme {
class Encoder extends compression.Encoder[BooleanType.type] {
private var _uncompressedSize = 0
- override def gatherCompressibilityStats(row: Row, ordinal: Int): Unit = {
+ override def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = {
_uncompressedSize += BOOLEAN.defaultSize
}
@@ -404,7 +403,7 @@ private[sql] case object IntDelta extends CompressionScheme {
private var prevValue: Int = _
- override def gatherCompressibilityStats(row: Row, ordinal: Int): Unit = {
+ override def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = {
val value = row.getInt(ordinal)
val delta = value - prevValue
@@ -484,7 +483,7 @@ private[sql] case object LongDelta extends CompressionScheme {
private var prevValue: Long = _
- override def gatherCompressibilityStats(row: Row, ordinal: Int): Unit = {
+ override def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = {
val value = row.getLong(ordinal)
val delta = value - prevValue
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
index 8d16749..6e8a5ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
@@ -20,12 +20,10 @@ package org.apache.spark.sql.execution
import java.util.HashMap
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.SQLContext
/**
* :: DeveloperApi ::
@@ -121,11 +119,11 @@ case class Aggregate(
}
}
- protected override def doExecute(): RDD[Row] = attachTree(this, "execute") {
+ protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
if (groupingExpressions.isEmpty) {
child.execute().mapPartitions { iter =>
val buffer = newAggregateBuffer()
- var currentRow: Row = null
+ var currentRow: InternalRow = null
while (iter.hasNext) {
currentRow = iter.next()
var i = 0
@@ -147,10 +145,10 @@ case class Aggregate(
}
} else {
child.execute().mapPartitions { iter =>
- val hashTable = new HashMap[Row, Array[AggregateFunction]]
+ val hashTable = new HashMap[InternalRow, Array[AggregateFunction]]
val groupingProjection = new InterpretedMutableProjection(groupingExpressions, child.output)
- var currentRow: Row = null
+ var currentRow: InternalRow = null
while (iter.hasNext) {
currentRow = iter.next()
val currentGroup = groupingProjection(currentRow)
@@ -167,7 +165,7 @@ case class Aggregate(
}
}
- new Iterator[Row] {
+ new Iterator[InternalRow] {
private[this] val hashTableIter = hashTable.entrySet().iterator()
private[this] val aggregateResults = new GenericMutableRow(computedAggregates.length)
private[this] val resultProjection =
@@ -177,7 +175,7 @@ case class Aggregate(
override final def hasNext: Boolean = hashTableIter.hasNext
- override final def next(): Row = {
+ override final def next(): InternalRow = {
val currentEntry = hashTableIter.next()
val currentGroup = currentEntry.getKey
val currentBuffer = currentEntry.getValue
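The changed lines above only show the type switch; the surrounding grouping loop is unchanged context that the hunk elides. Reconstructed as an assumed sketch, it builds one aggregate buffer per distinct key, with everything typed as InternalRow:

    // Build one Array[AggregateFunction] buffer per distinct grouping key.
    val hashTable = new java.util.HashMap[InternalRow, Array[AggregateFunction]]
    while (iter.hasNext) {
      val currentRow: InternalRow = iter.next()
      val currentGroup = groupingProjection(currentRow)
      var buffer = hashTable.get(currentGroup)
      if (buffer == null) {
        buffer = newAggregateBuffer()
        // Copy the key: the mutable projection reuses its output row.
        hashTable.put(currentGroup.copy(), buffer)
      }
      var i = 0
      while (i < buffer.length) {
        buffer(i).update(currentRow)
        i += 1
      }
    }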
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 6fa7ccc..c9a1883 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -17,19 +17,19 @@
package org.apache.spark.sql.execution
-import org.apache.spark.{HashPartitioner, Partitioner, RangePartitioner, SparkEnv}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.{RDD, ShuffledRDD}
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.shuffle.unsafe.UnsafeShuffleManager
+import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.errors.attachTree
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.types.DataType
-import org.apache.spark.sql.{SQLContext, Row}
import org.apache.spark.util.MutablePair
+import org.apache.spark.{HashPartitioner, Partitioner, RangePartitioner, SparkEnv}
/**
* :: DeveloperApi ::
@@ -157,7 +157,7 @@ case class Exchange(
serializer
}
- protected override def doExecute(): RDD[Row] = attachTree(this , "execute") {
+ protected override def doExecute(): RDD[InternalRow] = attachTree(this , "execute") {
newPartitioning match {
case HashPartitioning(expressions, numPartitions) =>
val keySchema = expressions.map(_.dataType).toArray
@@ -173,11 +173,11 @@ case class Exchange(
} else {
child.execute().mapPartitions { iter =>
val hashExpressions = newMutableProjection(expressions, child.output)()
- val mutablePair = new MutablePair[Row, Row]()
+ val mutablePair = new MutablePair[InternalRow, InternalRow]()
iter.map(r => mutablePair.update(hashExpressions(r), r))
}
}
- val shuffled = new ShuffledRDD[Row, Row, Row](rdd, part)
+ val shuffled = new ShuffledRDD[InternalRow, InternalRow, InternalRow](rdd, part)
shuffled.setSerializer(serializer)
shuffled.map(_._2)
@@ -190,7 +190,7 @@ case class Exchange(
// Internally, RangePartitioner runs a job on the RDD that samples keys to compute
// partition bounds. To get accurate samples, we need to copy the mutable keys.
val rddForSampling = childRdd.mapPartitions { iter =>
- val mutablePair = new MutablePair[Row, Null]()
+ val mutablePair = new MutablePair[InternalRow, Null]()
iter.map(row => mutablePair.update(row.copy(), null))
}
// TODO: RangePartitioner should take an Ordering.
@@ -202,12 +202,12 @@ case class Exchange(
childRdd.mapPartitions { iter => iter.map(row => (row.copy(), null))}
} else {
childRdd.mapPartitions { iter =>
- val mutablePair = new MutablePair[Row, Null]()
+ val mutablePair = new MutablePair[InternalRow, Null]()
iter.map(row => mutablePair.update(row, null))
}
}
- val shuffled = new ShuffledRDD[Row, Null, Null](rdd, part)
+ val shuffled = new ShuffledRDD[InternalRow, Null, Null](rdd, part)
shuffled.setSerializer(serializer)
shuffled.map(_._1)
@@ -217,14 +217,16 @@ case class Exchange(
val partitioner = new HashPartitioner(1)
val rdd = if (needToCopyObjectsBeforeShuffle(partitioner, serializer)) {
- child.execute().mapPartitions { iter => iter.map(r => (null, r.copy())) }
+ child.execute().mapPartitions {
+ iter => iter.map(r => (null, r.copy()))
+ }
} else {
child.execute().mapPartitions { iter =>
- val mutablePair = new MutablePair[Null, Row]()
+ val mutablePair = new MutablePair[Null, InternalRow]()
iter.map(r => mutablePair.update(null, r))
}
}
- val shuffled = new ShuffledRDD[Null, Row, Row](rdd, partitioner)
+ val shuffled = new ShuffledRDD[Null, InternalRow, InternalRow](rdd, partitioner)
shuffled.setSerializer(serializer)
shuffled.map(_._2)
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index f931dc9..da27a75 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql.catalyst.{InternalRow, CatalystTypeConverters}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericMutableRow}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
@@ -31,7 +31,7 @@ import org.apache.spark.sql.{Row, SQLContext}
*/
@DeveloperApi
object RDDConversions {
- def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[Row] = {
+ def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[InternalRow] = {
data.mapPartitions { iterator =>
val numColumns = outputTypes.length
val mutableRow = new GenericMutableRow(numColumns)
@@ -51,7 +51,7 @@ object RDDConversions {
/**
* Convert the objects inside Row into the types Catalyst expected.
*/
- def rowToRowRdd(data: RDD[Row], outputTypes: Seq[DataType]): RDD[Row] = {
+ def rowToRowRdd(data: RDD[Row], outputTypes: Seq[DataType]): RDD[InternalRow] = {
data.mapPartitions { iterator =>
val numColumns = outputTypes.length
val mutableRow = new GenericMutableRow(numColumns)
@@ -70,7 +70,9 @@ object RDDConversions {
}
/** Logical plan node for scanning data from an RDD. */
-private[sql] case class LogicalRDD(output: Seq[Attribute], rdd: RDD[Row])(sqlContext: SQLContext)
+private[sql] case class LogicalRDD(
+ output: Seq[Attribute],
+ rdd: RDD[InternalRow])(sqlContext: SQLContext)
extends LogicalPlan with MultiInstanceRelation {
override def children: Seq[LogicalPlan] = Nil
@@ -91,13 +93,15 @@ private[sql] case class LogicalRDD(output: Seq[Attribute], rdd: RDD[Row])(sqlCon
}
/** Physical plan node for scanning data from an RDD. */
-private[sql] case class PhysicalRDD(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode {
- protected override def doExecute(): RDD[Row] = rdd
+private[sql] case class PhysicalRDD(
+ output: Seq[Attribute],
+ rdd: RDD[InternalRow]) extends LeafNode {
+ protected override def doExecute(): RDD[InternalRow] = rdd
}
/** Logical plan node for scanning data from a local collection. */
private[sql]
-case class LogicalLocalTable(output: Seq[Attribute], rows: Seq[Row])(sqlContext: SQLContext)
+case class LogicalLocalTable(output: Seq[Attribute], rows: Seq[InternalRow])(sqlContext: SQLContext)
extends LogicalPlan with MultiInstanceRelation {
override def children: Seq[LogicalPlan] = Nil
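RDDConversions.rowToRowRdd is now the bridge for code that still hands back external Rows. A brief usage sketch under assumed names (externalRdd, schema):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.execution.RDDConversions
    import org.apache.spark.sql.types.StructType

    // Convert external rows into the internal representation expected by
    // the physical operators, one DataType per output column.
    def toInternal(externalRdd: RDD[Row], schema: StructType): RDD[InternalRow] =
      RDDConversions.rowToRowRdd(externalRdd, schema.fields.map(_.dataType))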
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
index 4b601c1..42a0c1b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
@@ -19,10 +19,9 @@ package org.apache.spark.sql.execution
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical.{UnknownPartitioning, Partitioning}
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
/**
* Apply the all of the GroupExpressions to every input row, hence we will get
@@ -43,7 +42,7 @@ case class Expand(
// as UNKNOWN partitioning
override def outputPartitioning: Partitioning = UnknownPartitioning(0)
- protected override def doExecute(): RDD[Row] = attachTree(this, "execute") {
+ protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
child.execute().mapPartitions { iter =>
// TODO Move out projection objects creation and transfer to
// workers via closure. However we can't assume the Projection
@@ -51,14 +50,14 @@ case class Expand(
// create the projections within each of the partition processing.
val groups = projections.map(ee => newProjection(ee, child.output)).toArray
- new Iterator[Row] {
- private[this] var result: Row = _
+ new Iterator[InternalRow] {
+ private[this] var result: InternalRow = _
private[this] var idx = -1 // -1 means the initial state
- private[this] var input: Row = _
+ private[this] var input: InternalRow = _
override final def hasNext: Boolean = (-1 < idx && idx < groups.length) || iter.hasNext
- override final def next(): Row = {
+ override final def next(): InternalRow = {
if (idx <= 0) {
// in the initial (-1) or beginning(0) of a new input row, fetch the next input tuple
input = iter.next()
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
index dd02c1f..c1665f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
@@ -25,12 +25,12 @@ import org.apache.spark.sql.catalyst.expressions._
* For lazy computing, be sure the generator.terminate() called in the very last
* TODO reusing the CompletionIterator?
*/
-private[execution] sealed case class LazyIterator(func: () => TraversableOnce[Row])
- extends Iterator[Row] {
+private[execution] sealed case class LazyIterator(func: () => TraversableOnce[InternalRow])
+ extends Iterator[InternalRow] {
lazy val results = func().toIterator
override def hasNext: Boolean = results.hasNext
- override def next(): Row = results.next()
+ override def next(): InternalRow = results.next()
}
/**
@@ -58,11 +58,11 @@ case class Generate(
val boundGenerator = BindReferences.bindReference(generator, child.output)
- protected override def doExecute(): RDD[Row] = {
+ protected override def doExecute(): RDD[InternalRow] = {
// boundGenerator.terminate() should be triggered after all of the rows in the partition
if (join) {
child.execute().mapPartitions { iter =>
- val generatorNullRow = Row.fromSeq(Seq.fill[Any](generator.elementTypes.size)(null))
+ val generatorNullRow = InternalRow.fromSeq(Seq.fill[Any](generator.elementTypes.size)(null))
val joinedRow = new JoinedRow
iter.flatMap { row =>
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
index 1c40a92..ba2c8f5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
@@ -66,7 +66,7 @@ case class GeneratedAggregate(
override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute)
- protected override def doExecute(): RDD[Row] = {
+ protected override def doExecute(): RDD[InternalRow] = {
val aggregatesToCompute = aggregateExpressions.flatMap { a =>
a.collect { case agg: AggregateExpression => agg}
}
@@ -273,7 +273,7 @@ case class GeneratedAggregate(
if (groupingExpressions.isEmpty) {
// TODO: Codegening anything other than the updateProjection is probably over kill.
val buffer = newAggregationBuffer(EmptyRow).asInstanceOf[MutableRow]
- var currentRow: Row = null
+ var currentRow: InternalRow = null
updateProjection.target(buffer)
while (iter.hasNext) {
@@ -295,19 +295,19 @@ case class GeneratedAggregate(
)
while (iter.hasNext) {
- val currentRow: Row = iter.next()
- val groupKey: Row = groupProjection(currentRow)
+ val currentRow: InternalRow = iter.next()
+ val groupKey: InternalRow = groupProjection(currentRow)
val aggregationBuffer = aggregationMap.getAggregationBuffer(groupKey)
updateProjection.target(aggregationBuffer)(joinedRow(aggregationBuffer, currentRow))
}
- new Iterator[Row] {
+ new Iterator[InternalRow] {
private[this] val mapIterator = aggregationMap.iterator()
private[this] val resultProjection = resultProjectionBuilder()
def hasNext: Boolean = mapIterator.hasNext
- def next(): Row = {
+ def next(): InternalRow = {
val entry = mapIterator.next()
val result = resultProjection(joinedRow(entry.key, entry.value))
if (hasNext) {
@@ -326,9 +326,9 @@ case class GeneratedAggregate(
if (unsafeEnabled) {
log.info("Not using Unsafe-based aggregator because it is not supported for this schema")
}
- val buffers = new java.util.HashMap[Row, MutableRow]()
+ val buffers = new java.util.HashMap[InternalRow, MutableRow]()
- var currentRow: Row = null
+ var currentRow: InternalRow = null
while (iter.hasNext) {
currentRow = iter.next()
val currentGroup = groupProjection(currentRow)
@@ -342,13 +342,13 @@ case class GeneratedAggregate(
updateProjection.target(currentBuffer)(joinedRow(currentBuffer, currentRow))
}
- new Iterator[Row] {
+ new Iterator[InternalRow] {
private[this] val resultIterator = buffers.entrySet.iterator()
private[this] val resultProjection = resultProjectionBuilder()
def hasNext: Boolean = resultIterator.hasNext
- def next(): Row = {
+ def next(): InternalRow = {
val currentGroup = resultIterator.next()
resultProjection(joinedRow(currentGroup.getKey, currentGroup.getValue))
}
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
index 03bee80..cd34118 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
@@ -19,18 +19,20 @@ package org.apache.spark.sql.execution
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql.catalyst.{InternalRow, CatalystTypeConverters}
import org.apache.spark.sql.catalyst.expressions.Attribute
/**
* Physical plan node for scanning data from a local collection.
*/
-private[sql] case class LocalTableScan(output: Seq[Attribute], rows: Seq[Row]) extends LeafNode {
+private[sql] case class LocalTableScan(
+ output: Seq[Attribute],
+ rows: Seq[InternalRow]) extends LeafNode {
private lazy val rdd = sqlContext.sparkContext.parallelize(rows)
- protected override def doExecute(): RDD[Row] = rdd
+ protected override def doExecute(): RDD[InternalRow] = rdd
override def executeCollect(): Array[Row] = {
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 435ac01..7739a9f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -23,6 +23,7 @@ import org.apache.spark.rdd.{RDD, RDDOperationScope}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, trees}
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.physical._
@@ -79,11 +80,11 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
def requiredChildOrdering: Seq[Seq[SortOrder]] = Seq.fill(children.size)(Nil)
/**
- * Returns the result of this query as an RDD[Row] by delegating to doExecute
+ * Returns the result of this query as an RDD[InternalRow] by delegating to doExecute
* after adding query plan information to created RDDs for visualization.
* Concrete implementations of SparkPlan should override doExecute instead.
*/
- final def execute(): RDD[Row] = {
+ final def execute(): RDD[InternalRow] = {
RDDOperationScope.withScope(sparkContext, nodeName, false, true) {
doExecute()
}
@@ -91,9 +92,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
/**
* Overridden by concrete implementations of SparkPlan.
- * Produces the result of the query as an RDD[Row]
+ * Produces the result of the query as an RDD[InternalRow]
*/
- protected def doExecute(): RDD[Row]
+ protected def doExecute(): RDD[InternalRow]
/**
* Runs this query returning the result as an array.
@@ -117,7 +118,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
val childRDD = execute().map(_.copy())
- val buf = new ArrayBuffer[Row]
+ val buf = new ArrayBuffer[InternalRow]
val totalParts = childRDD.partitions.length
var partsScanned = 0
while (buf.size < n && partsScanned < totalParts) {
@@ -140,7 +141,8 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
val sc = sqlContext.sparkContext
val res =
- sc.runJob(childRDD, (it: Iterator[Row]) => it.take(left).toArray, p, allowLocal = false)
+ sc.runJob(childRDD, (it: Iterator[InternalRow]) => it.take(left).toArray, p,
+ allowLocal = false)
res.foreach(buf ++= _.take(n - buf.size))
partsScanned += numPartsToTry
@@ -175,7 +177,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
protected def newPredicate(
- expression: Expression, inputSchema: Seq[Attribute]): (Row) => Boolean = {
+ expression: Expression, inputSchema: Seq[Attribute]): (InternalRow) => Boolean = {
if (codegenEnabled) {
GeneratePredicate.generate(expression, inputSchema)
} else {
@@ -183,7 +185,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
}
}
- protected def newOrdering(order: Seq[SortOrder], inputSchema: Seq[Attribute]): Ordering[Row] = {
+ protected def newOrdering(
+ order: Seq[SortOrder],
+ inputSchema: Seq[Attribute]): Ordering[InternalRow] = {
if (codegenEnabled) {
GenerateOrdering.generate(order, inputSchema)
} else {
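The execute()/doExecute() split above fixes the convention that everything inside the planner speaks InternalRow; conversion back to public Rows happens only at collection time. A hedged sketch of that boundary (plan and schema are assumed names; the converter call is the same one TakeOrdered.executeCollect uses further down):

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.catalyst.CatalystTypeConverters

    // Internal side: operators exchange InternalRow.
    val internal = plan.execute().map(_.copy())

    // External side: collected results are mapped back to public Rows.
    val toScala = CatalystTypeConverters.createToScalaConverter(schema)
    val rows: Array[Row] = internal.collect().map(toScala(_).asInstanceOf[Row])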
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 7a1331a..422992d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -203,7 +203,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}
protected lazy val singleRowRdd =
- sparkContext.parallelize(Seq(new GenericRow(Array[Any]()): Row), 1)
+ sparkContext.parallelize(Seq(new GenericRow(Array[Any]()): InternalRow), 1)
object TakeOrdered extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
index c4327ce..fd6f1d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
@@ -20,9 +20,8 @@ package org.apache.spark.sql.execution
import java.util
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, Distribution, ClusteredDistribution, Partitioning}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
import org.apache.spark.util.collection.CompactBuffer
/**
@@ -112,16 +111,16 @@ case class Window(
}
}
- protected override def doExecute(): RDD[Row] = {
+ protected override def doExecute(): RDD[InternalRow] = {
child.execute().mapPartitions { iter =>
- new Iterator[Row] {
+ new Iterator[InternalRow] {
// Although input rows are grouped based on windowSpec.partitionSpec, we need to
// know when we have a new partition.
// This is to manually construct an ordering that can be used to compare rows.
// TODO: We may want to have a newOrdering that takes BoundReferences.
// So, we can take advantave of code gen.
- private val partitionOrdering: Ordering[Row] =
+ private val partitionOrdering: Ordering[InternalRow] =
RowOrdering.forSchema(windowSpec.partitionSpec.map(_.dataType))
// This is used to project expressions for the partition specification.
@@ -137,13 +136,13 @@ case class Window(
// The number of buffered rows in the inputRowBuffer (the size of the current partition).
var partitionSize: Int = 0
// The buffer used to buffer rows in a partition.
- var inputRowBuffer: CompactBuffer[Row] = _
+ var inputRowBuffer: CompactBuffer[InternalRow] = _
// The partition key of the current partition.
- var currentPartitionKey: Row = _
+ var currentPartitionKey: InternalRow = _
// The partition key of next partition.
- var nextPartitionKey: Row = _
+ var nextPartitionKey: InternalRow = _
// The first row of next partition.
- var firstRowInNextPartition: Row = _
+ var firstRowInNextPartition: InternalRow = _
// Indicates if this partition is the last one in the iter.
var lastPartition: Boolean = false
@@ -316,7 +315,7 @@ case class Window(
!lastPartition || (rowPosition < partitionSize)
}
- override final def next(): Row = {
+ override final def next(): InternalRow = {
if (hasNext) {
if (rowPosition == partitionSize) {
// All rows of this buffer have been consumed.
@@ -353,7 +352,7 @@ case class Window(
// Fetch the next partition.
private def fetchNextPartition(): Unit = {
// Create a new buffer for input rows.
- inputRowBuffer = new CompactBuffer[Row]()
+ inputRowBuffer = new CompactBuffer[InternalRow]()
// We already have the first row for this partition
// (recorded in firstRowInNextPartition). Add it back.
inputRowBuffer += firstRowInNextPartition
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index fb42072..7aedd63 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -17,16 +17,17 @@
package org.apache.spark.sql.execution
-import org.apache.spark.{SparkEnv, HashPartitioner, SparkConf}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.{RDD, ShuffledRDD}
import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.util.{CompletionIterator, MutablePair}
import org.apache.spark.util.collection.ExternalSorter
+import org.apache.spark.util.{CompletionIterator, MutablePair}
+import org.apache.spark.{HashPartitioner, SparkEnv}
/**
* :: DeveloperApi ::
@@ -37,7 +38,7 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends
@transient lazy val buildProjection = newMutableProjection(projectList, child.output)
- protected override def doExecute(): RDD[Row] = child.execute().mapPartitions { iter =>
+ protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter =>
val resuableProjection = buildProjection()
iter.map(resuableProjection)
}
@@ -52,9 +53,10 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends
case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output
- @transient lazy val conditionEvaluator: (Row) => Boolean = newPredicate(condition, child.output)
+ @transient lazy val conditionEvaluator: (InternalRow) => Boolean =
+ newPredicate(condition, child.output)
- protected override def doExecute(): RDD[Row] = child.execute().mapPartitions { iter =>
+ protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter =>
iter.filter(conditionEvaluator)
}
@@ -83,7 +85,7 @@ case class Sample(
override def output: Seq[Attribute] = child.output
// TODO: How to pick seed?
- protected override def doExecute(): RDD[Row] = {
+ protected override def doExecute(): RDD[InternalRow] = {
if (withReplacement) {
child.execute().map(_.copy()).sample(withReplacement, upperBound - lowerBound, seed)
} else {
@@ -99,7 +101,8 @@ case class Sample(
case class Union(children: Seq[SparkPlan]) extends SparkPlan {
// TODO: attributes output by union should be distinct for nullability purposes
override def output: Seq[Attribute] = children.head.output
- protected override def doExecute(): RDD[Row] = sparkContext.union(children.map(_.execute()))
+ protected override def doExecute(): RDD[InternalRow] =
+ sparkContext.union(children.map(_.execute()))
}
/**
@@ -124,19 +127,19 @@ case class Limit(limit: Int, child: SparkPlan)
override def executeCollect(): Array[Row] = child.executeTake(limit)
- protected override def doExecute(): RDD[Row] = {
- val rdd: RDD[_ <: Product2[Boolean, Row]] = if (sortBasedShuffleOn) {
+ protected override def doExecute(): RDD[InternalRow] = {
+ val rdd: RDD[_ <: Product2[Boolean, InternalRow]] = if (sortBasedShuffleOn) {
child.execute().mapPartitions { iter =>
iter.take(limit).map(row => (false, row.copy()))
}
} else {
child.execute().mapPartitions { iter =>
- val mutablePair = new MutablePair[Boolean, Row]()
+ val mutablePair = new MutablePair[Boolean, InternalRow]()
iter.take(limit).map(row => mutablePair.update(false, row))
}
}
val part = new HashPartitioner(1)
- val shuffled = new ShuffledRDD[Boolean, Row, Row](rdd, part)
+ val shuffled = new ShuffledRDD[Boolean, InternalRow, InternalRow](rdd, part)
shuffled.setSerializer(new SparkSqlSerializer(child.sqlContext.sparkContext.getConf))
shuffled.mapPartitions(_.take(limit).map(_._2))
}
@@ -157,7 +160,8 @@ case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan)
private val ord: RowOrdering = new RowOrdering(sortOrder, child.output)
- private def collectData(): Array[Row] = child.execute().map(_.copy()).takeOrdered(limit)(ord)
+ private def collectData(): Array[InternalRow] =
+ child.execute().map(_.copy()).takeOrdered(limit)(ord)
override def executeCollect(): Array[Row] = {
val converter = CatalystTypeConverters.createToScalaConverter(schema)
@@ -166,7 +170,7 @@ case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan)
// TODO: Terminal split should be implemented differently from non-terminal split.
// TODO: Pick num splits based on |limit|.
- protected override def doExecute(): RDD[Row] = sparkContext.makeRDD(collectData(), 1)
+ protected override def doExecute(): RDD[InternalRow] = sparkContext.makeRDD(collectData(), 1)
override def outputOrdering: Seq[SortOrder] = sortOrder
}
@@ -186,7 +190,7 @@ case class Sort(
override def requiredChildDistribution: Seq[Distribution] =
if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
- protected override def doExecute(): RDD[Row] = attachTree(this, "sort") {
+ protected override def doExecute(): RDD[InternalRow] = attachTree(this, "sort") {
child.execute().mapPartitions( { iterator =>
val ordering = newOrdering(sortOrder, child.output)
iterator.map(_.copy()).toArray.sorted(ordering).iterator
@@ -214,14 +218,14 @@ case class ExternalSort(
override def requiredChildDistribution: Seq[Distribution] =
if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
- protected override def doExecute(): RDD[Row] = attachTree(this, "sort") {
+ protected override def doExecute(): RDD[InternalRow] = attachTree(this, "sort") {
child.execute().mapPartitions( { iterator =>
val ordering = newOrdering(sortOrder, child.output)
- val sorter = new ExternalSorter[Row, Null, Row](ordering = Some(ordering))
+ val sorter = new ExternalSorter[InternalRow, Null, InternalRow](ordering = Some(ordering))
sorter.insertAll(iterator.map(r => (r.copy, null)))
val baseIterator = sorter.iterator.map(_._1)
// TODO(marmbrus): The complex type signature below thwarts inference for no reason.
- CompletionIterator[Row, Iterator[Row]](baseIterator, sorter.stop())
+ CompletionIterator[InternalRow, Iterator[InternalRow]](baseIterator, sorter.stop())
}, preservesPartitioning = true)
}
@@ -239,7 +243,7 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: SparkPlan)
extends UnaryNode {
override def output: Seq[Attribute] = child.output
- protected override def doExecute(): RDD[Row] = {
+ protected override def doExecute(): RDD[InternalRow] = {
child.execute().map(_.copy()).coalesce(numPartitions, shuffle)
}
}
@@ -254,7 +258,7 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: SparkPlan)
case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode {
override def output: Seq[Attribute] = left.output
- protected override def doExecute(): RDD[Row] = {
+ protected override def doExecute(): RDD[InternalRow] = {
left.execute().map(_.copy()).subtract(right.execute().map(_.copy()))
}
}
@@ -268,7 +272,7 @@ case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode {
case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode {
override def output: Seq[Attribute] = children.head.output
- protected override def doExecute(): RDD[Row] = {
+ protected override def doExecute(): RDD[InternalRow] = {
left.execute().map(_.copy()).intersection(right.execute().map(_.copy()))
}
}
@@ -283,5 +287,5 @@ case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode {
case class OutputFaker(output: Seq[Attribute], child: SparkPlan) extends SparkPlan {
def children: Seq[SparkPlan] = child :: Nil
- protected override def doExecute(): RDD[Row] = child.execute()
+ protected override def doExecute(): RDD[InternalRow] = child.execute()
}
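Taken together, the hunks above change every physical operator's doExecute() to return RDD[InternalRow]; translation to the public Row type now happens only at the collection boundary (see TakeOrdered.executeCollect above and ExecutedCommand below). A rough, self-contained sketch of that boundary, mirroring the converter calls visible in this patch; the two-column schema and values are made up for illustration:

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

val schema = StructType(Seq(StructField("id", LongType), StructField("name", StringType)))

// External -> internal, as ExecutedCommand.doExecute does for its side-effect result.
val internal: InternalRow =
  CatalystTypeConverters.convertToCatalyst(Row(1L, "spark"), schema).asInstanceOf[InternalRow]

// Internal -> external, as TakeOrdered.executeCollect does before handing rows back to users.
val external: Row =
  CatalystTypeConverters.createToScalaConverter(schema)(internal).asInstanceOf[Row]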
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index 49b361e..653792e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -20,13 +20,13 @@ package org.apache.spark.sql.execution
import org.apache.spark.Logging
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql.catalyst.{InternalRow, CatalystTypeConverters}
import org.apache.spark.sql.catalyst.errors.TreeNodeException
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Row}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types._
-import org.apache.spark.sql.{DataFrame, SQLConf, SQLContext}
+import org.apache.spark.sql.{DataFrame, Row, SQLConf, SQLContext}
/**
* A logical command that is executed for its side-effects. `RunnableCommand`s are
@@ -64,9 +64,9 @@ private[sql] case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan
override def executeTake(limit: Int): Array[Row] = sideEffectResult.take(limit).toArray
- protected override def doExecute(): RDD[Row] = {
+ protected override def doExecute(): RDD[InternalRow] = {
val converted = sideEffectResult.map(r =>
- CatalystTypeConverters.convertToCatalyst(r, schema).asInstanceOf[Row])
+ CatalystTypeConverters.convertToCatalyst(r, schema).asInstanceOf[InternalRow])
sqlContext.sparkContext.parallelize(converted, 1)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index 83c1f65..3ee4033 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.unsafe.types.UTF8String
@@ -25,7 +26,7 @@ import scala.collection.mutable.HashSet
import org.apache.spark.{AccumulatorParam, Accumulator}
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.{SQLConf, SQLContext, DataFrame, Row}
+import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.trees.TreeNodeRef
import org.apache.spark.sql.types._
@@ -126,11 +127,11 @@ package object debug {
}
}
- protected override def doExecute(): RDD[Row] = {
+ protected override def doExecute(): RDD[InternalRow] = {
child.execute().mapPartitions { iter =>
- new Iterator[Row] {
+ new Iterator[InternalRow] {
def hasNext: Boolean = iter.hasNext
- def next(): Row = {
+ def next(): InternalRow = {
val currentRow = iter.next()
tupleCount += 1
var i = 0
@@ -155,7 +156,7 @@ package object debug {
def typeCheck(data: Any, schema: DataType): Unit = (data, schema) match {
case (null, _) =>
- case (row: Row, StructType(fields)) =>
+ case (row: InternalRow, StructType(fields)) =>
row.toSeq.zip(fields.map(_.dataType)).foreach { case(d, t) => typeCheck(d, t) }
case (s: Seq[_], ArrayType(elemType, _)) =>
s.foreach(typeCheck(_, elemType))
@@ -196,7 +197,7 @@ package object debug {
def children: List[SparkPlan] = child :: Nil
- protected override def doExecute(): RDD[Row] = {
+ protected override def doExecute(): RDD[InternalRow] = {
child.execute().map { row =>
try typeCheck(row, child.schema) catch {
case e: Exception =>
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala
index e228a60..68914cf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution.expressions
import org.apache.spark.TaskContext
-import org.apache.spark.sql.catalyst.expressions.{Row, LeafExpression}
+import org.apache.spark.sql.catalyst.expressions.{InternalRow, LeafExpression}
import org.apache.spark.sql.types.{LongType, DataType}
/**
@@ -43,7 +43,7 @@ private[sql] case class MonotonicallyIncreasingID() extends LeafExpression {
override def dataType: DataType = LongType
- override def eval(input: Row): Long = {
+ override def eval(input: InternalRow): Long = {
val currentCount = count
count += 1
(TaskContext.get().partitionId().toLong << 33) + currentCount
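The eval change above is type-only; the ID scheme is unchanged: the partition id occupies the upper bits (shifted left by 33) and the per-partition count the lower 33 bits. A small standalone illustration with made-up values:

// Hypothetical values, showing how the 64-bit ID above decomposes.
val partitionId = 5L
val countWithinPartition = 3L
val id = (partitionId << 33) + countWithinPartition  // 42949672963
assert((id >> 33) == partitionId)
assert((id & ((1L << 33) - 1)) == countWithinPartition)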
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala
index 1272793..12c2eed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution.expressions
import org.apache.spark.TaskContext
-import org.apache.spark.sql.catalyst.expressions.{LeafExpression, Row}
+import org.apache.spark.sql.catalyst.expressions.{LeafExpression, InternalRow}
import org.apache.spark.sql.types.{IntegerType, DataType}
@@ -31,5 +31,5 @@ private[sql] case object SparkPartitionID extends LeafExpression {
override def dataType: DataType = IntegerType
- override def eval(input: Row): Int = TaskContext.get().partitionId()
+ override def eval(input: InternalRow): Int = TaskContext.get().partitionId()
}
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
index b8b12be..2d2e1b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
@@ -17,16 +17,15 @@
package org.apache.spark.sql.execution.joins
-import org.apache.spark.rdd.RDD
-import org.apache.spark.util.ThreadUtils
-
import scala.concurrent._
import scala.concurrent.duration._
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.catalyst.expressions.{Row, Expression}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.{Expression, InternalRow}
import org.apache.spark.sql.catalyst.plans.physical.{Distribution, Partitioning, UnspecifiedDistribution}
import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+import org.apache.spark.util.ThreadUtils
/**
* :: DeveloperApi ::
@@ -61,12 +60,12 @@ case class BroadcastHashJoin(
@transient
private val broadcastFuture = future {
// Note that we use .execute().collect() because we don't want to convert data to Scala types
- val input: Array[Row] = buildPlan.execute().map(_.copy()).collect()
+ val input: Array[InternalRow] = buildPlan.execute().map(_.copy()).collect()
val hashed = HashedRelation(input.iterator, buildSideKeyGenerator, input.length)
sparkContext.broadcast(hashed)
}(BroadcastHashJoin.broadcastHashJoinExecutionContext)
- protected override def doExecute(): RDD[Row] = {
+ protected override def doExecute(): RDD[InternalRow] = {
val broadcastRelation = Await.result(broadcastFuture, timeout)
streamedPlan.execute().mapPartitions { streamedIter =>
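The doExecute change above keeps the existing build/probe split: the build side is hashed asynchronously in broadcastFuture and awaited once the streamed side runs. A simplified stand-in for that pattern in plain Scala (SimpleRow and the 5-minute timeout are hypothetical, not Spark APIs):

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

implicit val ec: ExecutionContext = ExecutionContext.global
case class SimpleRow(key: Int, value: String)

// Kick off the build-side hashing eagerly, off the calling thread.
val buildSide = Seq(SimpleRow(1, "a"), SimpleRow(2, "b"))
val buildFuture: Future[Map[Int, Seq[SimpleRow]]] = Future(buildSide.groupBy(_.key))

// Later, when the streamed side actually executes, block for the built table.
val built = Await.result(buildFuture, 5.minutes)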
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
index a32e5fc..044964f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.joins
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row}
+import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
/**
@@ -38,10 +38,10 @@ case class BroadcastLeftSemiJoinHash(
override def output: Seq[Attribute] = left.output
- protected override def doExecute(): RDD[Row] = {
+ protected override def doExecute(): RDD[InternalRow] = {
val buildIter = buildPlan.execute().map(_.copy()).collect().toIterator
- val hashSet = new java.util.HashSet[Row]()
- var currentRow: Row = null
+ val hashSet = new java.util.HashSet[InternalRow]()
+ var currentRow: InternalRow = null
// Create a Hash set of buildKeys
while (buildIter.hasNext) {
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
index caad3df..0b2cf8e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
@@ -61,13 +61,14 @@ case class BroadcastNestedLoopJoin(
@transient private lazy val boundCondition =
newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
- protected override def doExecute(): RDD[Row] = {
+ protected override def doExecute(): RDD[InternalRow] = {
val broadcastedRelation =
- sparkContext.broadcast(broadcast.execute().map(_.copy()).collect().toIndexedSeq)
+ sparkContext.broadcast(broadcast.execute().map(_.copy())
+ .collect().toIndexedSeq)
/** All rows that either match both-way, or rows from streamed joined with nulls. */
val matchesOrStreamedRowsWithNulls = streamed.execute().mapPartitions { streamedIter =>
- val matchedRows = new CompactBuffer[Row]
+ val matchedRows = new CompactBuffer[InternalRow]
// TODO: Use Spark's BitSet.
val includedBroadcastTuples =
new scala.collection.mutable.BitSet(broadcastedRelation.value.size)
@@ -118,8 +119,8 @@ case class BroadcastNestedLoopJoin(
val leftNulls = new GenericMutableRow(left.output.size)
val rightNulls = new GenericMutableRow(right.output.size)
/** Rows from broadcasted joined with nulls. */
- val broadcastRowsWithNulls: Seq[Row] = {
- val buf: CompactBuffer[Row] = new CompactBuffer()
+ val broadcastRowsWithNulls: Seq[InternalRow] = {
+ val buf: CompactBuffer[InternalRow] = new CompactBuffer()
var i = 0
val rel = broadcastedRelation.value
while (i < rel.length) {
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
index 191c00c..261b472 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.joins
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, JoinedRow}
import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
@@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends BinaryNode {
override def output: Seq[Attribute] = left.output ++ right.output
- protected override def doExecute(): RDD[Row] = {
+ protected override def doExecute(): RDD[InternalRow] = {
val leftResults = left.execute().map(_.copy())
val rightResults = right.execute().map(_.copy())
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index 851de16..3a4196a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -49,11 +49,13 @@ trait HashJoin {
@transient protected lazy val streamSideKeyGenerator: () => MutableProjection =
newMutableProjection(streamedKeys, streamedPlan.output)
- protected def hashJoin(streamIter: Iterator[Row], hashedRelation: HashedRelation): Iterator[Row] =
+ protected def hashJoin(
+ streamIter: Iterator[InternalRow],
+ hashedRelation: HashedRelation): Iterator[InternalRow] =
{
- new Iterator[Row] {
- private[this] var currentStreamedRow: Row = _
- private[this] var currentHashMatches: CompactBuffer[Row] = _
+ new Iterator[InternalRow] {
+ private[this] var currentStreamedRow: InternalRow = _
+ private[this] var currentHashMatches: CompactBuffer[InternalRow] = _
private[this] var currentMatchPosition: Int = -1
// Mutable per row objects.
@@ -65,7 +67,7 @@ trait HashJoin {
(currentMatchPosition != -1 && currentMatchPosition < currentHashMatches.size) ||
(streamIter.hasNext && fetchNext())
- override final def next(): Row = {
+ override final def next(): InternalRow = {
val ret = buildSide match {
case BuildRight => joinRow(currentStreamedRow, currentHashMatches(currentMatchPosition))
case BuildLeft => joinRow(currentHashMatches(currentMatchPosition), currentStreamedRow)
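The hashJoin signature above only swaps Row for InternalRow; the probe loop itself is untouched. As a simplified, self-contained illustration of that probe pattern, with a plain Scala Map standing in for HashedRelation (SimpleRow and its key field are hypothetical):

case class SimpleRow(key: Int, value: String)

// Stream side probes the built table; unmatched streamed rows yield nothing (inner join).
def hashJoin(
    streamIter: Iterator[SimpleRow],
    built: Map[Int, Seq[SimpleRow]]): Iterator[(SimpleRow, SimpleRow)] =
  streamIter.flatMap { s =>
    built.getOrElse(s.key, Seq.empty).map(b => (s, b))
  }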
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
index c21a453..19aef99 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
@@ -68,26 +68,29 @@ case class HashOuterJoin(
}
}
- @transient private[this] lazy val DUMMY_LIST = Seq[Row](null)
- @transient private[this] lazy val EMPTY_LIST = Seq.empty[Row]
+ @transient private[this] lazy val DUMMY_LIST = Seq[InternalRow](null)
+ @transient private[this] lazy val EMPTY_LIST = Seq.empty[InternalRow]
@transient private[this] lazy val leftNullRow = new GenericRow(left.output.length)
@transient private[this] lazy val rightNullRow = new GenericRow(right.output.length)
@transient private[this] lazy val boundCondition =
- condition.map(newPredicate(_, left.output ++ right.output)).getOrElse((row: Row) => true)
+ condition.map(
+ newPredicate(_, left.output ++ right.output)).getOrElse((row: InternalRow) => true)
// TODO we need to rewrite all of the iterators with our own implementation instead of the Scala
// iterator for performance purpose.
private[this] def leftOuterIterator(
- key: Row, joinedRow: JoinedRow, rightIter: Iterable[Row]): Iterator[Row] = {
- val ret: Iterable[Row] = {
+ key: InternalRow,
+ joinedRow: JoinedRow,
+ rightIter: Iterable[InternalRow]): Iterator[InternalRow] = {
+ val ret: Iterable[InternalRow] = {
if (!key.anyNull) {
val temp = rightIter.collect {
case r if boundCondition(joinedRow.withRight(r)) => joinedRow.copy()
}
if (temp.size == 0) {
- joinedRow.withRight(rightNullRow).copy :: Nil
+ joinedRow.withRight(rightNullRow).copy.asInstanceOf[InternalRow] :: Nil
} else {
temp
}
@@ -99,12 +102,15 @@ case class HashOuterJoin(
}
private[this] def rightOuterIterator(
- key: Row, leftIter: Iterable[Row], joinedRow: JoinedRow): Iterator[Row] = {
+ key: InternalRow,
+ leftIter: Iterable[InternalRow],
+ joinedRow: JoinedRow): Iterator[InternalRow] = {
- val ret: Iterable[Row] = {
+ val ret: Iterable[InternalRow] = {
if (!key.anyNull) {
val temp = leftIter.collect {
- case l if boundCondition(joinedRow.withLeft(l)) => joinedRow.copy
+ case l if boundCondition(joinedRow.withLeft(l)) =>
+ joinedRow.copy
}
if (temp.size == 0) {
joinedRow.withLeft(leftNullRow).copy :: Nil
@@ -119,14 +125,14 @@ case class HashOuterJoin(
}
private[this] def fullOuterIterator(
- key: Row, leftIter: Iterable[Row], rightIter: Iterable[Row],
- joinedRow: JoinedRow): Iterator[Row] = {
+ key: InternalRow, leftIter: Iterable[InternalRow], rightIter: Iterable[InternalRow],
+ joinedRow: JoinedRow): Iterator[InternalRow] = {
if (!key.anyNull) {
// Store the positions of records in right, if one of its associated row satisfy
// the join condition.
val rightMatchedSet = scala.collection.mutable.Set[Int]()
- leftIter.iterator.flatMap[Row] { l =>
+ leftIter.iterator.flatMap[InternalRow] { l =>
joinedRow.withLeft(l)
var matched = false
rightIter.zipWithIndex.collect {
@@ -157,24 +163,25 @@ case class HashOuterJoin(
joinedRow(leftNullRow, r).copy()
}
} else {
- leftIter.iterator.map[Row] { l =>
+ leftIter.iterator.map[InternalRow] { l =>
joinedRow(l, rightNullRow).copy()
- } ++ rightIter.iterator.map[Row] { r =>
+ } ++ rightIter.iterator.map[InternalRow] { r =>
joinedRow(leftNullRow, r).copy()
}
}
}
private[this] def buildHashTable(
- iter: Iterator[Row], keyGenerator: Projection): JavaHashMap[Row, CompactBuffer[Row]] = {
- val hashTable = new JavaHashMap[Row, CompactBuffer[Row]]()
+ iter: Iterator[InternalRow],
+ keyGenerator: Projection): JavaHashMap[InternalRow, CompactBuffer[InternalRow]] = {
+ val hashTable = new JavaHashMap[InternalRow, CompactBuffer[InternalRow]]()
while (iter.hasNext) {
val currentRow = iter.next()
val rowKey = keyGenerator(currentRow)
var existingMatchList = hashTable.get(rowKey)
if (existingMatchList == null) {
- existingMatchList = new CompactBuffer[Row]()
+ existingMatchList = new CompactBuffer[InternalRow]()
hashTable.put(rowKey, existingMatchList)
}
@@ -184,7 +191,7 @@ case class HashOuterJoin(
hashTable
}
- protected override def doExecute(): RDD[Row] = {
+ protected override def doExecute(): RDD[InternalRow] = {
val joinedRow = new JoinedRow()
left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
// TODO this probably can be replaced by external sort (sort merged join?)
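buildHashTable above now groups InternalRows by their projected key before the outer-join iterators consume them. A minimal sketch of the same grouping with plain Scala collections standing in for JavaHashMap and CompactBuffer (names hypothetical):

import scala.collection.mutable

def buildHashTable[K, R](rows: Iterator[R], key: R => K): mutable.Map[K, mutable.ArrayBuffer[R]] = {
  val table = mutable.Map.empty[K, mutable.ArrayBuffer[R]]
  rows.foreach { r =>
    // Append each row to the bucket for its key, creating the bucket on first use.
    table.getOrElseUpdate(key(r), mutable.ArrayBuffer.empty[R]) += r
  }
  table
}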