Posted to reviews@spark.apache.org by marmbrus <gi...@git.apache.org> on 2016/01/14 00:58:16 UTC

[GitHub] spark pull request: [SPARK-12813][SQL] Eliminate serialization for...

GitHub user marmbrus opened a pull request:

    https://github.com/apache/spark/pull/10747

    [SPARK-12813][SQL] Eliminate serialization for back to back operations

    The goal of this PR is to eliminate unnecessary translations when there are back-to-back `MapPartitions` operations.  In order to achieve this I also made the following simplifications:
    
     - Operators no longer hold encoders; instead they have only the expressions that they need.  The benefits here are twofold: the expressions are visible to transformations, so they go through the normal resolution/binding process, and now that they are visible we can change them on a case-by-case basis.
     - Operators no longer have type parameters.  Since the engine is responsible for its own type checking, having the types visible to the compiler was an unnecessary complication.  We still leverage the Scala compiler in the companion factory when constructing a new operator, but after this the types are discarded.
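As a rough illustration of the second point, the standalone Scala sketch below shows a typed companion factory that lets the compiler check the function's types at the call site and then keeps only an untyped function. `ToyMapPartitions`, its fields, and `ToyFactoryDemo` are hypothetical stand-ins invented for this example; they are not the operators added by this PR.

```scala
import scala.reflect.{ClassTag, classTag}

// Hypothetical, simplified operator: no type parameters, only an untyped
// function plus descriptions of the element types it was built from.
final case class ToyMapPartitions(
    func: Iterator[Any] => Iterator[Any],
    inputClass: String,
    outputClass: String)

object ToyMapPartitions {
  // The Scala compiler checks here that `func` maps Iterator[T] to
  // Iterator[U]; after the cast the static types are discarded.
  def apply[T: ClassTag, U: ClassTag](func: Iterator[T] => Iterator[U]): ToyMapPartitions =
    new ToyMapPartitions(
      func.asInstanceOf[Iterator[Any] => Iterator[Any]],
      classTag[T].toString,
      classTag[U].toString)
}

object ToyFactoryDemo {
  // Built through the typed factory; the resulting value is untyped.
  val op: ToyMapPartitions = ToyMapPartitions[Int, String](_.map(i => s"row-$i"))

  // The caller is now responsible for feeding the right element type.
  def run(): List[Any] = op.func(Iterator(1, 2, 3)).toList
}
```

Once the operator is constructed, nothing in its type stops a caller from passing the wrong element type, which is why the engine has to do its own checking.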
    
    Deferred to a follow up PR:
     - Remove as much of the resolution/binding from Dataset/GroupedDataset as possible. We should still eagerly check resolution, though, and throw an error in the case of mismatches for an `as` operation.
     - Eliminate serializations in more cases by adding more cases to `EliminateSerialization`
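The idea behind the optimization can be sketched with a toy plan model. This is only a minimal sketch under stated assumptions: `ToyPlan`, `ToyObjectOp`, and `ToyEliminateSerialization` are hypothetical names, element types are represented as plain strings, and the actual `EliminateSerialization` rule works on Catalyst expressions rather than on boolean flags.

```scala
// Toy plan model. These types are hypothetical and far simpler than Catalyst's.
sealed trait ToyPlan
case object ToyRelation extends ToyPlan

// An operator that (optionally) deserializes rows into objects of `inputType`,
// applies a user function, and (optionally) serializes `outputType` back to rows.
final case class ToyObjectOp(
    inputType: String,
    outputType: String,
    child: ToyPlan,
    deserializeInput: Boolean = true,
    serializeOutput: Boolean = true) extends ToyPlan

object ToyEliminateSerialization {
  // Rewrites bottom-up: when a child produces exactly the object type its
  // parent consumes, skip the child's serialization and the parent's
  // deserialization so the object is passed through directly.
  def optimize(plan: ToyPlan): ToyPlan = plan match {
    case ToyRelation => ToyRelation
    case op: ToyObjectOp =>
      optimize(op.child) match {
        case inner: ToyObjectOp
            if op.deserializeInput && inner.serializeOutput &&
              inner.outputType == op.inputType =>
          op.copy(child = inner.copy(serializeOutput = false), deserializeInput = false)
        case other => op.copy(child = other)
      }
  }

  // Counts the row-to-object and object-to-row conversions left in the plan.
  def conversions(plan: ToyPlan): Int = plan match {
    case ToyRelation => 0
    case op: ToyObjectOp =>
      (if (op.deserializeInput) 1 else 0) +
        (if (op.serializeOutput) 1 else 0) +
        conversions(op.child)
  }
}
```

In this toy model, two stacked operators over the same element type go from four conversions to two, while a pair whose types differ is left unchanged.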

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/marmbrus/spark encoderExpressions

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/10747.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #10747
    
----
commit 4615c9614a2a63dbb716b97093ec83801ebdeefd
Author: Michael Armbrust <mi...@databricks.com>
Date:   2016-01-13T23:15:12Z

    [SPARK-12813][SQL] Eliminate serialization for back to back operations

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12813][SQL] Eliminate serialization for...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10747#issuecomment-171806098
  
    **[Test build #49417 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/49417/consoleFull)** for PR 10747 at commit [`c34aacf`](https://github.com/apache/spark/commit/c34aacfeeb44c372cdc6385277c2511a1cd69270).



[GitHub] spark pull request: [SPARK-12813][SQL] Eliminate serialization for...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10747#discussion_r49765416
  
    --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSerializationSuite.scala ---
    @@ -0,0 +1,76 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.optimizer
    +
    +import scala.reflect.runtime.universe.TypeTag
    +
    +import org.apache.spark.sql.catalyst.dsl.expressions._
    +import org.apache.spark.sql.catalyst.dsl.plans._
    +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
    +import org.apache.spark.sql.catalyst.expressions.NewInstance
    +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, MapPartitions}
    +import org.apache.spark.sql.catalyst.plans.PlanTest
    +import org.apache.spark.sql.catalyst.rules.RuleExecutor
    +
    +case class OtherTuple(_1: Int, _2: Int)
    +
    +class EliminateSerializationSuite extends PlanTest {
    +  private object Optimize extends RuleExecutor[LogicalPlan] {
    +    val batches =
    +      Batch("Serialization", FixedPoint(100),
    +        EliminateSerialization) :: Nil
    +  }
    +
    +  implicit private def productEncoder[T <: Product : TypeTag] = ExpressionEncoder[T]()
    +  private val func = identity[Iterator[(Int, Int)]] _
    +  private val func2 = identity[Iterator[OtherTuple]] _
    +
    +  def assertObjectCreations(count: Int, plan: LogicalPlan): Unit = {
    +    val newInstances = plan.flatMap(_.expressions.collect {
    +      case n: NewInstance => n
    +    })
    +
    +    if (newInstances.size != count) {
    +      fail(
    +        s"""
    +           |Wrong number of object creations in plan: ${newInstances.size} != $count
    +           |$plan
    +         """.stripMargin)
    +    }
    +  }
    +
    +  test("back to back MapPartitions") {
    +    val input = LocalRelation('_1.int, '_2.int)
    +    val plan =
    +      MapPartitions(func,
    --- End diff --
    
    There's a test here and an end-to-end one in DatasetSuite now.



[GitHub] spark pull request: [SPARK-12813][SQL] Eliminate serialization for...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/10747



[GitHub] spark pull request: [SPARK-12813][SQL] Eliminate serialization for...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10747#issuecomment-171480448
  
    **[Test build #49353 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/49353/consoleFull)** for PR 10747 at commit [`4615c96`](https://github.com/apache/spark/commit/4615c9614a2a63dbb716b97093ec83801ebdeefd).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `trait ObjectOperator extends LogicalPlan `
      * `case class MapPartitions(`
      * `case class AppendColumns(`
      * `case class MapGroups(`
      * `case class CoGroup(`
      * `trait ObjectOperator extends SparkPlan `
      * `case class MapPartitions(`
      * `case class AppendColumns(`
      * `case class MapGroups(`
      * `case class CoGroup(`



[GitHub] spark pull request: [SPARK-12813][SQL] Eliminate serialization for...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10747#issuecomment-171512554
  
    **[Test build #49360 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/49360/consoleFull)** for PR 10747 at commit [`ee7f3c6`](https://github.com/apache/spark/commit/ee7f3c64e3a163e142ab55c28f435db595e34746).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.



[GitHub] spark pull request: [SPARK-12813][SQL] Eliminate serialization for...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10747#discussion_r49790103
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala ---
    @@ -0,0 +1,170 @@
    +
    +package org.apache.spark.sql.catalyst.plans.logical
    +
    +import org.apache.spark.sql.Encoder
    +import org.apache.spark.sql.catalyst.encoders._
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.types.ObjectType
    +
    +/**
    + * A trait for logical operators that operate on user defined objects.
    + */
    +trait ObjectOperator extends LogicalPlan {
    +  def serializer: Seq[NamedExpression]
    +
    +  def outputObject: NamedExpression =
    +    Alias(serializer.head.collect { case b: BoundReference => b }.head, "obj")()
    +
    +  def withObjectOutput: LogicalPlan = if (output.head.dataType.isInstanceOf[ObjectType]) {
    +    this
    +  } else {
    +    withNewSerializer(outputObject)
    +  }
    +
    +  def withNewSerializer(newSerializer: NamedExpression): LogicalPlan = makeCopy {
    +    productIterator.map {
    +      case c if c == serializer => newSerializer :: Nil
    +      case other: AnyRef => other
    +    }.toArray
    +  }
    +}
    +
    +object MapPartitions {
    +  def apply[T : Encoder, U : Encoder](
    +      func: Iterator[T] => Iterator[U],
    +      child: LogicalPlan): MapPartitions = {
    +    MapPartitions(
    +      func.asInstanceOf[Iterator[Any] => Iterator[Any]],
    +      encoderFor[T].fromRowExpression,
    +      encoderFor[U].namedExpressions,
    +      child)
    +  }
    +}
    +
    +/**
    + * A relation produced by applying `func` to each partition of the `child`.
    + * @param input used to extract the input to `func` from an input row.
    + * @param serializer use to serialize the output of `func`.
    + */
    +case class MapPartitions(
    +    func: Iterator[Any] => Iterator[Any],
    +    input: Expression,
    +    serializer: Seq[NamedExpression],
    +    child: LogicalPlan) extends UnaryNode with ObjectOperator {
    +  override def output: Seq[Attribute] = serializer.map(_.toAttribute)
    --- End diff --
    
    That would return different expression IDs any time the function was called, whereas we want to fix the expression IDs when the NamedExpression is created.
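To make the distinction concrete, here is a small standalone sketch. It assumes the alternative under discussion would build fresh attributes inside `output`; the suggestion being answered is not shown in this archive, so that is a guess. All names (`ToyExprId`, `ToyAttribute`, `ToyNamedExpression`, `UnstableOutput`, `StableOutput`) are hypothetical and are not Catalyst classes.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-in for a global expression ID generator.
object ToyExprId {
  private val counter = new AtomicLong(0L)
  def fresh(): Long = counter.getAndIncrement()
}

final case class ToyAttribute(name: String, exprId: Long)

// The ID is allocated once, when the named expression is constructed.
final case class ToyNamedExpression(name: String, exprId: Long = ToyExprId.fresh()) {
  def toAttribute: ToyAttribute = ToyAttribute(name, exprId)
}

// Allocates new IDs on every call, so two calls never agree.
final class UnstableOutput(names: Seq[String]) {
  def output: Seq[ToyAttribute] = names.map(n => ToyAttribute(n, ToyExprId.fresh()))
}

// Derives attributes from stored named expressions, so every call agrees.
final class StableOutput(serializer: Seq[ToyNamedExpression]) {
  def output: Seq[ToyAttribute] = serializer.map(_.toAttribute)
}
```

With `StableOutput`, anything that refers to an output attribute by ID keeps matching across repeated calls to `output`; with `UnstableOutput`, such a reference would stop matching on the next call.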



[GitHub] spark pull request: [SPARK-12813][SQL] Eliminate serialization for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10747#issuecomment-171512747
  
    Merged build finished. Test PASSed.



[GitHub] spark pull request: [SPARK-12813][SQL] Eliminate serialization for...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10747#discussion_r49792656
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala ---
    @@ -0,0 +1,170 @@
    +
    +package org.apache.spark.sql.catalyst.plans.logical
    +
    +import org.apache.spark.sql.Encoder
    +import org.apache.spark.sql.catalyst.encoders._
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.types.ObjectType
    +
    +/**
    + * A trait for logical operators that operate on user defined objects.
    + */
    +trait ObjectOperator extends LogicalPlan {
    +  def serializer: Seq[NamedExpression]
    +
    +  def outputObject: NamedExpression =
    +    Alias(serializer.head.collect { case b: BoundReference => b }.head, "obj")()
    +
    +  def withObjectOutput: LogicalPlan = if (output.head.dataType.isInstanceOf[ObjectType]) {
    +    this
    +  } else {
    +    withNewSerializer(outputObject)
    +  }
    +
    +  def withNewSerializer(newSerializer: NamedExpression): LogicalPlan = makeCopy {
    +    productIterator.map {
    +      case c if c == serializer => newSerializer :: Nil
    +      case other: AnyRef => other
    +    }.toArray
    +  }
    +}
    +
    +object MapPartitions {
    +  def apply[T : Encoder, U : Encoder](
    +      func: Iterator[T] => Iterator[U],
    +      child: LogicalPlan): MapPartitions = {
    +    MapPartitions(
    +      func.asInstanceOf[Iterator[Any] => Iterator[Any]],
    +      encoderFor[T].fromRowExpression,
    +      encoderFor[U].namedExpressions,
    +      child)
    +  }
    +}
    +
    +/**
    + * A relation produced by applying `func` to each partition of the `child`.
    + * @param input used to extract the input to `func` from an input row.
    + * @param serializer use to serialize the output of `func`.
    + */
    +case class MapPartitions(
    +    func: Iterator[Any] => Iterator[Any],
    +    input: Expression,
    +    serializer: Seq[NamedExpression],
    +    child: LogicalPlan) extends UnaryNode with ObjectOperator {
    +  override def output: Seq[Attribute] = serializer.map(_.toAttribute)
    --- End diff --
    
    ah yes



[GitHub] spark pull request: [SPARK-12813][SQL] Eliminate serialization for...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10747#issuecomment-171825983
  
    **[Test build #49417 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/49417/consoleFull)** for PR 10747 at commit [`c34aacf`](https://github.com/apache/spark/commit/c34aacfeeb44c372cdc6385277c2511a1cd69270).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.



[GitHub] spark pull request: [SPARK-12813][SQL] Eliminate serialization for...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10747#discussion_r49766942
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala ---
    @@ -0,0 +1,170 @@
    +
    +package org.apache.spark.sql.catalyst.plans.logical
    +
    +import org.apache.spark.sql.Encoder
    +import org.apache.spark.sql.catalyst.encoders._
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.types.ObjectType
    +
    +/**
    + * A trait for logical operators that operate on user defined objects.
    + */
    +trait ObjectOperator extends LogicalPlan {
    +  def serializer: Seq[NamedExpression]
    +
    +  def outputObject: NamedExpression =
    +    Alias(serializer.head.collect { case b: BoundReference => b }.head, "obj")()
    +
    +  def withObjectOutput: LogicalPlan = if (output.head.dataType.isInstanceOf[ObjectType]) {
    +    this
    --- End diff --
    
    when will we go to this branch?



[GitHub] spark pull request: [SPARK-12813][SQL] Eliminate serialization for...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the pull request:

    https://github.com/apache/spark/pull/10747#issuecomment-171830367
  
    LGTM



[GitHub] spark pull request: [SPARK-12813][SQL] Eliminate serialization for...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10747#discussion_r49771200
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala ---
    @@ -0,0 +1,170 @@
    +
    +package org.apache.spark.sql.catalyst.plans.logical
    +
    +import org.apache.spark.sql.Encoder
    +import org.apache.spark.sql.catalyst.encoders._
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.types.ObjectType
    +
    +/**
    + * A trait for logical operators that operate on user defined objects.
    + */
    +trait ObjectOperator extends LogicalPlan {
    +  def serializer: Seq[NamedExpression]
    +
    +  def outputObject: NamedExpression =
    +    Alias(serializer.head.collect { case b: BoundReference => b }.head, "obj")()
    +
    +  def withObjectOutput: LogicalPlan = if (output.head.dataType.isInstanceOf[ObjectType]) {
    +    this
    +  } else {
    +    withNewSerializer(outputObject)
    +  }
    +
    +  def withNewSerializer(newSerializer: NamedExpression): LogicalPlan = makeCopy {
    +    productIterator.map {
    +      case c if c == serializer => newSerializer :: Nil
    +      case other: AnyRef => other
    +    }.toArray
    +  }
    +}
    +
    +object MapPartitions {
    +  def apply[T : Encoder, U : Encoder](
    +      func: Iterator[T] => Iterator[U],
    +      child: LogicalPlan): MapPartitions = {
    +    MapPartitions(
    +      func.asInstanceOf[Iterator[Any] => Iterator[Any]],
    +      encoderFor[T].fromRowExpression,
    +      encoderFor[U].namedExpressions,
    +      child)
    +  }
    +}
    +
    +/**
    + * A relation produced by applying `func` to each partition of the `child`.
    + * @param input used to extract the input to `func` from an input row.
    + * @param serializer use to serialize the output of `func`.
    + */
    +case class MapPartitions(
    +    func: Iterator[Any] => Iterator[Any],
    +    input: Expression,
    +    serializer: Seq[NamedExpression],
    +    child: LogicalPlan) extends UnaryNode with ObjectOperator {
    +  override def output: Seq[Attribute] = serializer.map(_.toAttribute)
    +}
    +
    +/** Factory for constructing new `AppendColumn` nodes. */
    +object AppendColumns {
    +  def apply[T : Encoder, U : Encoder](
    +      func: T => U,
    +      child: LogicalPlan): AppendColumns = {
    +    new AppendColumns(
    +      func.asInstanceOf[Any => Any],
    +      encoderFor[T].fromRowExpression,
    +      encoderFor[U].namedExpressions,
    +      child)
    +  }
    +}
    +
    +/**
    + * A relation produced by applying `func` to each partition of the `child`, concatenating the
    + * resulting columns at the end of the input row. tEncoder/uEncoder are used respectively to
    --- End diff --
    
    update javadoc



[GitHub] spark pull request: [SPARK-12813][SQL] Eliminate serialization for...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10747#issuecomment-171488858
  
    **[Test build #49360 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/49360/consoleFull)** for PR 10747 at commit [`ee7f3c6`](https://github.com/apache/spark/commit/ee7f3c64e3a163e142ab55c28f435db595e34746).



[GitHub] spark pull request: [SPARK-12813][SQL] Eliminate serialization for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10747#issuecomment-171486123
  
    Merged build finished. Test FAILed.



[GitHub] spark pull request: [SPARK-12813][SQL] Eliminate serialization for...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10747#discussion_r49761722
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala ---
    @@ -0,0 +1,182 @@
    +
    +package org.apache.spark.sql.execution
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateSafeProjection, GenerateUnsafeProjection, GenerateUnsafeRowJoiner}
    +import org.apache.spark.sql.catalyst.plans.physical._
    +import org.apache.spark.sql.types.ObjectType
    +
    +/**
    + * Helper functions for physical operators that work with user defined objects.
    + */
    +trait ObjectOperator extends SparkPlan {
    +  def generateToObject(objExpr: Expression, inputSchema: Seq[Attribute]): InternalRow => Any = {
    +    val objectProjection = GenerateSafeProjection.generate(objExpr :: Nil, inputSchema)
    +    (i: InternalRow) => objectProjection(i).get(0, objExpr.dataType)
    +  }
    +
    +  def generateToRow(serializer: Seq[Expression]): Any => InternalRow = {
    +    val outputProjection = if (serializer.head.dataType.isInstanceOf[ObjectType]) {
    +      GenerateSafeProjection.generate(serializer)
    +    } else {
    +      GenerateUnsafeProjection.generate(serializer)
    +    }
    +    val inputType = serializer.head.collect { case b: BoundReference => b.dataType }.head
    +    val outputRow = new SpecificMutableRow(inputType :: Nil)
    +    (o: Any) => {
    +      outputRow(0) = o
    +      outputProjection(outputRow)
    +    }
    +  }
    +}
    +
    +/**
    + * Applies the given function to each input row and encodes the result.
    + */
    +case class MapPartitions(
    +    func: Iterator[Any] => Iterator[Any],
    +    input: Expression,
    +    serializer: Seq[NamedExpression],
    +    child: SparkPlan) extends UnaryNode with ObjectOperator {
    +  override def output: Seq[Attribute] = serializer.map(_.toAttribute)
    +
    +  override protected def doExecute(): RDD[InternalRow] = {
    +    child.execute().mapPartitionsInternal { iter =>
    +      val getObject = generateToObject(input, child.output)
    +      val outputObject = generateToRow(serializer)
    +      func(iter.map(getObject)).map(outputObject)
    +    }
    +  }
    +}
    +
    +/**
    + * Applies the given function to each input row, appending the encoded result at the end of the row.
    + */
    +case class AppendColumns(
    +    func: Any => Any,
    +    input: Expression,
    +    serializer: Seq[NamedExpression],
    +    child: SparkPlan) extends UnaryNode with ObjectOperator {
    +
    +  override def output: Seq[Attribute] = child.output ++ serializer.map(_.toAttribute)
    +
    +  private def newColumnSchema = serializer.map(_.toAttribute).toStructType
    +
    +  override protected def doExecute(): RDD[InternalRow] = {
    +    child.execute().mapPartitionsInternal { iter =>
    +      val getObject = generateToObject(input, child.output)
    +      val combiner = GenerateUnsafeRowJoiner.create(child.schema, newColumnSchema)
    +      val outputObject = generateToRow(serializer)
    +
    +      iter.map { row =>
    +        val newColumns = outputObject(func(getObject(row)))
    --- End diff --
    
    How do we ensure that `newColumns` is an `UnsafeRow`?
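
To make the concern concrete, here is a minimal sketch in plain Scala. It is a toy model, not Spark code: `SafeRow`, `UnsafeLikeRow`, and `RowFormatSketch` are hypothetical stand-ins. It assumes, as the diff above suggests, that `generateToRow` picks a safe projection when the serializer's first expression is object-typed, while the joiner built by `GenerateUnsafeRowJoiner` only understands the binary row format.

```scala
// Toy model only: these types are hypothetical stand-ins, not Spark classes.
sealed trait Row
final case class SafeRow(values: Vector[Any]) extends Row
final case class UnsafeLikeRow(values: Vector[Any]) extends Row

object RowFormatSketch {
  // Mirrors the branch in generateToRow: an object-typed serializer yields a
  // "safe" row, anything else yields the binary ("unsafe") format.
  def toRow(serializerProducesObject: Boolean): Any => Row =
    if (serializerProducesObject) (o: Any) => SafeRow(Vector(o))
    else (o: Any) => UnsafeLikeRow(Vector(o))

  // Stand-in for a joiner that only understands the binary format.
  def join(left: UnsafeLikeRow, right: Row): Either[String, UnsafeLikeRow] =
    right match {
      case UnsafeLikeRow(vs) => Right(UnsafeLikeRow(left.values ++ vs))
      case SafeRow(_)        => Left("joiner requires the binary row format")
    }
}
```

In this model the join only succeeds when the appended columns come from the non-object branch, which is the invariant the question asks about.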


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12813][SQL] Eliminate serialization for...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10747#discussion_r49807910
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala ---
    @@ -31,7 +31,7 @@ import org.apache.spark.sql.types._
     case class BoundReference(ordinal: Int, dataType: DataType, nullable: Boolean)
       extends LeafExpression with NamedExpression {
    --- End diff --
    
    Unrelated question: why does `BoundReference` extend `NamedExpression`?




[GitHub] spark pull request: [SPARK-12813][SQL] Eliminate serialization for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10747#issuecomment-171826585
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-12813][SQL] Eliminate serialization for...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/10747#issuecomment-171487163
  
    test this please




[GitHub] spark pull request: [SPARK-12813][SQL] Eliminate serialization for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10747#issuecomment-171486125
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/49356/
    Test FAILed.




[GitHub] spark pull request: [SPARK-12813][SQL] Eliminate serialization for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10747#issuecomment-171512748
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/49360/
    Test PASSed.




[GitHub] spark pull request: [SPARK-12813][SQL] Eliminate serialization for...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10747#discussion_r49697555
  
    --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSerializationSuite.scala ---
    @@ -0,0 +1,76 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.optimizer
    +
    +import scala.reflect.runtime.universe.TypeTag
    +
    +import org.apache.spark.sql.catalyst.dsl.expressions._
    +import org.apache.spark.sql.catalyst.dsl.plans._
    +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
    +import org.apache.spark.sql.catalyst.expressions.NewInstance
    +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, MapPartitions}
    +import org.apache.spark.sql.catalyst.plans.PlanTest
    +import org.apache.spark.sql.catalyst.rules.RuleExecutor
    +
    +case class OtherTuple(_1: Int, _2: Int)
    +
    +class EliminateSerializationSuite extends PlanTest {
    +  private object Optimize extends RuleExecutor[LogicalPlan] {
    +    val batches =
    +      Batch("Serialization", FixedPoint(100),
    +        EliminateSerialization) :: Nil
    +  }
    +
    +  implicit private def productEncoder[T <: Product : TypeTag] = ExpressionEncoder[T]()
    +  private val func = identity[Iterator[(Int, Int)]] _
    +  private val func2 = identity[Iterator[OtherTuple]] _
    +
    +  def assertObjectCreations(count: Int, plan: LogicalPlan): Unit = {
    +    val newInstances = plan.flatMap(_.expressions.collect {
    +      case n: NewInstance => n
    +    })
    +
    +    if (newInstances.size != count) {
    +      fail(
    +        s"""
    +           |Wrong number of object creations in plan: ${newInstances.size} != $count
    +           |$plan
    +         """.stripMargin)
    +    }
    +  }
    +
    +  test("back to back MapPartitions") {
    +    val input = LocalRelation('_1.int, '_2.int)
    +    val plan =
    +      MapPartitions(func,
    --- End diff --
    
    Should we have a test case for a plan whose serialization cannot be eliminated?
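
As an illustration of what such a negative case could check, here is a small sketch in plain Scala. It is a toy model under stated assumptions, not the real `EliminateSerialization` rule: `Stage` and `ToyEliminateSerialization` are hypothetical names, and the sketch assumes the round trip can be skipped only when a stage consumes exactly the object type that the previous stage produced.

```scala
// Hypothetical miniature of a pipeline of object operators.
// The first element of the list is the stage closest to the data.
final case class Stage(inType: String, outType: String)

object ToyEliminateSerialization {
  // Counts how many times an object must be created from a serialized row.
  // The first stage always deserializes; a later stage deserializes again
  // only when its input type differs from the previous stage's output type.
  def objectCreations(stages: List[Stage]): Int = stages match {
    case Nil => 0
    case _ =>
      1 + stages.zip(stages.tail).count { case (prev, next) =>
        prev.outType != next.inType
      }
  }
}
```

A positive test would expect one object creation for two stages over the same tuple type, and a negative test would expect two when the second stage reads a different class such as `OtherTuple`.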





[GitHub] spark pull request: [SPARK-12813][SQL] Eliminate serialization for...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10747#discussion_r49765260
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
    @@ -336,12 +336,7 @@ class Dataset[T] private[sql](
       def mapPartitions[U : Encoder](func: Iterator[T] => Iterator[U]): Dataset[U] = {
         new Dataset[U](
           sqlContext,
    -      MapPartitions[T, U](
    -        func,
    -        resolvedTEncoder,
    -        encoderFor[U],
    -        encoderFor[U].schema.toAttributes,
    -        logicalPlan))
    +      MapPartitions[T, U](func, logicalPlan))
    --- End diff --
    
    This is just pushing the lifecycle of the encoder into the analyzer / physical operators where it belongs.
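
The pattern behind this change can be sketched in plain Scala. This is a simplified illustration, not the PR's code: `Enc` and `UntypedMap` are hypothetical stand-ins for the encoder and the operator. The typed factory lets the compiler check the function against the encoders, then stores only the erased function and the pieces extracted from the encoders.

```scala
// Hypothetical stand-in for an encoder: just enough to show the pattern.
trait Enc[T] { def name: String }
object Enc {
  implicit val intEnc: Enc[Int] = new Enc[Int] { val name = "int" }
  implicit val stringEnc: Enc[String] = new Enc[String] { val name = "string" }
}

// The operator itself is untyped: it stores an erased function plus plain
// descriptions of how to read its input and write its output.
final case class UntypedMap(
    func: Iterator[Any] => Iterator[Any],
    input: String,
    serializer: String)

object UntypedMap {
  // The typed factory lets the compiler check `func` against the encoders,
  // then discards T and U.
  def typed[T, U](func: Iterator[T] => Iterator[U])(
      implicit in: Enc[T], out: Enc[U]): UntypedMap =
    UntypedMap(func.asInstanceOf[Iterator[Any] => Iterator[Any]], in.name, out.name)
}
```

A call such as `UntypedMap.typed[Int, String](_.map(i => (i + 1).toString))` is type checked at the call site, while the resulting operator carries no type parameters.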




[GitHub] spark pull request: [SPARK-12813][SQL] Eliminate serialization for...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10747#discussion_r49777362
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala ---
    @@ -0,0 +1,170 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.plans.logical
    +
    +import org.apache.spark.sql.Encoder
    +import org.apache.spark.sql.catalyst.encoders._
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.types.ObjectType
    +
    +/**
    + * A trait for logical operators that operate on user defined objects.
    + */
    +trait ObjectOperator extends LogicalPlan {
    +  def serializer: Seq[NamedExpression]
    +
    +  def outputObject: NamedExpression =
    +    Alias(serializer.head.collect { case b: BoundReference => b }.head, "obj")()
    +
    +  def withObjectOutput: LogicalPlan = if (output.head.dataType.isInstanceOf[ObjectType]) {
    +    this
    --- End diff --
    
    I'll add some comments to this trait.




[GitHub] spark pull request: [SPARK-12813][SQL] Eliminate serialization for...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10747#issuecomment-171732889
  
    **[Test build #49404 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/49404/consoleFull)** for PR 10747 at commit [`ecde6e5`](https://github.com/apache/spark/commit/ecde6e580934cc835ac10ba1c63adf025073b858).




[GitHub] spark pull request: [SPARK-12813][SQL] Eliminate serialization for...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10747#issuecomment-171767074
  
    **[Test build #49404 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/49404/consoleFull)** for PR 10747 at commit [`ecde6e5`](https://github.com/apache/spark/commit/ecde6e580934cc835ac10ba1c63adf025073b858).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-12813][SQL] Eliminate serialization for...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10747#discussion_r49784357
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala ---
    @@ -0,0 +1,170 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.plans.logical
    +
    +import org.apache.spark.sql.Encoder
    +import org.apache.spark.sql.catalyst.encoders._
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.types.ObjectType
    +
    +/**
    + * A trait for logical operators that operate on user defined objects.
    + */
    +trait ObjectOperator extends LogicalPlan {
    +  def serializer: Seq[NamedExpression]
    +
    +  def outputObject: NamedExpression =
    +    Alias(serializer.head.collect { case b: BoundReference => b }.head, "obj")()
    +
    +  def withObjectOutput: LogicalPlan = if (output.head.dataType.isInstanceOf[ObjectType]) {
    +    this
    +  } else {
    +    withNewSerializer(outputObject)
    +  }
    +
    +  def withNewSerializer(newSerializer: NamedExpression): LogicalPlan = makeCopy {
    +    productIterator.map {
    +      case c if c == serializer => newSerializer :: Nil
    +      case other: AnyRef => other
    +    }.toArray
    +  }
    +}
    +
    +object MapPartitions {
    +  def apply[T : Encoder, U : Encoder](
    +      func: Iterator[T] => Iterator[U],
    +      child: LogicalPlan): MapPartitions = {
    +    MapPartitions(
    +      func.asInstanceOf[Iterator[Any] => Iterator[Any]],
    +      encoderFor[T].fromRowExpression,
    +      encoderFor[U].namedExpressions,
    +      child)
    +  }
    +}
    +
    +/**
    + * A relation produced by applying `func` to each partition of the `child`.
    + * @param input used to extract the input to `func` from an input row.
    + * @param serializer use to serialize the output of `func`.
    + */
    +case class MapPartitions(
    +    func: Iterator[Any] => Iterator[Any],
    +    input: Expression,
    +    serializer: Seq[NamedExpression],
    +    child: LogicalPlan) extends UnaryNode with ObjectOperator {
    +  override def output: Seq[Attribute] = serializer.map(_.toAttribute)
    --- End diff --
    
    Can we just use `serializer.map(_.toAttribute.newInstance)` here? Then we wouldn't need to add `NamedExpression.newInstance`.
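
For readers unfamiliar with the idea, the following plain-Scala sketch shows what a `newInstance`-style copy does. It is a hypothetical miniature, not Catalyst's `Attribute`: `Attr`, `nextId`, and `fresh` are made-up names, and the id counter is an assumption for illustration.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical miniature of an attribute with a unique expression id.
final case class Attr(name: String, exprId: Long) {
  // Same name, fresh id: the copy can be told apart from the original.
  def newInstance(): Attr = copy(exprId = Attr.nextId())
}

object Attr {
  private val counter = new AtomicLong(0L)
  def nextId(): Long = counter.getAndIncrement()
  def fresh(name: String): Attr = Attr(name, nextId())
}
```

The suggestion above amounts to converting each serializer expression to an attribute first and then taking the fresh copy of that attribute.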




[GitHub] spark pull request: [SPARK-12813][SQL] Eliminate serialization for...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10747#discussion_r49809056
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala ---
    @@ -31,7 +31,7 @@ import org.apache.spark.sql.types._
     case class BoundReference(ordinal: Int, dataType: DataType, nullable: Boolean)
       extends LeafExpression with NamedExpression {
    --- End diff --
    
    It's kind of a hack, but sometimes after transforms we end up with `BoundReference`s in place of fields that were `AttributeReference`s, which caused class cast exceptions.  We might be able to remove this some day, or maybe now?
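
The failure mode described here can be reproduced with a small plain-Scala sketch. The hierarchy is a hypothetical miniature whose names only echo Catalyst: `PlainBound` models a bound reference that is not a named expression, and `NamedBound` models one that is.

```scala
// Hypothetical miniature hierarchy; this is not Catalyst's API.
trait Expr
trait Named extends Expr { def name: String }
final case class AttrRef(name: String) extends Named
// Variant A: a bound reference that is NOT a Named expression.
final case class PlainBound(ordinal: Int) extends Expr
// Variant B: a bound reference that also satisfies Named.
final case class NamedBound(ordinal: Int) extends Named {
  def name: String = s"input[$ordinal]"
}

object BindSketch {
  // A generic transform works on Expr, so the result must be cast back to
  // the field's declared element type.
  def bind(fields: Seq[Named], replace: Named => Expr): Seq[Named] =
    fields.map(replace).map(_.asInstanceOf[Named])
}
```

With `NamedBound` the cast back to `Seq[Named]` succeeds; with `PlainBound` it throws a `ClassCastException`, which matches the symptom mentioned in the comment.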




[GitHub] spark pull request: [SPARK-12813][SQL] Eliminate serialization for...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10747#discussion_r49777638
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala ---
    @@ -0,0 +1,170 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.plans.logical
    +
    +import org.apache.spark.sql.Encoder
    +import org.apache.spark.sql.catalyst.encoders._
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.types.ObjectType
    +
    +/**
    + * A trait for logical operators that operate on user defined objects.
    + */
    +trait ObjectOperator extends LogicalPlan {
    +  def serializer: Seq[NamedExpression]
    +
    +  def outputObject: NamedExpression =
    +    Alias(serializer.head.collect { case b: BoundReference => b }.head, "obj")()
    +
    +  def withObjectOutput: LogicalPlan = if (output.head.dataType.isInstanceOf[ObjectType]) {
    +    this
    --- End diff --
    
    Ah, got it: after the back-to-back optimization, `serializer` may produce an object.
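
The shape of `withObjectOutput` can be sketched in plain Scala as follows. This is a toy model with hypothetical names (`Col`, `ObjCol`, `FieldCol`, `ObjectOp`), not the trait from the diff. It assumes the serializer is either a single object column or a list of serialized field columns.

```scala
// Hypothetical miniature: an operator's serializer is a list of output columns.
sealed trait Col
final case class ObjCol(className: String) extends Col // the raw object itself
final case class FieldCol(name: String) extends Col    // one serialized field

final case class ObjectOp(objectClass: String, serializer: Seq[Col]) {
  // True when the operator already hands the raw object to its parent.
  def producesObject: Boolean =
    serializer.headOption.exists(_.isInstanceOf[ObjCol])

  // Keep the operator as-is when it already outputs an object,
  // otherwise swap in a single object column.
  def withObjectOutput: ObjectOp =
    if (producesObject) this else copy(serializer = Seq(ObjCol(objectClass)))
}
```

Applying `withObjectOutput` twice returns the same instance the second time, which corresponds to the `this` branch under discussion.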




[GitHub] spark pull request: [SPARK-12813][SQL] Eliminate serialization for...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10747#discussion_r49744149
  
    --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSerializationSuite.scala ---
    @@ -0,0 +1,76 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.optimizer
    +
    +import scala.reflect.runtime.universe.TypeTag
    +
    +import org.apache.spark.sql.catalyst.dsl.expressions._
    +import org.apache.spark.sql.catalyst.dsl.plans._
    +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
    +import org.apache.spark.sql.catalyst.expressions.NewInstance
    +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, MapPartitions}
    +import org.apache.spark.sql.catalyst.plans.PlanTest
    +import org.apache.spark.sql.catalyst.rules.RuleExecutor
    +
    +case class OtherTuple(_1: Int, _2: Int)
    +
    +class EliminateSerializationSuite extends PlanTest {
    +  private object Optimize extends RuleExecutor[LogicalPlan] {
    +    val batches =
    +      Batch("Serialization", FixedPoint(100),
    +        EliminateSerialization) :: Nil
    +  }
    +
    +  implicit private def productEncoder[T <: Product : TypeTag] = ExpressionEncoder[T]()
    +  private val func = identity[Iterator[(Int, Int)]] _
    +  private val func2 = identity[Iterator[OtherTuple]] _
    +
    +  def assertObjectCreations(count: Int, plan: LogicalPlan): Unit = {
    +    val newInstances = plan.flatMap(_.expressions.collect {
    +      case n: NewInstance => n
    +    })
    +
    +    if (newInstances.size != count) {
    +      fail(
    +        s"""
    +           |Wrong number of object creations in plan: ${newInstances.size} != $count
    +           |$plan
    +         """.stripMargin)
    +    }
    +  }
    +
    +  test("back to back MapPartitions") {
    +    val input = LocalRelation('_1.int, '_2.int)
    +    val plan =
    +      MapPartitions(func,
    --- End diff --
    
    Oh yeah, I guess I forgot to push it.




[GitHub] spark pull request: [SPARK-12813][SQL] Eliminate serialization for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10747#issuecomment-171480459
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12813][SQL] Eliminate serialization for...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10747#issuecomment-171479750
  
    **[Test build #49353 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/49353/consoleFull)** for PR 10747 at commit [`4615c96`](https://github.com/apache/spark/commit/4615c9614a2a63dbb716b97093ec83801ebdeefd).




[GitHub] spark pull request: [SPARK-12813][SQL] Eliminate serialization for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10747#issuecomment-171826592
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/49417/
    Test PASSed.




[GitHub] spark pull request: [SPARK-12813][SQL] Eliminate serialization for...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10747#discussion_r49777327
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala ---
    @@ -0,0 +1,170 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.plans.logical
    +
    +import org.apache.spark.sql.Encoder
    +import org.apache.spark.sql.catalyst.encoders._
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.types.ObjectType
    +
    +/**
    + * A trait for logical operators that operate on user defined objects.
    + */
    +trait ObjectOperator extends LogicalPlan {
    +  def serializer: Seq[NamedExpression]
    +
    +  def outputObject: NamedExpression =
    +    Alias(serializer.head.collect { case b: BoundReference => b }.head, "obj")()
    +
    +  def withObjectOutput: LogicalPlan = if (output.head.dataType.isInstanceOf[ObjectType]) {
    +    this
    --- End diff --
    
    This branch applies when the output is already in the form of an object (instead of a serialized row).
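
    To illustrate the idea behind this branch, here is a minimal, self-contained sketch. It is a toy model, not the code in this PR: `Scan`, `MapObj`, `Counter`, `eliminateSerialization`, and `execute` are hypothetical names, a serialized "row" is just a `String`, and an "object" is an `Int`. Under those assumptions, it shows how letting the lower of two back-to-back operators emit objects removes one serialize/deserialize round trip.

```scala
object ObjectOutputSketch {
  // Hypothetical stand-ins: a serialized "row" is a String, a user "object" is an Int.
  sealed trait Plan
  final case class Scan(rows: Seq[String]) extends Plan
  final case class MapObj(
      func: Int => Int,
      child: Plan,
      outputIsObject: Boolean = false) extends Plan {
    // Same shape as the branch under review: if the output is already an
    // object, return this operator unchanged; otherwise switch it over.
    def withObjectOutput: MapObj =
      if (outputIsObject) this else copy(outputIsObject = true)
  }

  // Counts element-level conversions between rows and objects.
  final class Counter { var conversions: Int = 0 }

  // When one MapObj sits directly on another, let the lower one emit objects.
  def eliminateSerialization(plan: Plan): Plan = plan match {
    case m: MapObj => rewrite(m)
    case other => other
  }

  private def rewrite(m: MapObj): MapObj = m.child match {
    case c: MapObj => m.copy(child = rewrite(c).withObjectOutput)
    case _ => m
  }

  // Left = serialized rows, Right = objects.
  def execute(plan: Plan, counter: Counter): Either[Seq[String], Seq[Int]] = plan match {
    case Scan(rows) => Left(rows)
    case MapObj(func, child, asObject) =>
      val in: Seq[Int] = execute(child, counter) match {
        case Left(rows) =>
          counter.conversions += rows.size // deserialize each row
          rows.map(_.toInt)
        case Right(objs) => objs // already objects, nothing to convert
      }
      val out = in.map(func)
      if (asObject) Right(out)
      else {
        counter.conversions += out.size // serialize each result
        Left(out.map(_.toString))
      }
  }
}
```

    For two stacked `MapObj` operators over a three-row `Scan`, the original plan performs 12 conversions and the rewritten plan performs 6, while both produce the same serialized rows.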


[GitHub] spark pull request: [SPARK-12813][SQL] Eliminate serialization for...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10747#discussion_r49766613
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala ---
    @@ -0,0 +1,182 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateSafeProjection, GenerateUnsafeProjection, GenerateUnsafeRowJoiner}
    +import org.apache.spark.sql.catalyst.plans.physical._
    +import org.apache.spark.sql.types.ObjectType
    +
    +/**
    + * Helper functions for physical operators that work with user defined objects.
    + */
    +trait ObjectOperator extends SparkPlan {
    +  def generateToObject(objExpr: Expression, inputSchema: Seq[Attribute]): InternalRow => Any = {
    +    val objectProjection = GenerateSafeProjection.generate(objExpr :: Nil, inputSchema)
    +    (i: InternalRow) => objectProjection(i).get(0, objExpr.dataType)
    +  }
    +
    +  def generateToRow(serializer: Seq[Expression]): Any => InternalRow = {
    +    val outputProjection = if (serializer.head.dataType.isInstanceOf[ObjectType]) {
    +      GenerateSafeProjection.generate(serializer)
    +    } else {
    +      GenerateUnsafeProjection.generate(serializer)
    +    }
    +    val inputType = serializer.head.collect { case b: BoundReference => b.dataType }.head
    +    val outputRow = new SpecificMutableRow(inputType :: Nil)
    +    (o: Any) => {
    +      outputRow(0) = o
    +      outputProjection(outputRow)
    +    }
    +  }
    +}
    +
    +/**
    + * Applies the given function to each input row and encodes the result.
    + */
    +case class MapPartitions(
    +    func: Iterator[Any] => Iterator[Any],
    +    input: Expression,
    +    serializer: Seq[NamedExpression],
    +    child: SparkPlan) extends UnaryNode with ObjectOperator {
    +  override def output: Seq[Attribute] = serializer.map(_.toAttribute)
    +
    +  override protected def doExecute(): RDD[InternalRow] = {
    +    child.execute().mapPartitionsInternal { iter =>
    +      val getObject = generateToObject(input, child.output)
    +      val outputObject = generateToRow(serializer)
    +      func(iter.map(getObject)).map(outputObject)
    +    }
    +  }
    +}
    +
    +/**
    + * Applies the given function to each input row, appending the encoded result at the end of the row.
    + */
    +case class AppendColumns(
    +    func: Any => Any,
    +    input: Expression,
    +    serializer: Seq[NamedExpression],
    +    child: SparkPlan) extends UnaryNode with ObjectOperator {
    +
    +  override def output: Seq[Attribute] = child.output ++ serializer.map(_.toAttribute)
    +
    +  private def newColumnSchema = serializer.map(_.toAttribute).toStructType
    +
    +  override protected def doExecute(): RDD[InternalRow] = {
    +    child.execute().mapPartitionsInternal { iter =>
    +      val getObject = generateToObject(input, child.output)
    +      val combiner = GenerateUnsafeRowJoiner.create(child.schema, newColumnSchema)
    +      val outputObject = generateToRow(serializer)
    +
    +      iter.map { row =>
    +        val newColumns = outputObject(func(getObject(row)))
    --- End diff --
    
    Yeah, that's a good point.  It is only safe because we only use `AppendColumns` to feed into aggregation, so we'll only ever need an unsafe row.  We could make this more general, but I'm not sure it's worth it, given that this will likely get rewritten in the near future for codegen anyway.
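
    As a rough illustration of the assumption being relied on here, the sketch below makes it explicit in a toy model. Nothing in it is Spark code: `FlatRow` (standing in for an unsafe row), `ObjectRow`, `join`, and `appendColumns` are hypothetical names. It only shows one possible way a joiner that handles flat rows alone could reject object output instead of silently assuming it never occurs.

```scala
object AppendColumnsSketch {
  // Hypothetical stand-ins for the two row flavors discussed above.
  sealed trait Row
  final case class FlatRow(fields: Vector[Long]) extends Row // plays the role of an unsafe row
  final case class ObjectRow(obj: Any) extends Row           // row that still holds a JVM object

  // Stand-in for a row joiner that can only concatenate flat rows.
  def join(left: FlatRow, right: FlatRow): FlatRow =
    FlatRow(left.fields ++ right.fields)

  // Appends the new columns, reporting an error rather than assuming flat rows.
  def appendColumns(input: FlatRow, newColumns: Row): Either[String, FlatRow] =
    newColumns match {
      case flat: FlatRow => Right(join(input, flat))
      case _: ObjectRow  => Left("appended columns must be serialized before joining")
    }
}
```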


[GitHub] spark pull request: [SPARK-12813][SQL] Eliminate serialization for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10747#issuecomment-171480462
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/49353/


[GitHub] spark pull request: [SPARK-12813][SQL] Eliminate serialization for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10747#issuecomment-171767434
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/49404/


[GitHub] spark pull request: [SPARK-12813][SQL] Eliminate serialization for...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10747#discussion_r49763739
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
    @@ -336,12 +336,7 @@ class Dataset[T] private[sql](
       def mapPartitions[U : Encoder](func: Iterator[T] => Iterator[U]): Dataset[U] = {
         new Dataset[U](
           sqlContext,
    -      MapPartitions[T, U](
    -        func,
    -        resolvedTEncoder,
    -        encoderFor[U],
    -        encoderFor[U].schema.toAttributes,
    -        logicalPlan))
    +      MapPartitions[T, U](func, logicalPlan))
    --- End diff --
    
    This differs from the previous version: we now pass only the type parameter `T` to `MapPartitions` and build a new, unresolved encoder there, whereas before this PR we passed a `resolvedTEncoder`. Does this PR break the life cycle of the encoder?


[GitHub] spark pull request: [SPARK-12813][SQL] Eliminate serialization for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10747#issuecomment-171767431
  
    Merged build finished. Test PASSed.


[GitHub] spark pull request: [SPARK-12813][SQL] Eliminate serialization for...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/10747#issuecomment-171831996
  
    Thanks for reviewing!  Merging to master.


[GitHub] spark pull request: [SPARK-12813][SQL] Eliminate serialization for...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/10747#issuecomment-171476290
  
    /cc @cloud-fan 

