Posted to reviews@spark.apache.org by marmbrus <gi...@git.apache.org> on 2015/10/21 03:14:50 UTC

[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

GitHub user marmbrus opened a pull request:

    https://github.com/apache/spark/pull/9190

    [SPARK-11116] [SQL] First Draft of Dataset API

    *This PR adds a new experimental API to Spark, tentatively named Datasets.*
    
    A `Dataset` is a strongly typed collection of objects that can be transformed in parallel using functional or relational operations.  Example usage is as follows:
    
    ### Functional
    ```scala
    > val ds: Dataset[Int] = Seq(1, 2, 3).toDS()
    > ds.filter(_ % 1 == 0).collect()
    res1: Array[Int] = Array(1, 2, 3)
    ```
    
    ### Relational
    ```scala
    scala> ds.toDF().show()
    +-----+
    |value|
    +-----+
    |    1|
    |    2|
    |    3|
    +-----+
    
    > ds.select(expr("value + 1").as[Int]).collect()
    res11: Array[Int] = Array(2, 3, 4)
    ```
    
    ## Comparison to RDDs
     A `Dataset` differs from an `RDD` in the following ways:
      - The creation of a `Dataset` requires the presence of an explicit `Encoder` that can be
        used to serialize the object into a binary format.  Encoders are also capable of mapping the
        schema of a given object to the Spark SQL type system.  In contrast, RDDs rely on runtime
        reflection-based serialization.
      - Internally, a `Dataset` is represented by a Catalyst logical plan and the data is stored
        in the encoded form.  This representation allows for additional logical operations and
        enables many operations (sorting, shuffling, etc.) to be performed without deserializing to
        an object.
    
    A `Dataset` can be converted to an `RDD` by calling the `.rdd` method.
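
    To make the encoder requirement above concrete, here is a minimal sketch in plain Scala.  It is not the API from this PR: `SimpleEncoder`, `Person`, and `roundTrip` are hypothetical names, and a `Seq[Any]` stands in for the encoded row format.  It only illustrates that an encoder carries both a schema and the conversion to and from a generic row, and that it must be explicitly available for the element type.

    ```scala
    // Hypothetical, heavily simplified stand-in for an encoder; the real trait in this PR
    // works with InternalRow and Catalyst expressions rather than Seq[Any].
    trait SimpleEncoder[T] {
      def schema: Seq[(String, String)]   // (column name, SQL type name)
      def toRow(obj: T): Seq[Any]         // typed object -> generic row
      def fromRow(row: Seq[Any]): T       // generic row -> typed object
    }

    case class Person(name: String, age: Int)

    object SimpleEncoder {
      // Placed in the companion object so it is found by implicit search.
      implicit val personEncoder: SimpleEncoder[Person] = new SimpleEncoder[Person] {
        val schema: Seq[(String, String)] = Seq("name" -> "string", "age" -> "int")
        def toRow(p: Person): Seq[Any] = Seq(p.name, p.age)
        def fromRow(row: Seq[Any]): Person =
          Person(row(0).asInstanceOf[String], row(1).asInstanceOf[Int])
      }
    }

    // Does not compile for a type T that has no SimpleEncoder in implicit scope.
    def roundTrip[T](obj: T)(implicit enc: SimpleEncoder[T]): T =
      enc.fromRow(enc.toRow(obj))

    val p = roundTrip(Person("Ann", 30))
    ```

    Because the encoder is resolved at compile time, a missing encoder shows up as a compilation error rather than as a serialization failure at runtime.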
    
    ## Comparison to DataFrames
    
    A `Dataset` can be thought of as a specialized DataFrame, where the elements map to a specific
    JVM object type, instead of to a generic `Row` container. A DataFrame can be transformed into
    a specific Dataset by calling `df.as[ElementType]`.  Similarly you can transform a strongly-typed
    `Dataset` to a generic DataFrame by calling `ds.toDF()`.
    
    ## Implementation Status and TODOs
    
    This is a rough cut at the least controversial parts of the API.  The primary purpose here is to get something committed so that we can better parallelize further work and get early feedback on the API.  The following is being deferred to future PRs:
     - Joins and Aggregations (prototype here https://github.com/apache/spark/commit/f11f91e6f08c8cf389b8388b626cd29eec32d937)
     - Support for Java
    
    Additionally, binding an encoder to a given schema is currently done in a fairly ad-hoc fashion.  This is an internal detail, and what we are doing today works for the cases we care about.  However, as we add more APIs we'll probably need to do this in a more principled way (i.e. separate resolution from binding as we do in DataFrames).
    
    ## COMPATIBILITY NOTE
    Long term we plan to make `DataFrame` extend `Dataset[Row]`.  However,
    making this change to the class hierarchy would break the function signatures for the existing
    functional operations (map, flatMap, etc).  As such, this class should be considered a preview
    of the final API.  Changes will be made to the interface after Spark 1.6.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/marmbrus/spark dataset-infra

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/9190.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #9190
    
----
commit 0db295c3491b2344213647c91b7ecae2763c58ac
Author: Michael Armbrust <mi...@databricks.com>
Date:   2015-10-20T21:18:53Z

    [SPARK-11116] [SQL] First Draft of Dataset API

commit f11f91e6f08c8cf389b8388b626cd29eec32d937
Author: Michael Armbrust <mi...@databricks.com>
Date:   2015-10-20T21:38:58Z

    delete controversial bits

commit e32885f50462c3c66ae049c7cf0d69e90ecbedbe
Author: Michael Armbrust <mi...@databricks.com>
Date:   2015-10-20T23:02:28Z

    style

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9190#discussion_r43113163
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/GroupedIterator.scala ---
    @@ -0,0 +1,141 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, GenerateOrdering}
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder, Ascending, Expression}
    +
    +object GroupedIterator {
    +  def apply(
    +      input: Iterator[InternalRow],
    +      keyExpressions: Seq[Expression],
    +      inputSchema: Seq[Attribute]): Iterator[(InternalRow, Iterator[InternalRow])] = {
    +    if (input.hasNext) {
    +      new GroupedIterator(input, keyExpressions, inputSchema)
    +    } else {
    +      Iterator.empty
    +    }
    +  }
    +}
    +
    +/**
    + * Iterates over a presorted set of rows, chunking it up by the grouping expression.  Each call to
    + * next will return a pair containing the current group and an iterator that will return all the
    + * elements of that group.  Iterators for each group are lazily constructed by extracting rows
    + * from the input iterator.  As such, full groups are never materialized by this class.
    + *
    + * Example input:
    + * {{{
    + *   Input: [a, 1], [b, 2], [b, 3]
    + *   Grouping: x#1
    + *   InputSchema: x#1, y#2
    + * }}}
    + *
    + * Result:
    + * {{{
    + *   First call to next():  ([a], Iterator([a, 1])
    + *   Second call to next(): ([b], Iterator([b, 2], [b, 3])
    + * }}}
    + *
    + * Note, the class does not handle the case of an empty input for simplicity of implementation.
    + * Use the factory to construct a new instance.
    + *
    + * @param input An iterator of rows.  This iterator must be ordered by the groupingExpressions or
    + *              it is possible for the same group to appear more than once.
    + * @param groupingExpressions The set of expressions used to do grouping.  The result of evaluating
    + *                            these expressions will be returned as the first part of each call
    + *                            to `next()`.
    + * @param inputSchema The schema of the rows in the `input` iterator.
    + */
    +class GroupedIterator private(
    +    input: Iterator[InternalRow],
    +    groupingExpressions: Seq[Expression],
    +    inputSchema: Seq[Attribute])
    +  extends Iterator[(InternalRow, Iterator[InternalRow])] {
    +
    +  /** Compares two input rows and returns 0 if they are in the same group. */
    +  val sortOrder = groupingExpressions.map(SortOrder(_, Ascending))
    +  val keyOrdering = GenerateOrdering.generate(sortOrder, inputSchema)
    +
    +  /** Creates a row containing only the key for a given input row. */
    +  val keyProjection = GenerateUnsafeProjection.generate(groupingExpressions, inputSchema)
    +
    +  /**
    +   * Holds null or the row that will be returned on next call to `next()` in the inner iterator.
    +   */
    +  var currentRow = input.next()
    +
    +  /** Holds a copy of an input row that is in the current group. */
    +  var currentGroup = currentRow.copy()
    +  var currentIterator: Iterator[InternalRow] = null
    +  assert(keyOrdering.compare(currentGroup, currentRow) == 0)
    +
    +  // Return true if we already have the next iterator or fetching a new iterator is successful.
    +  def hasNext: Boolean = currentIterator != null || fetchNextGroupIterator
    +
    +  def next(): (InternalRow, Iterator[InternalRow]) = {
    +    assert(hasNext) // Ensure we have fetched the next iterator.
    +    val ret = (keyProjection(currentGroup), currentIterator)
    +    currentIterator = null
    +    ret
    +  }
    +
    +  def fetchNextGroupIterator(): Boolean = {
    +    if (currentRow != null || input.hasNext) {
    +      val inputIterator = new Iterator[InternalRow] {
    --- End diff --
    
    That's a good catch.  We can just advance the internal iterator all the way to the next group if they call `next` on the outer iterator before exhausting the inner iterator (and add documentation that you can't use two inner iterators concurrently).
    
    Could you open a PR to unit test this class and fix the problem?
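
    A rough sketch of that idea, in plain Scala rather than against `InternalRow`.  `SimpleGroupedIterator` and its `key` function are hypothetical stand-ins for the class in the diff and its grouping expressions, and this is only one possible way to do the skipping, not the eventual fix:

    ```scala
    // Hypothetical simplification of GroupedIterator: generic values and a key function
    // instead of InternalRow and Catalyst expressions.  Input must be sorted by key.
    class SimpleGroupedIterator[K, V](input: Iterator[V], key: V => K)
        extends Iterator[(K, Iterator[V])] {

      // A buffered iterator lets us peek at the next row without consuming it.
      private val rows = input.buffered
      private var current: Option[Iterator[V]] = None

      // Drains whatever the caller left unread in the group handed out last.
      private def skipCurrentGroup(): Unit = {
        current.foreach(it => while (it.hasNext) it.next())
        current = None
      }

      // Note: asking the outer iterator anything invalidates the previous inner iterator.
      def hasNext: Boolean = {
        skipCurrentGroup()
        rows.hasNext
      }

      def next(): (K, Iterator[V]) = {
        if (!hasNext) throw new NoSuchElementException("no more groups")
        val groupKey = key(rows.head)
        val inner = new Iterator[V] {
          def hasNext: Boolean = rows.hasNext && key(rows.head) == groupKey
          def next(): V = {
            if (!hasNext) throw new NoSuchElementException("group exhausted")
            rows.next()
          }
        }
        current = Some(inner)
        (groupKey, inner)
      }
    }

    val sorted = Iterator(("a", 1), ("b", 2), ("b", 3), ("c", 4))
    val groups = new SimpleGroupedIterator[String, (String, Int)](sorted, _._1)
    val (k1, g1) = groups.next()   // group "a", left unread
    val (k2, g2) = groups.next()   // skips the rest of "a", moves to "b"
    val firstB = g2.next()         // reads only the first row of "b"
    val (k3, g3) = groups.next()   // skips the unread ("b", 3), moves to "c"
    val lastGroup = g3.toList
    ```

    In this sketch the outer `hasNext` also discards the unread rows of the current group, so only one inner iterator is usable at a time, which matches the documentation caveat mentioned above.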




[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9190#discussion_r43120465
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/GroupedIterator.scala ---
    @@ -0,0 +1,141 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, GenerateOrdering}
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder, Ascending, Expression}
    +
    +object GroupedIterator {
    +  def apply(
    +      input: Iterator[InternalRow],
    +      keyExpressions: Seq[Expression],
    +      inputSchema: Seq[Attribute]): Iterator[(InternalRow, Iterator[InternalRow])] = {
    +    if (input.hasNext) {
    +      new GroupedIterator(input, keyExpressions, inputSchema)
    +    } else {
    +      Iterator.empty
    +    }
    +  }
    +}
    +
    +/**
    + * Iterates over a presorted set of rows, chunking it up by the grouping expression.  Each call to
    + * next will return a pair containing the current group and an iterator that will return all the
    + * elements of that group.  Iterators for each group are lazily constructed by extracting rows
    + * from the input iterator.  As such, full groups are never materialized by this class.
    + *
    + * Example input:
    + * {{{
    + *   Input: [a, 1], [b, 2], [b, 3]
    + *   Grouping: x#1
    + *   InputSchema: x#1, y#2
    + * }}}
    + *
    + * Result:
    + * {{{
    + *   First call to next():  ([a], Iterator([a, 1])
    + *   Second call to next(): ([b], Iterator([b, 2], [b, 3])
    + * }}}
    + *
    + * Note, the class does not handle the case of an empty input for simplicity of implementation.
    + * Use the factory to construct a new instance.
    + *
    + * @param input An iterator of rows.  This iterator must be ordered by the groupingExpressions or
    + *              it is possible for the same group to appear more than once.
    + * @param groupingExpressions The set of expressions used to do grouping.  The result of evaluating
    + *                            these expressions will be returned as the first part of each call
    + *                            to `next()`.
    + * @param inputSchema The schema of the rows in the `input` iterator.
    + */
    +class GroupedIterator private(
    +    input: Iterator[InternalRow],
    +    groupingExpressions: Seq[Expression],
    +    inputSchema: Seq[Attribute])
    +  extends Iterator[(InternalRow, Iterator[InternalRow])] {
    +
    +  /** Compares two input rows and returns 0 if they are in the same group. */
    +  val sortOrder = groupingExpressions.map(SortOrder(_, Ascending))
    +  val keyOrdering = GenerateOrdering.generate(sortOrder, inputSchema)
    +
    +  /** Creates a row containing only the key for a given input row. */
    +  val keyProjection = GenerateUnsafeProjection.generate(groupingExpressions, inputSchema)
    +
    +  /**
    +   * Holds null or the row that will be returned on next call to `next()` in the inner iterator.
    +   */
    +  var currentRow = input.next()
    +
    +  /** Holds a copy of an input row that is in the current group. */
    +  var currentGroup = currentRow.copy()
    +  var currentIterator: Iterator[InternalRow] = null
    +  assert(keyOrdering.compare(currentGroup, currentRow) == 0)
    +
    +  // Return true if we already have the next iterator or fetching a new iterator is successful.
    +  def hasNext: Boolean = currentIterator != null || fetchNextGroupIterator
    +
    +  def next(): (InternalRow, Iterator[InternalRow]) = {
    +    assert(hasNext) // Ensure we have fetched the next iterator.
    +    val ret = (keyProjection(currentGroup), currentIterator)
    +    currentIterator = null
    +    ret
    +  }
    +
    +  def fetchNextGroupIterator(): Boolean = {
    +    if (currentRow != null || input.hasNext) {
    +      val inputIterator = new Iterator[InternalRow] {
    --- End diff --
    
    yea, sure




[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9190#issuecomment-150338785
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44162/
    Test PASSed.




[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9190#discussion_r42702980
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/Encoder.scala ---
    @@ -46,13 +47,27 @@ trait Encoder[T] {
     
       /**
        * Returns an object of type `T`, extracting the required values from the provided row.  Note that
    -   * you must bind the encoder to a specific schema before you can call this function.
    +   * you must `bind` an encoder to a specific schema before you can call this function.
        */
       def fromRow(row: InternalRow): T
     
       /**
        * Returns a new copy of this encoder, where the expressions used by `fromRow` are bound to the
    -   * given schema
    +   * given schema.
        */
       def bind(schema: Seq[Attribute]): Encoder[T]
    --- End diff --
    
    It would be nice if these were separate from the process of actually encoding stuff. Otherwise users that want to make custom encoders will have to do lots of work. It's not super clear at a glance what each of these APIs is for and when each will be called (i.e. bind vs bindOrdinals vs rebind).




[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by binarybana <gi...@git.apache.org>.
Github user binarybana commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9190#discussion_r43416475
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DatasetHolder.scala ---
    @@ -0,0 +1,30 @@
    +/*
    +* Licensed to the Apache Software Foundation (ASF) under one or more
    +* contributor license agreements.  See the NOTICE file distributed with
    +* this work for additional information regarding copyright ownership.
    +* The ASF licenses this file to You under the Apache License, Version 2.0
    +* (the "License"); you may not use this file except in compliance with
    +* the License.  You may obtain a copy of the License at
    +*
    +*    http://www.apache.org/licenses/LICENSE-2.0
    +*
    +* Unless required by applicable law or agreed to in writing, software
    +* distributed under the License is distributed on an "AS IS" BASIS,
    +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +* See the License for the specific language governing permissions and
    +* limitations under the License.
    +*/
    +
    +package org.apache.spark.sql
    +
    +/**
    + * A container for a [[DataFrame]], used for implicit conversions.
    + *
    + * @since 1.3.0
    + */
    +private[sql] case class DatasetHolder[T](df: Dataset[T]) {
    +
    +  // This is declared with parentheses to prevent the Scala compiler from treating
    +  // `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.
    --- End diff --
    
    There are a few references to DataFrame and 1.3.0 here that should probably be updated.




[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9190#issuecomment-149752240
  
    **[Test build #44026 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44026/consoleFull)** for PR 9190 at commit [`e32885f`](https://github.com/apache/spark/commit/e32885f50462c3c66ae049c7cf0d69e90ecbedbe).




[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9190#issuecomment-149859825
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44056/
    Test PASSed.




[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9190#discussion_r42701186
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
    @@ -387,6 +389,10 @@ class SQLContext private[sql](
           def $(args: Any*): ColumnName = {
             new ColumnName(sc.s(args: _*))
           }
    +
    +      def e[T : Encoder](args: Any*): TypedColumn[T] = {
    --- End diff --
    
    what is this?




[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9190#discussion_r42970431
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
    @@ -0,0 +1,392 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.annotation.Experimental
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.encoders._
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.plans.logical._
    +import org.apache.spark.sql.execution.QueryExecution
    +import org.apache.spark.sql.types.StructType
    +
    +/**
    + * A [[Dataset]] is a strongly typed collection of objects that can be transformed in parallel
    + * using functional or relational operations.
    + *
    + * A [[Dataset]] differs from an [[RDD]] in the following ways:
    + *  - Internally, a [[Dataset]] is represented by a Catalyst logical plan and the data is stored
    + *    in the encoded form.  This representation allows for additional logical operations and
    + *    enables many operations (sorting, shuffling, etc.) to be performed without deserializing to
    + *    an object.
    + *  - The creation of a [[Dataset]] requires the presence of an explicit [[Encoder]] that can be
    + *    used to serialize the object into a binary format.  Encoders are also capable of mapping the
    + *    schema of a given object to the Spark SQL type system.  In contrast, RDDs rely on runtime
    + *    reflection based serialization. Operations that change the type of object stored in the
    + *    dataset also need an encoder for the new type.
    + *
    + * A [[Dataset]] can be thought of as a specialized DataFrame, where the elements map to a specific
    + * JVM object type, instead of to a generic [[Row]] container. A DataFrame can be transformed into
    + * specific Dataset by calling `df.as[ElementType]`.  Similarly you can transform a strongly-typed
    + * [[Dataset]] to a generic DataFrame by calling `ds.toDF()`.
    + *
    + * COMPATIBILITY NOTE: Long term we plan to make [[DataFrame]] extend `Dataset[Row]`.  However,
    + * making this change to the class hierarchy would break the function signatures for the existing
    + * functional operations (map, flatMap, etc).  As such, this class should be considered a preview
    + * of the final API.  Changes will be made to the interface after Spark 1.6.
    + *
    + * @since 1.6.0
    + */
    +@Experimental
    +class Dataset[T] private[sql](
    +    @transient val sqlContext: SQLContext,
    +    @transient val queryExecution: QueryExecution)(
    --- End diff --
    
    No, I was just trying to match the constructor of DataFrame, but we could probably simplify this.




[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9190#issuecomment-150013834
  
     Merged build triggered.




[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9190#discussion_r42702153
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Column.scala ---
    @@ -17,6 +17,8 @@
     
     package org.apache.spark.sql
     
    +import org.apache.spark.sql.catalyst.encoders.Encoder
    --- End diff --
    
    import




[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9190#discussion_r42702955
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
    @@ -0,0 +1,398 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.annotation.Experimental
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.encoders._
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.plans.logical._
    +import org.apache.spark.sql.execution.QueryExecution
    +import org.apache.spark.sql.types.StructType
    +
    +/**
    + * A [[Dataset]] is a strongly typed collection of objects that can be transformed in parallel
    + * using functional or relational operations.
    + *
    + * A [[Dataset]] differs from an [[RDD]] in the following ways:
    + *  - Internally, a [[Dataset]] is represented by a Catalyst logical plan and the data is stored
    + *    in the encoded form.  This representation allows for additional logical operations and
    + *    enables many operations (sorting, shuffling, etc.) to be performed without deserializing to
    + *    an object.
    + *  - The creation of a [[Dataset]] requires the presence of an explicit [[Encoder]] that can be
    + *    used to serialize the object into a binary format.  Encoders are also capable of mapping the
    + *    schema of a given object to the Spark SQL type system.  In contrast, RDDs rely on runtime
    + *    reflection based serialization. Operations that change the type of object stored in the
    + *    dataset also need an encoder for the new type.
    + *
    + * A [[Dataset]] can be thought of as a specialized DataFrame, where the elements map to a specific
    + * JVM object type, instead of to a generic [[Row]] container. A DataFrame can be transformed into
    + * a specific Dataset by calling `df.as[ElementType]`.  Similarly you can transform a strongly-typed
    + * [[Dataset]] to a generic DataFrame by calling `ds.toDF()`.
    + *
    + * COMPATIBILITY NOTE: Long term we plan to make [[DataFrame]] extend `Dataset[Row]`.  However,
    + * making this change to the class hierarchy would break the function signatures for the existing
    + * functional operations (map, flatMap, etc).  As such, this class should be considered a preview
    + * of the final API.  Changes will be made to the interface after Spark 1.6.
    + *
    + * @since 1.6.0
    + */
    +@Experimental
    +class Dataset[T] private[sql](
    +    @transient val sqlContext: SQLContext,
    +    @transient val queryExecution: QueryExecution)(
    +    implicit val encoder: Encoder[T]) extends Serializable {
    +
    +  private implicit def classTag = encoder.clsTag
    +
    +  private[sql] def this(sqlContext: SQLContext, plan: LogicalPlan)(implicit encoder: Encoder[T]) =
    +    this(sqlContext, new QueryExecution(sqlContext, plan))
    +
    +  /** Returns the schema of the encoded form of the objects in this [[Dataset]]. */
    +  def schema: StructType = encoder.schema
    +
    +  /* ************* *
    +   *  Conversions  *
    +   * ************* */
    +
    +  /**
    +   * Returns a new `Dataset` where each record has been mapped on to the specified type.
    +   * TODO: should bind here...
    +   * TODO: document binding rules
    +   * @since 1.6.0
    +   */
    +  def as[U : Encoder]: Dataset[U] = new Dataset(sqlContext, queryExecution)(implicitly[Encoder[U]])
    +
    +  /**
    +   * Applies a logical alias to this [[Dataset]] that can be used to disambiguate columns that have
    +   * the same name after two Datasets have been joined.
    +   */
    +  def as(alias: String): Dataset[T] = withPlan(Subquery(alias, _))
    +
    +  /**
    +   * Converts this strongly typed collection of data to a generic DataFrame.  In contrast to the
    +   * strongly typed objects that Dataset operations work on, a DataFrame returns generic [[Row]]
    +   * objects that allow fields to be accessed by ordinal or name.
    +   */
    +  def toDF(): DataFrame = DataFrame(sqlContext, logicalPlan)
    +
    +
    +  /**
    +   * Returns this Dataset.
    +   * @since 1.6.0
    +   */
    +  def toDS(): Dataset[T] = this
    +
    +  /**
    +   * Converts this Dataset to an RDD.
    +   * @since 1.6.0
    +   */
    +  def rdd: RDD[T] = {
    +    val tEnc = implicitly[Encoder[T]]
    +    val input = queryExecution.analyzed.output
    +    queryExecution.toRdd.mapPartitions { iter =>
    +      val bound = tEnc.bind(input)
    +      iter.map(bound.fromRow)
    +    }
    +  }
    +
    +  /* *********************** *
    +   *  Functional Operations  *
    +   * *********************** */
    +
    +  /**
    +   * Concise syntax for chaining custom transformations.
    +   * {{{
    +   *   def featurize(ds: Dataset[T]) = ...
    +   *
    +   *   dataset
    +   *     .transform(featurize)
    +   *     .transform(...)
    +   * }}}
    +   *
    +   * @since 1.6.0
    +   */
    +  def transform[U](t: Dataset[T] => Dataset[U]): Dataset[U] = t(this)
    +
    +  /**
    +   * Returns a new [[Dataset]] that only contains elements where `func` returns `true`.
    +   * @since 1.6.0
    +   */
    +  def filter(func: T => Boolean): Dataset[T] = mapPartitions(_.filter(func))
    +
    +  /**
    +   * Returns a new [[Dataset]] that only contains elements where `func` returns `false`.
    +   * @since 1.6.0
    +   */
    +  def filterNot(func: T => Boolean): Dataset[T] = mapPartitions(_.filterNot(func))
    +
    +  /**
    +   * Returns a new [[Dataset]] that contains the result of applying `func` to each element.
    +   * @since 1.6.0
    +   */
    +  def map[U : Encoder](func: T => U): Dataset[U] = mapPartitions(_.map(func))
    +
    +  /**
    +   * Returns a new [[Dataset]] that contains the result of applying `func` to each partition.
    +   * @since 1.6.0
    +   */
    +  def mapPartitions[U : Encoder](func: Iterator[T] => Iterator[U]): Dataset[U] = {
    +    new Dataset(
    +      sqlContext,
    +      MapPartitions[T, U](
    +        func,
    +        implicitly[Encoder[T]],
    +        implicitly[Encoder[U]],
    +        implicitly[Encoder[U]].schema.toAttributes,
    +        logicalPlan))
    +  }
    +
    +  def flatMap[U : Encoder](func: T => TraversableOnce[U]): Dataset[U] =
    +    mapPartitions(_.flatMap(func))
    +
    +  /* ************** *
    +   *  Side effects  *
    +   * ************** */
    +
    +  /**
    +   * Runs `func` on each element of this Dataset.
    +   * @since 1.6.0
    +   */
    +  def foreach(func: T => Unit): Unit = rdd.foreach(func)
    +
    +  /**
    +   * Runs `func` on each partition of this Dataset.
    +   * @since 1.6.0
    +   */
    +  def foreachPartition(func: Iterator[T] => Unit): Unit = rdd.foreachPartition(func)
    +
    +  /* ************* *
    +   *  Aggregation  *
    +   * ************* */
    +
    +  /**
    +   * Reduces the elements of this Dataset using the specified binary function.  The given function
    +   * must be commutative and associative or the result may be non-deterministic.
    +   * @since 1.6.0
    +   */
    +  def reduce(func: (T, T) => T): T = rdd.reduce(func)
    +
    +  /**
    +   * Aggregates the elements of each partition, and then the results for all the partitions, using a
    +   * given associative and commutative function and a neutral "zero value".
    +   *
    +   * This behaves somewhat differently than the fold operations implemented for non-distributed
    +   * collections in functional languages like Scala. This fold operation may be applied to
    +   * partitions individually, and then those results will be folded into the final result.
    +   * If op is not commutative, then the result may differ from that of a fold applied to a
    +   * non-distributed collection.
    +   * @since 1.6.0
    +   */
    +  def fold(zeroValue: T)(op: (T, T) => T): T = rdd.fold(zeroValue)(op)
    +
    +  /**
    +   * Returns a [[GroupedDataset]] where the data is grouped by the given key function.
    +   * @since 1.6.0
    +   */
    +  def groupBy[K : Encoder](func: T => K): GroupedDataset[K, T] = {
    +    val inputPlan = queryExecution.analyzed
    +    val withGroupingKey = AppendColumn(func, inputPlan)
    +    val executed = sqlContext.executePlan(withGroupingKey)
    +
    +    new GroupedDataset(
    +      implicitly[Encoder[K]].bindOrdinals(withGroupingKey.newColumns),
    +      implicitly[Encoder[T]].bind(inputPlan.output),
    +      executed,
    +      inputPlan.output,
    +      withGroupingKey.newColumns)
    +  }
    +
    +  /* ****************** *
    +   *  Typed Relational  *
    +   * ****************** */
    +
    +  /**
    +   * Returns a new [[Dataset]] by computing the given [[Column]] expression for each element.
    +   *
    +   * {{{
    +   *   val ds = Seq(1, 2, 3).toDS()
    +   *   val newDS = ds.select(e[Int]("value + 1"))
    --- End diff --
    
    need to update the example
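
    For reference, the PR description writes the same projection as `expr("value + 1").as[Int]`, so the updated Scaladoc example would presumably look like the sketch below. The `SparkSession` setup is an assumption borrowed from later Spark releases (this PR itself builds on `SQLContext`), and the application name is purely illustrative. It is meant to be pasted into a Scala REPL with Spark on the classpath.

    ```scala
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.expr

    // Assumption: a local session from a later Spark release; not part of this PR.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("select-example") // hypothetical name
      .getOrCreate()
    import spark.implicits._

    val ds = Seq(1, 2, 3).toDS()

    // expr(...) builds an untyped Column; .as[Int] attaches the Int encoder
    // so that select returns a Dataset[Int] rather than a DataFrame.
    val newDS = ds.select(expr("value + 1").as[Int])

    // Matches the output shown in the PR description: Array(2, 3, 4)
    val result = newDS.collect()
    ```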




[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by SparkQA <gi...@git.apache.org>.
GitHub user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9190#issuecomment-149770955
  
    **[Test build #44026 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44026/consoleFull)** for PR 9190 at commit [`e32885f`](https://github.com/apache/spark/commit/e32885f50462c3c66ae049c7cf0d69e90ecbedbe).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
       * `case class LongEncoder(fieldName: String = "value", ordinal: Int = 0) extends Encoder[Long]`
       * `case class IntEncoder(fieldName: String = "value", ordinal: Int = 0) extends Encoder[Int]`
       * `case class StringEncoder(`
       * `          |class Tuple$`
       * `class Tuple2Encoder[T1, T2](e1: Encoder[T1], e2: Encoder[T2]) extends Encoder[(T1, T2)]`
       * `class Tuple3Encoder[T1, T2, T3](e1: Encoder[T1], e2: Encoder[T2], e3: Encoder[T3]) extends Encoder[(T1, T2, T3)]`
       * `class Tuple4Encoder[T1, T2, T3, T4](e1: Encoder[T1], e2: Encoder[T2], e3: Encoder[T3], e4: Encoder[T4]) extends Encoder[(T1, T2, T3, T4)]`
       * `class Tuple5Encoder[T1, T2, T3, T4, T5](e1: Encoder[T1], e2: Encoder[T2], e3: Encoder[T3], e4: Encoder[T4], e5: Encoder[T5]) extends Encoder[(T1, T2, T3, T4, T5)]`
       * `implicit class AttributeSeq(attrs: Seq[Attribute])`
       * `case class MapPartitions[T, U](`
       * `case class AppendColumn[T, U](`
       * `case class MapGroups[K, T, U](`
       * `class TypedColumn[T](expr: Expression) extends Column(expr)`
       * `case class MapPartitions[T, U](`
       * `case class AppendColumns[T, U](`
       * `case class MapGroups[K, T, U](`




[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by rxin <gi...@git.apache.org>.
GitHub user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9190#discussion_r42701821
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala ---
    @@ -33,6 +37,16 @@ import org.apache.spark.unsafe.types.UTF8String
     private[sql] abstract class SQLImplicits {
    --- End diff --
    
    not this pr, but SQLImplicits should be public




[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by rxin <gi...@git.apache.org>.
GitHub user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9190#discussion_r42703289
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala ---
    @@ -0,0 +1,68 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.sql.catalyst.encoders.Encoder
    +import org.apache.spark.sql.catalyst.expressions.Attribute
    +import org.apache.spark.sql.catalyst.plans.logical._
    +import org.apache.spark.sql.execution.QueryExecution
    +
    +/**
    + * A [[Dataset]] has been logically grouped by a user specified grouping key.  Users should not
    + * construct a [[GroupedDataset]] directly, but should instead call `groupBy` on an existing
    + * [[Dataset]].
    + */
    +class GroupedDataset[K, T] private[sql](
    +    private val kEncoder: Encoder[K],
    +    private val tEncoder: Encoder[T],
    +    queryExecution: QueryExecution,
    +    private val dataAttributes: Seq[Attribute],
    +    private val groupingAttributes: Seq[Attribute]) extends Serializable {
    +
    +  private implicit def kEnc = kEncoder
    +  private implicit def tEnc = tEncoder
    +  private def logicalPlan = queryExecution.analyzed
    +  private def sqlContext = queryExecution.sqlContext
    +
    +  /**
    +   * Returns a [[Dataset]] that contains each unique key.
    +   */
    +  def keys: Dataset[K] = {
    --- End diff --
    
    Why do we need this? It is just
    
    map(_...).distinct
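
    The equivalence suggested above can be sketched as follows. This is only an illustration under stated assumptions: it uses `SparkSession` and `groupByKey`, the name that function-based grouping carries in later Spark releases (this PR calls it `groupBy`), and the key function `firstLetter` is hypothetical.

    ```scala
    import org.apache.spark.sql.SparkSession

    // Assumption: a local session from a later Spark release; not part of this PR.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("keys-example") // hypothetical name
      .getOrCreate()
    import spark.implicits._

    val words = Seq("apple", "avocado", "banana").toDS()

    // Hypothetical key function: group words by their first letter.
    val firstLetter: String => String = _.take(1)

    // Unique keys via the grouped dataset's `keys` method.
    val viaKeys = words.groupByKey(firstLetter).keys.collect().sorted

    // The same set of keys via map followed by distinct, as the comment proposes.
    val viaMapDistinct = words.map(firstLetter).distinct().collect().sorted
    ```

    Both arrays hold the same keys after sorting; the difference lies only in whether the grouped plan or a plain projection is used to compute them.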




[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by AmplabJenkins <gi...@git.apache.org>.
GitHub user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9190#issuecomment-149828370
  
     Merged build triggered.




[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by AmplabJenkins <gi...@git.apache.org>.
GitHub user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9190#issuecomment-149771016
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44026/




[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by SparkQA <gi...@git.apache.org>.
GitHub user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9190#issuecomment-149859719
  
    **[Test build #44056 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44056/consoleFull)** for PR 9190 at commit [`d6ac1f8`](https://github.com/apache/spark/commit/d6ac1f81fdbab36fc64aaae47203dba07334df14).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
       * `case class LongEncoder(fieldName: String = "value", ordinal: Int = 0) extends Encoder[Long]`
       * `case class IntEncoder(fieldName: String = "value", ordinal: Int = 0) extends Encoder[Int]`
       * `case class StringEncoder(`
       * `          |class Tuple$`
       * `class Tuple2Encoder[T1, T2](e1: Encoder[T1], e2: Encoder[T2]) extends Encoder[(T1, T2)]`
       * `class Tuple3Encoder[T1, T2, T3](e1: Encoder[T1], e2: Encoder[T2], e3: Encoder[T3]) extends Encoder[(T1, T2, T3)]`
       * `class Tuple4Encoder[T1, T2, T3, T4](e1: Encoder[T1], e2: Encoder[T2], e3: Encoder[T3], e4: Encoder[T4]) extends Encoder[(T1, T2, T3, T4)]`
       * `class Tuple5Encoder[T1, T2, T3, T4, T5](e1: Encoder[T1], e2: Encoder[T2], e3: Encoder[T3], e4: Encoder[T4], e5: Encoder[T5]) extends Encoder[(T1, T2, T3, T4, T5)]`
       * `implicit class AttributeSeq(attrs: Seq[Attribute])`
       * `case class MapPartitions[T, U](`
       * `case class AppendColumn[T, U](`
       * `case class MapGroups[K, T, U](`
       * `class TypedColumn[T](expr: Expression)(implicit val encoder: Encoder[T]) extends Column(expr)`
       * `case class MapPartitions[T, U](`
       * `case class AppendColumns[T, U](`
       * `case class MapGroups[K, T, U](`




[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by SparkQA <gi...@git.apache.org>.
GitHub user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9190#issuecomment-150338653
  
    **[Test build #44162 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44162/consoleFull)** for PR 9190 at commit [`e251f87`](https://github.com/apache/spark/commit/e251f87a830df9f40d523f00bbe14c174c91b2ae).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
       * `case class LongEncoder(fieldName: String = "value", ordinal: Int = 0) extends Encoder[Long]`
       * `case class IntEncoder(fieldName: String = "value", ordinal: Int = 0) extends Encoder[Int]`
       * `case class StringEncoder(`
       * `          |class Tuple$`
       * `class Tuple2Encoder[T1, T2](e1: Encoder[T1], e2: Encoder[T2]) extends Encoder[(T1, T2)]`
       * `class Tuple3Encoder[T1, T2, T3](e1: Encoder[T1], e2: Encoder[T2], e3: Encoder[T3]) extends Encoder[(T1, T2, T3)]`
       * `class Tuple4Encoder[T1, T2, T3, T4](e1: Encoder[T1], e2: Encoder[T2], e3: Encoder[T3], e4: Encoder[T4]) extends Encoder[(T1, T2, T3, T4)]`
       * `class Tuple5Encoder[T1, T2, T3, T4, T5](e1: Encoder[T1], e2: Encoder[T2], e3: Encoder[T3], e4: Encoder[T4], e5: Encoder[T5]) extends Encoder[(T1, T2, T3, T4, T5)]`
       * `implicit class AttributeSeq(attrs: Seq[Attribute])`
       * `case class MapPartitions[T, U](`
       * `case class AppendColumn[T, U](`
       * `case class MapGroups[K, T, U](`
       * `class TypedColumn[T](expr: Expression)(implicit val encoder: Encoder[T]) extends Column(expr)`
       * `abstract class SQLImplicits`
       * `case class MapPartitions[T, U](`
       * `case class AppendColumns[T, U](`
       * `case class MapGroups[K, T, U](`




[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by rxin <gi...@git.apache.org>.
GitHub user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9190#discussion_r42702965
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
    @@ -0,0 +1,398 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.annotation.Experimental
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.encoders._
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.plans.logical._
    +import org.apache.spark.sql.execution.QueryExecution
    +import org.apache.spark.sql.types.StructType
    +
    +/**
    + * A [[Dataset]] is a strongly typed collection of objects that can be transformed in parallel
    + * using functional or relational operations.
    + *
    + * A [[Dataset]] differs from an [[RDD]] in the following ways:
    + *  - Internally, a [[Dataset]] is represented by a Catalyst logical plan and the data is stored
    + *    in the encoded form.  This representation allows for additional logical operations and
    + *    enables many operations (sorting, shuffling, etc.) to be performed without deserializing to
    + *    an object.
    + *  - The creation of a [[Dataset]] requires the presence of an explicit [[Encoder]] that can be
    + *    used to serialize the object into a binary format.  Encoders are also capable of mapping the
    + *    schema of a given object to the Spark SQL type system.  In contrast, RDDs rely on runtime
    + *    reflection based serialization. Operations that change the type of object stored in the
    + *    dataset also need an encoder for the new type.
    + *
    + * A [[Dataset]] can be thought of as a specialized DataFrame, where the elements map to a specific
    + * JVM object type, instead of to a generic [[Row]] container. A DataFrame can be transformed into
    + * a specific Dataset by calling `df.as[ElementType]`.  Similarly you can transform a strongly-typed
    + * [[Dataset]] to a generic DataFrame by calling `ds.toDF()`.
    + *
    + * COMPATIBILITY NOTE: Long term we plan to make [[DataFrame]] extend `Dataset[Row]`.  However,
    + * making this change to the class hierarchy would break the function signatures for the existing
    + * functional operations (map, flatMap, etc).  As such, this class should be considered a preview
    + * of the final API.  Changes will be made to the interface after Spark 1.6.
    + *
    + * @since 1.6.0
    + */
    +@Experimental
    +class Dataset[T] private[sql](
    +    @transient val sqlContext: SQLContext,
    +    @transient val queryExecution: QueryExecution)(
    +    implicit val encoder: Encoder[T]) extends Serializable {
    +
    +  private implicit def classTag = encoder.clsTag
    +
    +  private[sql] def this(sqlContext: SQLContext, plan: LogicalPlan)(implicit encoder: Encoder[T]) =
    +    this(sqlContext, new QueryExecution(sqlContext, plan))
    +
    +  /** Returns the schema of the encoded form of the objects in this [[Dataset]]. */
    +  def schema: StructType = encoder.schema
    +
    +  /* ************* *
    +   *  Conversions  *
    +   * ************* */
    +
    +  /**
    +   * Returns a new `Dataset` where each record has been mapped on to the specified type.
    +   * TODO: should bind here...
    +   * TODO: document binding rules
    +   * @since 1.6.0
    +   */
    +  def as[U : Encoder]: Dataset[U] = new Dataset(sqlContext, queryExecution)(implicitly[Encoder[U]])
    +
    +  /**
    +   * Applies a logical alias to this [[Dataset]] that can be used to disambiguate columns that have
    +   * the same name after two Datasets have been joined.
    +   */
    +  def as(alias: String): Dataset[T] = withPlan(Subquery(alias, _))
    +
    +  /**
    +   * Converts this strongly typed collection of data to a generic DataFrame.  In contrast to the
    +   * strongly typed objects that Dataset operations work on, a DataFrame returns generic [[Row]]
    +   * objects that allow fields to be accessed by ordinal or name.
    +   */
    +  def toDF(): DataFrame = DataFrame(sqlContext, logicalPlan)
    +
    +
    +  /**
    +   * Returns this Dataset.
    +   * @since 1.6.0
    +   */
    +  def toDS(): Dataset[T] = this
    +
    +  /**
    +   * Converts this Dataset to an RDD.
    +   * @since 1.6.0
    +   */
    +  def rdd: RDD[T] = {
    +    val tEnc = implicitly[Encoder[T]]
    +    val input = queryExecution.analyzed.output
    +    queryExecution.toRdd.mapPartitions { iter =>
    +      val bound = tEnc.bind(input)
    +      iter.map(bound.fromRow)
    +    }
    +  }
    +
    +  /* *********************** *
    +   *  Functional Operations  *
    +   * *********************** */
    +
    +  /**
    +   * Concise syntax for chaining custom transformations.
    +   * {{{
    +   *   def featurize(ds: Dataset[T]) = ...
    +   *
    +   *   dataset
    +   *     .transform(featurize)
    +   *     .transform(...)
    +   * }}}
    +   *
    +   * @since 1.6.0
    +   */
    +  def transform[U](t: Dataset[T] => Dataset[U]): Dataset[U] = t(this)
    +
    +  /**
    +   * Returns a new [[Dataset]] that only contains elements where `func` returns `true`.
    +   * @since 1.6.0
    +   */
    +  def filter(func: T => Boolean): Dataset[T] = mapPartitions(_.filter(func))
    +
    +  /**
    +   * Returns a new [[Dataset]] that only contains elements where `func` returns `false`.
    +   * @since 1.6.0
    +   */
    +  def filterNot(func: T => Boolean): Dataset[T] = mapPartitions(_.filterNot(func))
    +
    +  /**
    +   * Returns a new [[Dataset]] that contains the result of applying `func` to each element.
    +   * @since 1.6.0
    +   */
    +  def map[U : Encoder](func: T => U): Dataset[U] = mapPartitions(_.map(func))
    +
    +  /**
    +   * Returns a new [[Dataset]] that contains the result of applying `func` to each partition.
    +   * @since 1.6.0
    +   */
    +  def mapPartitions[U : Encoder](func: Iterator[T] => Iterator[U]): Dataset[U] = {
    +    new Dataset(
    +      sqlContext,
    +      MapPartitions[T, U](
    +        func,
    +        implicitly[Encoder[T]],
    +        implicitly[Encoder[U]],
    +        implicitly[Encoder[U]].schema.toAttributes,
    +        logicalPlan))
    +  }
    +
    +  def flatMap[U : Encoder](func: T => TraversableOnce[U]): Dataset[U] =
    +    mapPartitions(_.flatMap(func))
    +
    +  /* ************** *
    +   *  Side effects  *
    +   * ************** */
    +
    +  /**
    +   * Runs `func` on each element of this Dataset.
    +   * @since 1.6.0
    +   */
    +  def foreach(func: T => Unit): Unit = rdd.foreach(func)
    +
    +  /**
    +   * Runs `func` on each partition of this Dataset.
    +   * @since 1.6.0
    +   */
    +  def foreachPartition(func: Iterator[T] => Unit): Unit = rdd.foreachPartition(func)
    +
    +  /* ************* *
    +   *  Aggregation  *
    +   * ************* */
    +
    +  /**
    +   * Reduces the elements of this Dataset using the specified binary function.  The given function
    +   * must be commutative and associative or the result may be non-deterministic.
    +   * @since 1.6.0
    +   */
    +  def reduce(func: (T, T) => T): T = rdd.reduce(func)
    +
    +  /**
    +   * Aggregates the elements of each partition, and then the results for all the partitions, using a
    +   * given associative and commutative function and a neutral "zero value".
    +   *
    +   * This behaves somewhat differently than the fold operations implemented for non-distributed
    +   * collections in functional languages like Scala. This fold operation may be applied to
    +   * partitions individually, and then those results will be folded into the final result.
    +   * If op is not commutative, then the result may differ from that of a fold applied to a
    +   * non-distributed collection.
    +   * @since 1.6.0
    +   */
    +  def fold(zeroValue: T)(op: (T, T) => T): T = rdd.fold(zeroValue)(op)
    +
    +  /**
    +   * Returns a [[GroupedDataset]] where the data is grouped by the given key function.
    +   * @since 1.6.0
    +   */
    +  def groupBy[K : Encoder](func: T => K): GroupedDataset[K, T] = {
    +    val inputPlan = queryExecution.analyzed
    +    val withGroupingKey = AppendColumn(func, inputPlan)
    +    val executed = sqlContext.executePlan(withGroupingKey)
    +
    +    new GroupedDataset(
    +      implicitly[Encoder[K]].bindOrdinals(withGroupingKey.newColumns),
    +      implicitly[Encoder[T]].bind(inputPlan.output),
    +      executed,
    +      inputPlan.output,
    +      withGroupingKey.newColumns)
    +  }
    +
    +  /* ****************** *
    +   *  Typed Relational  *
    +   * ****************** */
    +
    +  /**
    +   * Returns a new [[Dataset]] by computing the given [[Column]] expression for each element.
    +   *
    +   * {{{
    +   *   val ds = Seq(1, 2, 3).toDS()
    +   *   val newDS = ds.select(e[Int]("value + 1"))
    +   * }}}
    +   * @since 1.6.0
    +   */
    +  def select[U1: Encoder](c1: TypedColumn[U1]): Dataset[U1] = {
    +    new Dataset[U1](sqlContext, Project(Alias(c1.expr, "_1")() :: Nil, logicalPlan))
    +  }
    +
    +  // Codegen
    +  // scalastyle:off
    --- End diff --
    
    why is this off?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9190#discussion_r42780810
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
    @@ -0,0 +1,398 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.annotation.Experimental
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.encoders._
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.plans.logical._
    +import org.apache.spark.sql.execution.QueryExecution
    +import org.apache.spark.sql.types.StructType
    +
    +/**
    + * A [[Dataset]] is a strongly typed collection of objects that can be transformed in parallel
    + * using functional or relational operations.
    + *
    + * A [[Dataset]] differs from an [[RDD]] in the following ways:
    + *  - Internally, a [[Dataset]] is represented by a Catalyst logical plan and the data is stored
    + *    in the encoded form.  This representation allows for additional logical operations and
    + *    enables many operations (sorting, shuffling, etc.) to be performed without deserializing to
    + *    an object.
    + *  - The creation of a [[Dataset]] requires the presence of an explicit [[Encoder]] that can be
    + *    used to serialize the object into a binary format.  Encoders are also capable of mapping the
    + *    schema of a given object to the Spark SQL type system.  In contrast, RDDs rely on runtime
    + *    reflection based serialization. Operations that change the type of object stored in the
    + *    dataset also need an encoder for the new type.
    + *
    + * A [[Dataset]] can be thought of as a specialized DataFrame, where the elements map to a specific
    + * JVM object type, instead of to a generic [[Row]] container. A DataFrame can be transformed into
    + * specific Dataset by calling `df.as[ElementType]`.  Similarly you can transform a strongly-typed
    + * [[Dataset]] to a generic DataFrame by calling `ds.toDF()`.
    + *
    + * COMPATIBILITY NOTE: Long term we plan to make [[DataFrame]] extend `Dataset[Row]`.  However,
    + * making this change to the class hierarchy would break the function signatures for the existing
    + * functional operations (map, flatMap, etc).  As such, this class should be considered a preview
    + * of the final API.  Changes will be made to the interface after Spark 1.6.
    + *
    + * @since 1.6.0
    + */
    +@Experimental
    +class Dataset[T] private[sql](
    +    @transient val sqlContext: SQLContext,
    +    @transient val queryExecution: QueryExecution)(
    +    implicit val encoder: Encoder[T]) extends Serializable {
    +
    +  private implicit def classTag = encoder.clsTag
    +
    +  private[sql] def this(sqlContext: SQLContext, plan: LogicalPlan)(implicit encoder: Encoder[T]) =
    +    this(sqlContext, new QueryExecution(sqlContext, plan))
    +
    +  /** Returns the schema of the encoded form of the objects in this [[Dataset]]. */
    +  def schema: StructType = encoder.schema
    +
    +  /* ************* *
    +   *  Conversions  *
    +   * ************* */
    +
    +  /**
    +   * Returns a new `Dataset` where each record has been mapped on to the specified type.
    +   * TODO: should bind here...
    +   * TODO: document binding rules
    +   * @since 1.6.0
    +   */
    +  def as[U : Encoder]: Dataset[U] = new Dataset(sqlContext, queryExecution)(implicitly[Encoder[U]])
    +
    +  /**
    +   * Applies a logical alias to this [[Dataset]] that can be used to disambiguate columns that have
    +   * the same name after two Datasets have been joined.
    +   */
    +  def as(alias: String): Dataset[T] = withPlan(Subquery(alias, _))
    +
    +  /**
    +   * Converts this strongly typed collection of data to generic Dataframe.  In contrast to the
    +   * strongly typed objects that Dataset operations work on, a Dataframe returns generic [[Row]]
    +   * objects that allow fields to be accessed by ordinal or name.
    +   */
    +  def toDF(): DataFrame = DataFrame(sqlContext, logicalPlan)
    +
    +
    +  /**
    +   * Returns this Dataset.
    +   * @since 1.6.0
    +   */
    +  def toDS(): Dataset[T] = this
    +
    +  /**
    +   * Converts this Dataset to an RDD.
    +   * @since 1.6.0
    +   */
    +  def rdd: RDD[T] = {
    +    val tEnc = implicitly[Encoder[T]]
    +    val input = queryExecution.analyzed.output
    +    queryExecution.toRdd.mapPartitions { iter =>
    +      val bound = tEnc.bind(input)
    +      iter.map(bound.fromRow)
    +    }
    +  }
    +
    +  /* *********************** *
    +   *  Functional Operations  *
    +   * *********************** */
    +
    +  /**
    +   * Concise syntax for chaining custom transformations.
    +   * {{{
    +   *   def featurize(ds: Dataset[T]) = ...
    +   *
    +   *   dataset
    +   *     .transform(featurize)
    +   *     .transform(...)
    +   * }}}
    +   *
    +   * @since 1.6.0
    +   */
    +  def transform[U](t: Dataset[T] => Dataset[U]): Dataset[U] = t(this)
    +
    +  /**
    +   * Returns a new [[Dataset]] that only contains elements where `func` returns `true`.
    +   * @since 1.6.0
    +   */
    +  def filter(func: T => Boolean): Dataset[T] = mapPartitions(_.filter(func))
    +
    +  /**
    +   * Returns a new [[Dataset]] that only contains elements where `func` returns `false`.
    +   * @since 1.6.0
    +   */
    +  def filterNot(func: T => Boolean): Dataset[T] = mapPartitions(_.filterNot(func))
    --- End diff --
    
    Scalding has it, and Scala collections have it.  I guess I don't see the point of deviating from these APIs that people might already know when it is cheap / easy to add more functionality.  I've removed it for now, though, as it's easy to add back.
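
For readers unfamiliar with the method under discussion, here is a minimal sketch using plain Scala collections rather than the `Dataset` class from this PR. It only illustrates that `filterNot(p)` is equivalent to `filter` with the predicate negated, and that `Iterator` offers the same pair, which is what the diff's `mapPartitions(_.filterNot(func))` relies on. The object and value names are made up for illustration.

```scala
object FilterNotSketch {
  val xs: Seq[Int] = Seq(1, 2, 3, 4, 5, 6)
  val isEven: Int => Boolean = _ % 2 == 0

  // filterNot keeps the elements for which the predicate returns false ...
  val odds: Seq[Int] = xs.filterNot(isEven)

  // ... which is the same as filter with the predicate negated.
  val oddsViaFilter: Seq[Int] = xs.filter(x => !isEven(x))

  // Iterator has the same method, so it can be applied per partition.
  val oddsFromIterator: List[Int] = xs.iterator.filterNot(isEven).toList
}
```

Since the two spellings are interchangeable, dropping `filterNot` costs callers nothing but a negation, which is presumably why it was easy to remove for now.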




[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9190#issuecomment-149859824
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9190#issuecomment-149829999
  
    **[Test build #44056 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44056/consoleFull)** for PR 9190 at commit [`d6ac1f8`](https://github.com/apache/spark/commit/d6ac1f81fdbab36fc64aaae47203dba07334df14).




[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9190#issuecomment-150013863
  
    Merged build started.




[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9190#discussion_r42777771
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
    @@ -0,0 +1,398 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.annotation.Experimental
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.encoders._
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.plans.logical._
    +import org.apache.spark.sql.execution.QueryExecution
    +import org.apache.spark.sql.types.StructType
    +
    +/**
    + * A [[Dataset]] is a strongly typed collection of objects that can be transformed in parallel
    + * using functional or relational operations.
    + *
    + * A [[Dataset]] differs from an [[RDD]] in the following ways:
    + *  - Internally, a [[Dataset]] is represented by a Catalyst logical plan and the data is stored
    + *    in the encoded form.  This representation allows for additional logical operations and
    + *    enables many operations (sorting, shuffling, etc.) to be performed without deserializing to
    + *    an object.
    + *  - The creation of a [[Dataset]] requires the presence of an explicit [[Encoder]] that can be
    + *    used to serialize the object into a binary format.  Encoders are also capable of mapping the
    + *    schema of a given object to the Spark SQL type system.  In contrast, RDDs rely on runtime
    + *    reflection based serialization. Operations that change the type of object stored in the
    + *    dataset also need an encoder for the new type.
    + *
    + * A [[Dataset]] can be thought of as a specialized DataFrame, where the elements map to a specific
    + * JVM object type, instead of to a generic [[Row]] container. A DataFrame can be transformed into
    + * specific Dataset by calling `df.as[ElementType]`.  Similarly you can transform a strongly-typed
    + * [[Dataset]] to a generic DataFrame by calling `ds.toDF()`.
    + *
    + * COMPATIBILITY NOTE: Long term we plan to make [[DataFrame]] extend `Dataset[Row]`.  However,
    + * making this change to the class hierarchy would break the function signatures for the existing
    + * functional operations (map, flatMap, etc).  As such, this class should be considered a preview
    + * of the final API.  Changes will be made to the interface after Spark 1.6.
    + *
    + * @since 1.6.0
    + */
    +@Experimental
    +class Dataset[T] private[sql](
    +    @transient val sqlContext: SQLContext,
    +    @transient val queryExecution: QueryExecution)(
    +    implicit val encoder: Encoder[T]) extends Serializable {
    +
    +  private implicit def classTag = encoder.clsTag
    +
    +  private[sql] def this(sqlContext: SQLContext, plan: LogicalPlan)(implicit encoder: Encoder[T]) =
    +    this(sqlContext, new QueryExecution(sqlContext, plan))
    +
    +  /** Returns the schema of the encoded form of the objects in this [[Dataset]]. */
    +  def schema: StructType = encoder.schema
    +
    +  /* ************* *
    +   *  Conversions  *
    +   * ************* */
    +
    +  /**
    +   * Returns a new `Dataset` where each record has been mapped on to the specified type.
    +   * TODO: should bind here...
    +   * TODO: document binding rules
    +   * @since 1.6.0
    +   */
    +  def as[U : Encoder]: Dataset[U] = new Dataset(sqlContext, queryExecution)(implicitly[Encoder[U]])
    +
    +  /**
    +   * Applies a logical alias to this [[Dataset]] that can be used to disambiguate columns that have
    +   * the same name after two Datasets have been joined.
    +   */
    +  def as(alias: String): Dataset[T] = withPlan(Subquery(alias, _))
    +
    +  /**
    +   * Converts this strongly typed collection of data to generic Dataframe.  In contrast to the
    +   * strongly typed objects that Dataset operations work on, a Dataframe returns generic [[Row]]
    +   * objects that allow fields to be accessed by ordinal or name.
    +   */
    +  def toDF(): DataFrame = DataFrame(sqlContext, logicalPlan)
    +
    +
    +  /**
    +   * Returns this Dataset.
    +   * @since 1.6.0
    +   */
    +  def toDS(): Dataset[T] = this
    +
    +  /**
    +   * Converts this Dataset to an RDD.
    +   * @since 1.6.0
    +   */
    +  def rdd: RDD[T] = {
    +    val tEnc = implicitly[Encoder[T]]
    +    val input = queryExecution.analyzed.output
    +    queryExecution.toRdd.mapPartitions { iter =>
    +      val bound = tEnc.bind(input)
    +      iter.map(bound.fromRow)
    +    }
    +  }
    +
    +  /* *********************** *
    +   *  Functional Operations  *
    +   * *********************** */
    +
    +  /**
    +   * Concise syntax for chaining custom transformations.
    +   * {{{
    +   *   def featurize(ds: Dataset[T]) = ...
    +   *
    +   *   dataset
    +   *     .transform(featurize)
    +   *     .transform(...)
    +   * }}}
    +   *
    +   * @since 1.6.0
    +   */
    +  def transform[U](t: Dataset[T] => Dataset[U]): Dataset[U] = t(this)
    +
    +  /**
    +   * Returns a new [[Dataset]] that only contains elements where `func` returns `true`.
    +   * @since 1.6.0
    +   */
    +  def filter(func: T => Boolean): Dataset[T] = mapPartitions(_.filter(func))
    +
    +  /**
    +   * Returns a new [[Dataset]] that only contains elements where `func` returns `false`.
    +   * @since 1.6.0
    +   */
    +  def filterNot(func: T => Boolean): Dataset[T] = mapPartitions(_.filterNot(func))
    +
    +  /**
    +   * Returns a new [[Dataset]] that contains the result of applying `func` to each element.
    +   * @since 1.6.0
    +   */
    +  def map[U : Encoder](func: T => U): Dataset[U] = mapPartitions(_.map(func))
    +
    +  /**
    +   * Returns a new [[Dataset]] that contains the result of applying `func` to each partition.
    +   * @since 1.6.0
    +   */
    +  def mapPartitions[U : Encoder](func: Iterator[T] => Iterator[U]): Dataset[U] = {
    +    new Dataset(
    +      sqlContext,
    +      MapPartitions[T, U](
    +        func,
    +        implicitly[Encoder[T]],
    +        implicitly[Encoder[U]],
    +        implicitly[Encoder[U]].schema.toAttributes,
    +        logicalPlan))
    +  }
    +
    +  def flatMap[U : Encoder](func: T => TraversableOnce[U]): Dataset[U] =
    +    mapPartitions(_.flatMap(func))
    +
    +  /* ************** *
    +   *  Side effects  *
    +   * ************** */
    +
    +  /**
    +   * Runs `func` on each element of this Dataset.
    +   * @since 1.6.0
    +   */
    +  def foreach(func: T => Unit): Unit = rdd.foreach(func)
    +
    +  /**
    +   * Runs `func` on each partition of this Dataset.
    +   * @since 1.6.0
    +   */
    +  def foreachPartition(func: Iterator[T] => Unit): Unit = rdd.foreachPartition(func)
    +
    +  /* ************* *
    +   *  Aggregation  *
    +   * ************* */
    +
    +  /**
    +   * Reduces the elements of this Dataset using the specified  binary function.  The given function
    +   * must be commutative and associative or the result may be non-deterministic.
    +   * @since 1.6.0
    +   */
    +  def reduce(func: (T, T) => T): T = rdd.reduce(func)
    +
    +  /**
    +   * Aggregates the elements of each partition, and then the results for all the partitions, using a
    +   * given associative and commutative function and a neutral "zero value".
    +   *
    +   * This behaves somewhat differently than the fold operations implemented for non-distributed
    +   * collections in functional languages like Scala. This fold operation may be applied to
    +   * partitions individually, and then those results will be folded into the final result.
    +   * If op is not commutative, then the result may differ from that of a fold applied to a
    +   * non-distributed collection.
    +   * @since 1.6.0
    +   */
    +  def fold(zeroValue: T)(op: (T, T) => T): T = rdd.fold(zeroValue)(op)
    +
    +  /**
    +   * Returns a [[GroupedDataset]] where the data is grouped by the given key function.
    +   * @since 1.6.0
    +   */
    +  def groupBy[K : Encoder](func: T => K): GroupedDataset[K, T] = {
    --- End diff --
    
    Yeah, this was blocked on #9212, but I'll add that in a followup PR.
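
A side note on the `fold` scaladoc quoted in this diff: the sketch below simulates a partitioned fold with plain Scala collections to show why the zero value should be neutral and why a non-commutative `op` can give a different answer than a local fold. `partitionedFold` is a hypothetical helper, not part of the PR, and reversing the partition list is only a stand-in for partition results being merged in a different order.

```scala
object FoldSketch {
  // Stand-in for a distributed fold: fold each partition, then fold the
  // per-partition results. The zero value is used once per partition and
  // once more for the final merge.
  def partitionedFold[T](partitions: Seq[Seq[T]], zero: T)(op: (T, T) => T): T =
    partitions.map(_.foldLeft(zero)(op)).foldLeft(zero)(op)

  val numbers: Seq[Seq[Int]] = Seq(Seq(1, 2), Seq(3))

  // With a neutral zero the partitioned result matches a local fold.
  val neutral: Int = partitionedFold(numbers, 0)(_ + _)
  val localNeutral: Int = numbers.flatten.foldLeft(0)(_ + _)

  // With a non-neutral zero the extra applications show up in the result.
  val nonNeutral: Int = partitionedFold(numbers, 1)(_ + _)
  val localNonNeutral: Int = numbers.flatten.foldLeft(1)(_ + _)

  // String concatenation is associative but not commutative, so the merge
  // order of the partition results changes the answer.
  val words: Seq[Seq[String]] = Seq(Seq("a", "b"), Seq("c"))
  val inOrder: String = partitionedFold(words, "")(_ + _)
  val reordered: String = partitionedFold(words.reverse, "")(_ + _)
}
```

Here `neutral` and `localNeutral` agree, while `nonNeutral` and `localNonNeutral` differ, and `inOrder` differs from `reordered`.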




[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9190#discussion_r43108962
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/GroupedIterator.scala ---
    @@ -0,0 +1,141 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, GenerateOrdering}
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder, Ascending, Expression}
    +
    +object GroupedIterator {
    +  def apply(
    +      input: Iterator[InternalRow],
    +      keyExpressions: Seq[Expression],
    +      inputSchema: Seq[Attribute]): Iterator[(InternalRow, Iterator[InternalRow])] = {
    +    if (input.hasNext) {
    +      new GroupedIterator(input, keyExpressions, inputSchema)
    +    } else {
    +      Iterator.empty
    +    }
    +  }
    +}
    +
    +/**
    + * Iterates over a presorted set of rows, chunking it up by the grouping expression.  Each call to
    + * next will return a pair containing the current group and an iterator that will return all the
    + * elements of that group.  Iterators for each group are lazily constructed by extracting rows
    + * from the input iterator.  As such, full groups are never materialized by this class.
    + *
    + * Example input:
    + * {{{
    + *   Input: [a, 1], [b, 2], [b, 3]
    + *   Grouping: x#1
    + *   InputSchema: x#1, y#2
    + * }}}
    + *
    + * Result:
    + * {{{
    + *   First call to next():  ([a], Iterator([a, 1]))
    + *   Second call to next(): ([b], Iterator([b, 2], [b, 3]))
    + * }}}
    + *
    + * Note, the class does not handle the case of an empty input for simplicity of implementation.
    + * Use the factory to construct a new instance.
    + *
    + * @param input An iterator of rows.  This iterator must be ordered by the groupingExpressions or
    + *              it is possible for the same group to appear more than once.
    + * @param groupingExpressions The set of expressions used to do grouping.  The result of evaluating
    + *                            these expressions will be returned as the first part of each call
    + *                            to `next()`.
    + * @param inputSchema The schema of the rows in the `input` iterator.
    + */
    +class GroupedIterator private(
    +    input: Iterator[InternalRow],
    +    groupingExpressions: Seq[Expression],
    +    inputSchema: Seq[Attribute])
    +  extends Iterator[(InternalRow, Iterator[InternalRow])] {
    +
    +  /** Compares two input rows and returns 0 if they are in the same group. */
    +  val sortOrder = groupingExpressions.map(SortOrder(_, Ascending))
    +  val keyOrdering = GenerateOrdering.generate(sortOrder, inputSchema)
    +
    +  /** Creates a row containing only the key for a given input row. */
    +  val keyProjection = GenerateUnsafeProjection.generate(groupingExpressions, inputSchema)
    +
    +  /**
    +   * Holds null or the row that will be returned on next call to `next()` in the inner iterator.
    +   */
    +  var currentRow = input.next()
    +
    +  /** Holds a copy of an input row that is in the current group. */
    +  var currentGroup = currentRow.copy()
    +  var currentIterator: Iterator[InternalRow] = null
    +  assert(keyOrdering.compare(currentGroup, currentRow) == 0)
    +
    +  // Return true if we already have the next iterator or fetching a new iterator is successful.
    +  def hasNext: Boolean = currentIterator != null || fetchNextGroupIterator
    +
    +  def next(): (InternalRow, Iterator[InternalRow]) = {
    +    assert(hasNext) // Ensure we have fetched the next iterator.
    +    val ret = (keyProjection(currentGroup), currentIterator)
    +    currentIterator = null
    +    ret
    +  }
    +
    +  def fetchNextGroupIterator(): Boolean = {
    +    if (currentRow != null || input.hasNext) {
    +      val inputIterator = new Iterator[InternalRow] {
    --- End diff --
    
    This is dangerous: the user has to consume all the data for a group before they can process the next group, which is out of our control.
    Imagine a user calls `mapGroups` but does nothing with the iterator; then this `GroupedIterator` will never end.
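
One possible mitigation, sketched here with plain Scala iterators rather than `InternalRow`: before handing out the next group, drain whatever the caller left unread in the current one, so the outer iterator always advances. `DrainingGroupedIterator` is a hypothetical, simplified stand-in for `GroupedIterator` (keys are compared with `==` instead of a generated ordering), and it is not necessarily the fix the PR should adopt.

```scala
// Groups consecutive equal keys of a pre-sorted iterator, lazily, and skips
// any rows of the current group that the caller did not read.
class DrainingGroupedIterator[K, V](input: Iterator[(K, V)])
  extends Iterator[(K, Iterator[V])] {

  private val rows = input.buffered
  private var current: Iterator[V] = Iterator.empty

  def hasNext: Boolean = {
    // Drain the remainder of the previous group so we always make progress.
    while (current.hasNext) current.next()
    rows.hasNext
  }

  def next(): (K, Iterator[V]) = {
    if (!hasNext) throw new NoSuchElementException("no more groups")
    val key = rows.head._1
    current = new Iterator[V] {
      // The group ends when the input ends or the key changes.
      def hasNext: Boolean = rows.hasNext && rows.head._1 == key
      def next(): V = {
        if (!hasNext) throw new NoSuchElementException("group exhausted")
        rows.next()._2
      }
    }
    (key, current)
  }
}

object DrainingGroupedIteratorDemo {
  private def rows = Iterator("a" -> 1, "b" -> 2, "b" -> 3, "c" -> 4)

  // The caller ignores every group's iterator, as in the scenario above.
  val keysOnly: List[String] = new DrainingGroupedIterator(rows).map(_._1).toList

  // The caller consumes every group fully.
  val sums: List[(String, Int)] =
    new DrainingGroupedIterator(rows).map { case (k, vs) => k -> vs.sum }.toList
}
```

The trade-off is that an iterator handed out for an earlier group becomes invalid once the next group is requested, which would need to be documented for callers.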




[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9190#issuecomment-150044716
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44087/
    Test PASSed.




[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9190#discussion_r42702909
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
    @@ -0,0 +1,398 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.annotation.Experimental
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.encoders._
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.plans.logical._
    +import org.apache.spark.sql.execution.QueryExecution
    +import org.apache.spark.sql.types.StructType
    +
    +/**
    + * A [[Dataset]] is a strongly typed collection of objects that can be transformed in parallel
    + * using functional or relational operations.
    + *
    + * A [[Dataset]] differs from an [[RDD]] in the following ways:
    + *  - Internally, a [[Dataset]] is represented by a Catalyst logical plan and the data is stored
    + *    in the encoded form.  This representation allows for additional logical operations and
    + *    enables many operations (sorting, shuffling, etc.) to be performed without deserializing to
    + *    an object.
    + *  - The creation of a [[Dataset]] requires the presence of an explicit [[Encoder]] that can be
    + *    used to serialize the object into a binary format.  Encoders are also capable of mapping the
    + *    schema of a given object to the Spark SQL type system.  In contrast, RDDs rely on runtime
    + *    reflection based serialization. Operations that change the type of object stored in the
    + *    dataset also need an encoder for the new type.
    + *
    + * A [[Dataset]] can be thought of as a specialized DataFrame, where the elements map to a specific
    + * JVM object type, instead of to a generic [[Row]] container. A DataFrame can be transformed into
    + * specific Dataset by calling `df.as[ElementType]`.  Similarly you can transform a strongly-typed
    + * [[Dataset]] to a generic DataFrame by calling `ds.toDF()`.
    + *
    + * COMPATIBILITY NOTE: Long term we plan to make [[DataFrame]] extend `Dataset[Row]`.  However,
    + * making this change to the class hierarchy would break the function signatures for the existing
    + * functional operations (map, flatMap, etc).  As such, this class should be considered a preview
    + * of the final API.  Changes will be made to the interface after Spark 1.6.
    + *
    + * @since 1.6.0
    + */
    +@Experimental
    +class Dataset[T] private[sql](
    +    @transient val sqlContext: SQLContext,
    +    @transient val queryExecution: QueryExecution)(
    +    implicit val encoder: Encoder[T]) extends Serializable {
    +
    +  private implicit def classTag = encoder.clsTag
    +
    +  private[sql] def this(sqlContext: SQLContext, plan: LogicalPlan)(implicit encoder: Encoder[T]) =
    +    this(sqlContext, new QueryExecution(sqlContext, plan))
    +
    +  /** Returns the schema of the encoded form of the objects in this [[Dataset]]. */
    +  def schema: StructType = encoder.schema
    +
    +  /* ************* *
    +   *  Conversions  *
    +   * ************* */
    +
    +  /**
    +   * Returns a new `Dataset` where each record has been mapped on to the specified type.
    +   * TODO: should bind here...
    +   * TODO: document binding rules
    +   * @since 1.6.0
    +   */
    +  def as[U : Encoder]: Dataset[U] = new Dataset(sqlContext, queryExecution)(implicitly[Encoder[U]])
    +
    +  /**
    +   * Applies a logical alias to this [[Dataset]] that can be used to disambiguate columns that have
    +   * the same name after two Datasets have been joined.
    +   */
    +  def as(alias: String): Dataset[T] = withPlan(Subquery(alias, _))
    +
    +  /**
    +   * Converts this strongly typed collection of data to a generic DataFrame.  In contrast to the
    +   * strongly typed objects that Dataset operations work on, a DataFrame returns generic [[Row]]
    +   * objects that allow fields to be accessed by ordinal or name.
    +   */
    +  def toDF(): DataFrame = DataFrame(sqlContext, logicalPlan)
    +
    +
    +  /**
    +   * Returns this Dataset.
    +   * @since 1.6.0
    +   */
    +  def toDS(): Dataset[T] = this
    +
    +  /**
    +   * Converts this Dataset to an RDD.
    +   * @since 1.6.0
    +   */
    +  def rdd: RDD[T] = {
    +    val tEnc = implicitly[Encoder[T]]
    +    val input = queryExecution.analyzed.output
    +    queryExecution.toRdd.mapPartitions { iter =>
    +      val bound = tEnc.bind(input)
    +      iter.map(bound.fromRow)
    +    }
    +  }
    +
    +  /* *********************** *
    +   *  Functional Operations  *
    +   * *********************** */
    +
    +  /**
    +   * Concise syntax for chaining custom transformations.
    +   * {{{
    +   *   def featurize(ds: Dataset[T]) = ...
    +   *
    +   *   dataset
    +   *     .transform(featurize)
    +   *     .transform(...)
    +   * }}}
    +   *
    +   * @since 1.6.0
    +   */
    +  def transform[U](t: Dataset[T] => Dataset[U]): Dataset[U] = t(this)
    +
    +  /**
    +   * Returns a new [[Dataset]] that only contains elements where `func` returns `true`.
    +   * @since 1.6.0
    +   */
    +  def filter(func: T => Boolean): Dataset[T] = mapPartitions(_.filter(func))
    +
    +  /**
    +   * Returns a new [[Dataset]] that only contains elements where `func` returns `false`.
    +   * @since 1.6.0
    +   */
    +  def filterNot(func: T => Boolean): Dataset[T] = mapPartitions(_.filterNot(func))
    +
    +  /**
    +   * Returns a new [[Dataset]] that contains the result of applying `func` to each element.
    +   * @since 1.6.0
    +   */
    +  def map[U : Encoder](func: T => U): Dataset[U] = mapPartitions(_.map(func))
    +
    +  /**
    +   * Returns a new [[Dataset]] that contains the result of applying `func` to each partition.
    +   * @since 1.6.0
    +   */
    +  def mapPartitions[U : Encoder](func: Iterator[T] => Iterator[U]): Dataset[U] = {
    +    new Dataset(
    +      sqlContext,
    +      MapPartitions[T, U](
    +        func,
    +        implicitly[Encoder[T]],
    +        implicitly[Encoder[U]],
    +        implicitly[Encoder[U]].schema.toAttributes,
    +        logicalPlan))
    +  }
    +
    +  def flatMap[U : Encoder](func: T => TraversableOnce[U]): Dataset[U] =
    +    mapPartitions(_.flatMap(func))
    +
    +  /* ************** *
    +   *  Side effects  *
    +   * ************** */
    +
    +  /**
    +   * Runs `func` on each element of this Dataset.
    +   * @since 1.6.0
    +   */
    +  def foreach(func: T => Unit): Unit = rdd.foreach(func)
    +
    +  /**
    +   * Runs `func` on each partition of this Dataset.
    +   * @since 1.6.0
    +   */
    +  def foreachPartition(func: Iterator[T] => Unit): Unit = rdd.foreachPartition(func)
    +
    +  /* ************* *
    +   *  Aggregation  *
    +   * ************* */
    +
    +  /**
    +   * Reduces the elements of this Dataset using the specified binary function.  The given function
    +   * must be commutative and associative or the result may be non-deterministic.
    +   * @since 1.6.0
    +   */
    +  def reduce(func: (T, T) => T): T = rdd.reduce(func)
    +
    +  /**
    +   * Aggregates the elements of each partition, and then the results for all the partitions, using a
    +   * given associative and commutative function and a neutral "zero value".
    +   *
    +   * This behaves somewhat differently than the fold operations implemented for non-distributed
    +   * collections in functional languages like Scala. This fold operation may be applied to
    +   * partitions individually, and then those results will be folded into the final result.
    +   * If op is not commutative, then the result may differ from that of a fold applied to a
    +   * non-distributed collection.
    +   * @since 1.6.0
    +   */
    +  def fold(zeroValue: T)(op: (T, T) => T): T = rdd.fold(zeroValue)(op)
    +
    +  /**
    +   * Returns a [[GroupedDataset]] where the data is grouped by the given key function.
    +   * @since 1.6.0
    +   */
    +  def groupBy[K : Encoder](func: T => K): GroupedDataset[K, T] = {
    --- End diff --
    
    should we have a group by that takes a column name or expression?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9190#issuecomment-149750967
  
     Merged build triggered.




[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9190#discussion_r42700920
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Column.scala ---
    @@ -70,6 +76,13 @@ class Column(protected[sql] val expr: Expression) extends Logging {
       override def hashCode: Int = this.expr.hashCode
     
       /**
    +   * Provides a type hint about the expected return value of this column.  This information can
    +   * be used by operations such as `select` on a [[Dataset]] to automatically convert the
    +   * results into the correct JVM types.
    +   */
    +  def as[T : Encoder]: TypedColumn[T] = new TypedColumn[T](expr)
    --- End diff --
    
    add since version




[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9190#discussion_r43416866
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DatasetHolder.scala ---
    @@ -0,0 +1,30 @@
    +/*
    +* Licensed to the Apache Software Foundation (ASF) under one or more
    +* contributor license agreements.  See the NOTICE file distributed with
    +* this work for additional information regarding copyright ownership.
    +* The ASF licenses this file to You under the Apache License, Version 2.0
    +* (the "License"); you may not use this file except in compliance with
    +* the License.  You may obtain a copy of the License at
    +*
    +*    http://www.apache.org/licenses/LICENSE-2.0
    +*
    +* Unless required by applicable law or agreed to in writing, software
    +* distributed under the License is distributed on an "AS IS" BASIS,
    +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +* See the License for the specific language governing permissions and
    +* limitations under the License.
    +*/
    +
    +package org.apache.spark.sql
    +
    +/**
    + * A container for a [[DataFrame]], used for implicit conversions.
    + *
    + * @since 1.3.0
    + */
    +private[sql] case class DatasetHolder[T](df: Dataset[T]) {
    +
    +  // This is declared with parentheses to prevent the Scala compiler from treating
    +  // `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.
    --- End diff --
    
    Thanks, already fixed here: #9358




[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9190#discussion_r42703300
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/Encoder.scala ---
    @@ -46,13 +47,27 @@ trait Encoder[T] {
     
       /**
        * Returns an object of type `T`, extracting the required values from the provided row.  Note that
    -   * you must bind the encoder to a specific schema before you can call this function.
    +   * you must `bind` an encoder to a specific schema before you can call this function.
        */
       def fromRow(row: InternalRow): T
     
       /**
        * Returns a new copy of this encoder, where the expressions used by `fromRow` are bound to the
    -   * given schema
    +   * given schema.
        */
       def bind(schema: Seq[Attribute]): Encoder[T]
    --- End diff --
    
    To simplify it, maybe we can just use "schema" to figure out the order of field names this Encoder expects, and internally project the rows we pass to it so that they're in that order. It might be somewhat less efficient though, I guess, but it would be nice if this API was closer to being open-able because some people might like to play with it in 1.6.
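
The reordering idea above can be sketched without Spark. This is only an illustration under stated assumptions: rows are modelled as plain `Seq[Any]` rather than `InternalRow`, and `ReorderBySchema` and `projection` are hypothetical names, not part of the PR or of the Encoder API.

```scala
// Hypothetical, Spark-free sketch: reorder row values by field name.
object ReorderBySchema {

  /**
   * Builds a function that takes a row laid out in `inputFields` order and
   * returns its values in `expectedFields` order (the order the encoder wants).
   */
  def projection(
      inputFields: Seq[String],
      expectedFields: Seq[String]): Seq[Any] => Seq[Any] = {
    // Name -> position in the incoming row. If a name is duplicated the last
    // occurrence wins, so lookup by name alone cannot disambiguate such columns.
    val indexOf = inputFields.zipWithIndex.toMap
    val ordinals = expectedFields.map { name =>
      indexOf.getOrElse(name, throw new IllegalArgumentException(s"missing field: $name"))
    }
    // The ordinals are computed once; applying the projection is a positional copy.
    row => ordinals.map(row)
  }

  def main(args: Array[String]): Unit = {
    val project = projection(Seq("b", "a"), Seq("a", "b"))
    println(project(Seq(42, "hello")))
  }
}
```

The extra copy per row is the efficiency cost mentioned above, and the name-based lookup would need additional care for inputs whose column names are not unique.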




[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9190#discussion_r42575073
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Column.scala ---
    @@ -36,6 +36,10 @@ private[sql] object Column {
       def unapply(col: Column): Option[Expression] = Some(col.expr)
     }
     
    +/**
    + * A [[Column]] where a type hint has been given for the expected return type.
    + */
    +class TypedColumn[T](expr: Expression) extends Column(expr)
    --- End diff --
    
    should we put the encoder here?




[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9190#discussion_r42778416
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/Encoder.scala ---
    @@ -46,13 +47,27 @@ trait Encoder[T] {
     
       /**
        * Returns an object of type `T`, extracting the required values from the provided row.  Note that
    -   * you must bind the encoder to a specific schema before you can call this function.
    +   * you must `bind` an encoder to a specific schema before you can call this function.
        */
       def fromRow(row: InternalRow): T
     
       /**
        * Returns a new copy of this encoder, where the expressions used by `fromRow` are bound to the
    -   * given schema
    +   * given schema.
        */
       def bind(schema: Seq[Attribute]): Encoder[T]
    --- End diff --
    
    I agree that this needs to be reworked.  In particular we should separate resolution from binding (as mentioned in the PR description).  The way we are doing it today allows us to do very efficient codegen (no extra copies) and correctly handles things like joins that produce ambiguous column names (since internally we are binding to AttributeReferences).
    
    Given limited time before the 1.6 code freeze, I'd rather mark the Encoder API as private and focus on fleshing out the user-facing API.  I think that long term we'll do what you suggest and have a wrapper that reorders input for custom encoders, and stick with pure expressions for the built-in encoders for performance reasons.




[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9190#issuecomment-150304953
  
    Merged build started.




[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9190#discussion_r42778185
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala ---
    @@ -0,0 +1,132 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import scala.language.postfixOps
    +
    +import org.apache.spark.sql.functions._
    +import org.apache.spark.sql.test.SharedSQLContext
    +
    +case class ClassData(a: String, b: Int)
    +
    +class DatasetSuite extends QueryTest with SharedSQLContext {
    --- End diff --
    
    Totally agree.  There will be tests for all of the functions called from pure Java before 1.6 (we learned our lesson after introducing DataFrames).  As I mentioned in the description though, I'd like to merge this infrastructure and then do the Java API in a follow up.




[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9190#discussion_r42701182
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
    @@ -21,6 +21,8 @@ import java.beans.{BeanInfo, Introspector}
     import java.util.Properties
     import java.util.concurrent.atomic.AtomicReference
     
    +import org.apache.spark.sql.catalyst.encoders.Encoder
    --- End diff --
    
    import




[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9190#issuecomment-150304918
  
     Merged build triggered.




[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9190#discussion_r42778774
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/Encoder.scala ---
    @@ -31,6 +31,7 @@ import org.apache.spark.sql.types.StructType
      * and reuse internal buffers to improve performance.
      */
     trait Encoder[T] {
    --- End diff --
    
    Everything in `catalyst.*` is considered unstable and is excluded from the generated documentation.  So, usually we don't mark them developer API too.
    
    However, since this is a user facing concept, we'll need to move at least `trait Encoder[T]` before the release.  I don't know where we want to put it though, `org.apache.spark.sql`? `org.apache.spark`?




[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9190#issuecomment-149750983
  
    Merged build started.




[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/9190




[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9190#discussion_r42702840
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala ---
    @@ -0,0 +1,132 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import scala.language.postfixOps
    +
    +import org.apache.spark.sql.functions._
    +import org.apache.spark.sql.test.SharedSQLContext
    +
    +case class ClassData(a: String, b: Int)
    +
    +class DatasetSuite extends QueryTest with SharedSQLContext {
    --- End diff --
    
    Can you also add a test suite in Java? It's useful to have Java code that calls each method in case we accidentally break some of them later. Even if it's doing the same tests, just make it a .java file and make sure we have a call to each method in Java.




[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9190#issuecomment-149828393
  
    Merged build started.




[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9190#issuecomment-150338781
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9190#issuecomment-150044715
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9190#discussion_r42977432
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/GroupedIterator.scala ---
    @@ -0,0 +1,141 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, GenerateOrdering}
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder, Ascending, Expression}
    +
    +object GroupedIterator {
    +  def apply(
    +      input: Iterator[InternalRow],
    +      keyExpressions: Seq[Expression],
    +      inputSchema: Seq[Attribute]): Iterator[(InternalRow, Iterator[InternalRow])] = {
    +    if (input.hasNext) {
    +      new GroupedIterator(input, keyExpressions, inputSchema)
    +    } else {
    +      Iterator.empty
    +    }
    +  }
    +}
    +
    +/**
    + * Iterates over a presorted set of rows, chunking it up by the grouping expression.  Each call to
    + * next will return a pair containing the current group and an iterator that will return all the
    + * elements of that group.  Iterators for each group are lazily constructed by extracting rows
    + * from the input iterator.  As such, full groups are never materialized by this class.
    + *
    + * Example input:
    + * {{{
    + *   Input: [a, 1], [b, 2], [b, 3]
    + *   Grouping: x#1
    + *   InputSchema: x#1, y#2
    + * }}}
    + *
    + * Result:
    + * {{{
    + *   First call to next():  ([a], Iterator([a, 1]))
    + *   Second call to next(): ([b], Iterator([b, 2], [b, 3]))
    + * }}}
    + *
    + * Note, the class does not handle the case of an empty input for simplicity of implementation.
    + * Use the factory to construct a new instance.
    + *
    + * @param input An iterator of rows.  This iterator must be ordered by the groupingExpressions or
    + *              it is possible for the same group to appear more than once.
    + * @param groupingExpressions The set of expressions used to do grouping.  The result of evaluating
    + *                            these expressions will be returned as the first part of each call
    + *                            to `next()`.
    + * @param inputSchema The schema of the rows in the `input` iterator.
    + */
    +class GroupedIterator private(
    +    input: Iterator[InternalRow],
    +    groupingExpressions: Seq[Expression],
    +    inputSchema: Seq[Attribute])
    +  extends Iterator[(InternalRow, Iterator[InternalRow])] {
    +
    +  /** Compares two input rows and returns 0 if they are in the same group. */
    +  val sortOrder = groupingExpressions.map(SortOrder(_, Ascending))
    +  val keyOrdering = GenerateOrdering.generate(sortOrder, inputSchema)
    +
    +  /** Creates a row containing only the key for a given input row. */
    +  val keyProjection = GenerateUnsafeProjection.generate(groupingExpressions, inputSchema)
    +
    +  /**
    +   * Holds null or the row that will be returned on next call to `next()` in the inner iterator.
    +   */
    +  var currentRow = input.next()
    +
    +  /** Holds a copy of an input row that is in the current group. */
    +  var currentGroup = currentRow.copy()
    +  var currentIterator: Iterator[InternalRow] = null
    +  assert(keyOrdering.compare(currentGroup, currentRow) == 0)
    +
    +  // Return true if we already have the next iterator or fetching a new iterator is successful.
    +  def hasNext: Boolean = currentIterator != null || fetchNextGroupIterator
    +
    +  def next(): (InternalRow, Iterator[InternalRow]) = {
    --- End diff --
    
    What if the user calls `next` twice? We would return a different result and break the semantics of `Iterator`.
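
    To illustrate the contract in question, here is a minimal sketch (not the PR's code) of a grouping iterator whose `next()` advances on its own, whether or not `hasNext` was called first. `SimpleGroupedIterator` is a hypothetical name, plain key/value pairs stand in for `InternalRow`, and each group is materialized into a `List` for brevity, which the class under review deliberately avoids:

    ```scala
    // Hypothetical stand-in for GroupedIterator (assumption: not the PR's implementation).
    // Groups consecutive pairs that share a key; the input must already be sorted by key.
    class SimpleGroupedIterator[K, V](input: Iterator[(K, V)]) extends Iterator[(K, List[V])] {
      // A buffered iterator lets us peek at the next key without consuming it.
      private val buffered = input.buffered

      // Pure query: calling this any number of times never changes what next() returns.
      def hasNext: Boolean = buffered.hasNext

      // Always consumes exactly one group, even if hasNext was never called.
      def next(): (K, List[V]) = {
        if (!buffered.hasNext) throw new NoSuchElementException("no more groups")
        val key = buffered.head._1
        val values = List.newBuilder[V]
        while (buffered.hasNext && buffered.head._1 == key) {
          values += buffered.next()._2
        }
        (key, values.result())
      }
    }
    ```

    With the example input from the scaladoc, two consecutive calls to `next()` should yield the `a` group and then the `b` group, with no `hasNext` call in between.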


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9190#discussion_r42960084
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/tuples.scala ---
    @@ -0,0 +1,173 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.encoders
    +
    +
    +import scala.reflect.ClassTag
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.Attribute
    +import org.apache.spark.sql.types.{StructField, StructType}
    +
    +// Most of this file is codegen.
    +// scalastyle:off
    +
    +/**
    + * A set of composite encoders that take sub encoders and map each of their objects to a
    + * Scala tuple.  Note that currently the implementation is fairly limited and only supports going
    + * from an internal row to a tuple.
    + */
    +object TupleEncoder {
    --- End diff --
    
    `Tuple` is an implementation of `Product`, isn't it?
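
    For reference, a small sketch of the relationship being pointed out: every Scala tuple is a `Product`, so code written against `Product` accepts tuples and case classes alike. `TupleAsProduct`, `describe` and `Point` are hypothetical names used only for illustration:

    ```scala
    // Illustrative only: any value typed as Product exposes its arity and elements.
    case class Point(x: Int, y: Int)

    object TupleAsProduct {
      def describe(p: Product): String =
        s"arity=${p.productArity}, elements=${p.productIterator.mkString(",")}"
    }
    ```

    Both `TupleAsProduct.describe((1, "a"))` and `TupleAsProduct.describe(Point(1, 2))` compile, since `Tuple2` and case classes both extend `Product`.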


[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/9190#issuecomment-150305742
  
    Hey Guys, thanks for looking this over!  I think I addressed most of your comments, but let me know if you want to talk about anything further.


[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9190#issuecomment-149771015
  
    Merged build finished. Test PASSed.


[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9190#discussion_r42960493
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
    @@ -0,0 +1,392 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.annotation.Experimental
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.encoders._
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.plans.logical._
    +import org.apache.spark.sql.execution.QueryExecution
    +import org.apache.spark.sql.types.StructType
    +
    +/**
    + * A [[Dataset]] is a strongly typed collection of objects that can be transformed in parallel
    + * using functional or relational operations.
    + *
    + * A [[Dataset]] differs from an [[RDD]] in the following ways:
    + *  - Internally, a [[Dataset]] is represented by a Catalyst logical plan and the data is stored
    + *    in the encoded form.  This representation allows for additional logical operations and
    + *    enables many operations (sorting, shuffling, etc.) to be performed without deserializing to
    + *    an object.
    + *  - The creation of a [[Dataset]] requires the presence of an explicit [[Encoder]] that can be
    + *    used to serialize the object into a binary format.  Encoders are also capable of mapping the
    + *    schema of a given object to the Spark SQL type system.  In contrast, RDDs rely on runtime
    + *    reflection based serialization. Operations that change the type of object stored in the
    + *    dataset also need an encoder for the new type.
    + *
    + * A [[Dataset]] can be thought of as a specialized DataFrame, where the elements map to a specific
    + * JVM object type, instead of to a generic [[Row]] container. A DataFrame can be transformed into
    + * specific Dataset by calling `df.as[ElementType]`.  Similarly you can transform a strongly-typed
    + * [[Dataset]] to a generic DataFrame by calling `ds.toDF()`.
    + *
    + * COMPATIBILITY NOTE: Long term we plan to make [[DataFrame]] extend `Dataset[Row]`.  However,
    + * making this change to the class hierarchy would break the function signatures for the existing
    + * functional operations (map, flatMap, etc).  As such, this class should be considered a preview
    + * of the final API.  Changes will be made to the interface after Spark 1.6.
    + *
    + * @since 1.6.0
    + */
    +@Experimental
    +class Dataset[T] private[sql](
    +    @transient val sqlContext: SQLContext,
    +    @transient val queryExecution: QueryExecution)(
    --- End diff --
    
    Will `sqlContext` and `queryExecution.sqlContext` ever be different?
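
    The concern can be sketched without Spark. In the hypothetical stand-ins below (`Context`, `Execution`, `TwoArgHolder` and `OneArgHolder` are assumptions, not Spark classes), passing both values lets them disagree, while deriving one from the other does not:

    ```scala
    // Hypothetical stand-ins for SQLContext and QueryExecution.
    final class Context(val name: String)
    final class Execution(val context: Context)

    // Taking both as constructor arguments allows a mismatch between them.
    final class TwoArgHolder(val context: Context, val execution: Execution) {
      def consistent: Boolean = context eq execution.context
    }

    // Deriving the context from the execution rules the mismatch out.
    final class OneArgHolder(val execution: Execution) {
      def context: Context = execution.context
    }
    ```

    Whether the extra parameter is worth keeping depends on how the class is constructed elsewhere in the PR, which this sketch does not model.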


[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9190#discussion_r42701621
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/tuples.scala ---
    @@ -0,0 +1,175 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.encoders
    +
    +
    +import scala.reflect.ClassTag
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.Attribute
    +import org.apache.spark.sql.types.{StructField, StructType}
    +
    +// Most of this file is codegen.
    +// scalastyle:off
    +
    +/**
    + * A set of composite encoders that take sub encoders and map each of their objects to a
    + * Scala tuple.  Note that currently the implementation is fairly limited and only supports going
    + * from an internal row to a tuple.
    + *
    + * The input is assumed to be
    --- End diff --
    
    this seems incomplete?


[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9190#issuecomment-150305783
  
    **[Test build #44162 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44162/consoleFull)** for PR 9190 at commit [`e251f87`](https://github.com/apache/spark/commit/e251f87a830df9f40d523f00bbe14c174c91b2ae).


[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9190#issuecomment-150044350
  
    **[Test build #44087 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44087/consoleFull)** for PR 9190 at commit [`b172d78`](https://github.com/apache/spark/commit/b172d789e1deee51c43fa35835d4a64736d4859e).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
       * `case class LongEncoder(fieldName: String = "value", ordinal: Int = 0) extends Encoder[Long] `
       * `case class IntEncoder(fieldName: String = "value", ordinal: Int = 0) extends Encoder[Int] `
       * `case class StringEncoder(`
       * `          |class Tuple$`
       * `class Tuple2Encoder[T1, T2](e1: Encoder[T1], e2: Encoder[T2]) extends Encoder[(T1, T2)] `
       * `class Tuple3Encoder[T1, T2, T3](e1: Encoder[T1], e2: Encoder[T2], e3: Encoder[T3]) extends Encoder[(T1, T2, T3)] `
       * `class Tuple4Encoder[T1, T2, T3, T4](e1: Encoder[T1], e2: Encoder[T2], e3: Encoder[T3], e4: Encoder[T4]) extends Encoder[(T1, T2, T3, T4)] `
       * `class Tuple5Encoder[T1, T2, T3, T4, T5](e1: Encoder[T1], e2: Encoder[T2], e3: Encoder[T3], e4: Encoder[T4], e5: Encoder[T5]) extends Encoder[(T1, T2, T3, T4, T5)] `
       * `  implicit class AttributeSeq(attrs: Seq[Attribute]) `
       * `case class MapPartitions[T, U](`
       * `case class AppendColumn[T, U](`
       * `case class MapGroups[K, T, U](`
       * `class TypedColumn[T](expr: Expression)(implicit val encoder: Encoder[T]) extends Column(expr)`
       * `case class MapPartitions[T, U](`
       * `case class AppendColumns[T, U](`
       * `case class MapGroups[K, T, U](`


[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9190#discussion_r42702730
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
    @@ -0,0 +1,398 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.annotation.Experimental
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.encoders._
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.plans.logical._
    +import org.apache.spark.sql.execution.QueryExecution
    +import org.apache.spark.sql.types.StructType
    +
    +/**
    + * A [[Dataset]] is a strongly typed collection of objects that can be transformed in parallel
    + * using functional or relational operations.
    + *
    + * A [[Dataset]] differs from an [[RDD]] in the following ways:
    + *  - Internally, a [[Dataset]] is represented by a Catalyst logical plan and the data is stored
    + *    in the encoded form.  This representation allows for additional logical operations and
    + *    enables many operations (sorting, shuffling, etc.) to be performed without deserializing to
    + *    an object.
    + *  - The creation of a [[Dataset]] requires the presence of an explicit [[Encoder]] that can be
    + *    used to serialize the object into a binary format.  Encoders are also capable of mapping the
    + *    schema of a given object to the Spark SQL type system.  In contrast, RDDs rely on runtime
    + *    reflection based serialization. Operations that change the type of object stored in the
    + *    dataset also need an encoder for the new type.
    + *
    + * A [[Dataset]] can be thought of as a specialized DataFrame, where the elements map to a specific
    + * JVM object type, instead of to a generic [[Row]] container. A DataFrame can be transformed into
    + * specific Dataset by calling `df.as[ElementType]`.  Similarly you can transform a strongly-typed
    + * [[Dataset]] to a generic DataFrame by calling `ds.toDF()`.
    + *
    + * COMPATIBILITY NOTE: Long term we plan to make [[DataFrame]] extend `Dataset[Row]`.  However,
    + * making this change to che class hierarchy would break the function signatures for the existing
    + * functional operations (map, flatMap, etc).  As such, this class should be considered a preview
    + * of the final API.  Changes will be made to the interface after Spark 1.6.
    + *
    + * @since 1.6.0
    + */
    +@Experimental
    +class Dataset[T] private[sql](
    +    @transient val sqlContext: SQLContext,
    +    @transient val queryExecution: QueryExecution)(
    +    implicit val encoder: Encoder[T]) extends Serializable {
    +
    +  private implicit def classTag = encoder.clsTag
    +
    +  private[sql] def this(sqlContext: SQLContext, plan: LogicalPlan)(implicit encoder: Encoder[T]) =
    +    this(sqlContext, new QueryExecution(sqlContext, plan))
    +
    +  /** Returns the schema of the encoded form of the objects in this [[Dataset]]. */
    +  def schema: StructType = encoder.schema
    +
    +  /* ************* *
    +   *  Conversions  *
    +   * ************* */
    +
    +  /**
    +   * Returns a new `Dataset` where each record has been mapped on to the specified type.
    +   * TODO: should bind here...
    +   * TODO: document binding rules
    +   * @since 1.6.0
    +   */
    +  def as[U : Encoder]: Dataset[U] = new Dataset(sqlContext, queryExecution)(implicitly[Encoder[U]])
    +
    +  /**
    +   * Applies a logical alias to this [[Dataset]] that can be used to disambiguate columns that have
    +   * the same name after two Datasets have been joined.
    +   */
    +  def as(alias: String): Dataset[T] = withPlan(Subquery(alias, _))
    +
    +  /**
    +   * Converts this strongly typed collection of data to generic Dataframe.  In contrast to the
    +   * strongly typed objects that Dataset operations work on, a Dataframe returns generic [[Row]]
    +   * objects that allow fields to be accessed by ordinal or name.
    +   */
    +  def toDF(): DataFrame = DataFrame(sqlContext, logicalPlan)
    +
    +
    +  /**
    +   * Returns this Dataset.
    +   * @since 1.6.0
    +   */
    +  def toDS(): Dataset[T] = this
    +
    +  /**
    +   * Converts this Dataset to an RDD.
    +   * @since 1.6.0
    +   */
    +  def rdd: RDD[T] = {
    +    val tEnc = implicitly[Encoder[T]]
    +    val input = queryExecution.analyzed.output
    +    queryExecution.toRdd.mapPartitions { iter =>
    +      val bound = tEnc.bind(input)
    +      iter.map(bound.fromRow)
    +    }
    +  }
    +
    +  /* *********************** *
    +   *  Functional Operations  *
    +   * *********************** */
    +
    +  /**
    +   * Concise syntax for chaining custom transformations.
    +   * {{{
    +   *   def featurize(ds: Dataset[T]) = ...
    +   *
    +   *   dataset
    +   *     .transform(featurize)
    +   *     .transform(...)
    +   * }}}
    +   *
    +   * @since 1.6.0
    +   */
    +  def transform[U](t: Dataset[T] => Dataset[U]): Dataset[U] = t(this)
    +
    +  /**
    +   * Returns a new [[Dataset]] that only contains elements where `func` returns `true`.
    +   * @since 1.6.0
    +   */
    +  def filter(func: T => Boolean): Dataset[T] = mapPartitions(_.filter(func))
    +
    +  /**
    +   * Returns a new [[Dataset]] that only contains elements where `func` returns `false`.
    +   * @since 1.6.0
    +   */
    +  def filterNot(func: T => Boolean): Dataset[T] = mapPartitions(_.filterNot(func))
    --- End diff --
    
    do we need this?
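
    The redundancy behind the question can be shown on ordinary Scala collections (used here as an assumption in place of `Dataset`): `filterNot(f)` selects the same elements as `filter` with the predicate negated. `FilterNotEquivalence` and `isEven` are hypothetical names:

    ```scala
    // Illustrative only: filterNot is expressible as filter with a negated predicate.
    object FilterNotEquivalence {
      def isEven(n: Int): Boolean = n % 2 == 0

      val data: Seq[Int] = Seq(1, 2, 3, 4)

      // Keeps the elements for which the predicate is false.
      val viaFilterNot: Seq[Int] = data.filterNot(isEven)

      // Same result, using filter and an explicit negation.
      val viaFilter: Seq[Int] = data.filter(n => !isEven(n))
    }
    ```

    Dropping the method would cost callers one `!`, while keeping it mirrors the Scala collections API; the sketch takes no position on which is preferable.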



[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9190#discussion_r42700923
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Column.scala ---
    @@ -36,6 +38,10 @@ private[sql] object Column {
       def unapply(col: Column): Option[Expression] = Some(col.expr)
     }
     
    +/**
    + * A [[Column]] where an [[Encoder]] has been given for the expected return type.
    + */
    +class TypedColumn[T](expr: Expression)(implicit val encoder: Encoder[T]) extends Column(expr)
    --- End diff --
    
    add since version


[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9190#discussion_r42701532
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/functions.scala ---
    @@ -17,6 +17,8 @@
     
     package org.apache.spark.sql
     
    +import org.apache.spark.sql.catalyst.encoders.Encoder
    --- End diff --
    
    remove?


[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9190#issuecomment-150014839
  
    **[Test build #44087 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44087/consoleFull)** for PR 9190 at commit [`b172d78`](https://github.com/apache/spark/commit/b172d789e1deee51c43fa35835d4a64736d4859e).


[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9190#discussion_r42777859
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
    @@ -0,0 +1,398 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.annotation.Experimental
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.encoders._
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.plans.logical._
    +import org.apache.spark.sql.execution.QueryExecution
    +import org.apache.spark.sql.types.StructType
    +
    +/**
    + * A [[Dataset]] is a strongly typed collection of objects that can be transformed in parallel
    + * using functional or relational operations.
    + *
    + * A [[Dataset]] differs from an [[RDD]] in the following ways:
    + *  - Internally, a [[Dataset]] is represented by a Catalyst logical plan and the data is stored
    + *    in the encoded form.  This representation allows for additional logical operations and
    + *    enables many operations (sorting, shuffling, etc.) to be performed without deserializing to
    + *    an object.
    + *  - The creation of a [[Dataset]] requires the presence of an explicit [[Encoder]] that can be
    + *    used to serialize the object into a binary format.  Encoders are also capable of mapping the
    + *    schema of a given object to the Spark SQL type system.  In contrast, RDDs rely on runtime
    + *    reflection based serialization. Operations that change the type of object stored in the
    + *    dataset also need an encoder for the new type.
    + *
    + * A [[Dataset]] can be thought of as a specialized DataFrame, where the elements map to a specific
    + * JVM object type, instead of to a generic [[Row]] container. A DataFrame can be transformed into
    + * specific Dataset by calling `df.as[ElementType]`.  Similarly you can transform a strongly-typed
    + * [[Dataset]] to a generic DataFrame by calling `ds.toDF()`.
    + *
    + * COMPATIBILITY NOTE: Long term we plan to make [[DataFrame]] extend `Dataset[Row]`.  However,
    + * making this change to che class hierarchy would break the function signatures for the existing
    + * functional operations (map, flatMap, etc).  As such, this class should be considered a preview
    + * of the final API.  Changes will be made to the interface after Spark 1.6.
    + *
    + * @since 1.6.0
    + */
    +@Experimental
    +class Dataset[T] private[sql](
    +    @transient val sqlContext: SQLContext,
    +    @transient val queryExecution: QueryExecution)(
    +    implicit val encoder: Encoder[T]) extends Serializable {
    +
    +  private implicit def classTag = encoder.clsTag
    +
    +  private[sql] def this(sqlContext: SQLContext, plan: LogicalPlan)(implicit encoder: Encoder[T]) =
    +    this(sqlContext, new QueryExecution(sqlContext, plan))
    +
    +  /** Returns the schema of the encoded form of the objects in this [[Dataset]]. */
    +  def schema: StructType = encoder.schema
    +
    +  /* ************* *
    +   *  Conversions  *
    +   * ************* */
    +
    +  /**
    +   * Returns a new `Dataset` where each record has been mapped on to the specified type.
    +   * TODO: should bind here...
    +   * TODO: document binding rules
    +   * @since 1.6.0
    +   */
    +  def as[U : Encoder]: Dataset[U] = new Dataset(sqlContext, queryExecution)(implicitly[Encoder[U]])
    +
    +  /**
    +   * Applies a logical alias to this [[Dataset]] that can be used to disambiguate columns that have
    +   * the same name after two Datasets have been joined.
    +   */
    +  def as(alias: String): Dataset[T] = withPlan(Subquery(alias, _))
    +
    +  /**
    +   * Converts this strongly typed collection of data to generic Dataframe.  In contrast to the
    +   * strongly typed objects that Dataset operations work on, a Dataframe returns generic [[Row]]
    +   * objects that allow fields to be accessed by ordinal or name.
    +   */
    +  def toDF(): DataFrame = DataFrame(sqlContext, logicalPlan)
    +
    +
    +  /**
    +   * Returns this Dataset.
    +   * @since 1.6.0
    +   */
    +  def toDS(): Dataset[T] = this
    +
    +  /**
    +   * Converts this Dataset to an RDD.
    +   * @since 1.6.0
    +   */
    +  def rdd: RDD[T] = {
    +    val tEnc = implicitly[Encoder[T]]
    +    val input = queryExecution.analyzed.output
    +    queryExecution.toRdd.mapPartitions { iter =>
    +      val bound = tEnc.bind(input)
    +      iter.map(bound.fromRow)
    +    }
    +  }
    +
    +  /* *********************** *
    +   *  Functional Operations  *
    +   * *********************** */
    +
    +  /**
    +   * Concise syntax for chaining custom transformations.
    +   * {{{
    +   *   def featurize(ds: Dataset[T]) = ...
    +   *
    +   *   dataset
    +   *     .transform(featurize)
    +   *     .transform(...)
    +   * }}}
    +   *
    +   * @since 1.6.0
    +   */
    +  def transform[U](t: Dataset[T] => Dataset[U]): Dataset[U] = t(this)
    +
    +  /**
    +   * Returns a new [[Dataset]] that only contains elements where `func` returns `true`.
    +   * @since 1.6.0
    +   */
    +  def filter(func: T => Boolean): Dataset[T] = mapPartitions(_.filter(func))
    +
    +  /**
    +   * Returns a new [[Dataset]] that only contains elements where `func` returns `false`.
    +   * @since 1.6.0
    +   */
    +  def filterNot(func: T => Boolean): Dataset[T] = mapPartitions(_.filterNot(func))
    +
    +  /**
    +   * Returns a new [[Dataset]] that contains the result of applying `func` to each element.
    +   * @since 1.6.0
    +   */
    +  def map[U : Encoder](func: T => U): Dataset[U] = mapPartitions(_.map(func))
    +
    +  /**
    +   * Returns a new [[Dataset]] that contains the result of applying `func` to each partition.
    +   * @since 1.6.0
    +   */
    +  def mapPartitions[U : Encoder](func: Iterator[T] => Iterator[U]): Dataset[U] = {
    +    new Dataset(
    +      sqlContext,
    +      MapPartitions[T, U](
    +        func,
    +        implicitly[Encoder[T]],
    +        implicitly[Encoder[U]],
    +        implicitly[Encoder[U]].schema.toAttributes,
    +        logicalPlan))
    +  }
    +
    +  def flatMap[U : Encoder](func: T => TraversableOnce[U]): Dataset[U] =
    +    mapPartitions(_.flatMap(func))
    +
    +  /* ************** *
    +   *  Side effects  *
    +   * ************** */
    +
    +  /**
    +   * Runs `func` on each element of this Dataset.
    +   * @since 1.6.0
    +   */
    +  def foreach(func: T => Unit): Unit = rdd.foreach(func)
    +
    +  /**
    +   * Runs `func` on each partition of this Dataset.
    +   * @since 1.6.0
    +   */
    +  def foreachPartition(func: Iterator[T] => Unit): Unit = rdd.foreachPartition(func)
    +
    +  /* ************* *
    +   *  Aggregation  *
    +   * ************* */
    +
    +  /**
    +   * Reduces the elements of this Dataset using the specified binary function.  The given function
    +   * must be commutative and associative or the result may be non-deterministic.
    +   * @since 1.6.0
    +   */
    +  def reduce(func: (T, T) => T): T = rdd.reduce(func)
    +
    +  /**
    +   * Aggregates the elements of each partition, and then the results for all the partitions, using a
    +   * given associative and commutative function and a neutral "zero value".
    +   *
    +   * This behaves somewhat differently than the fold operations implemented for non-distributed
    +   * collections in functional languages like Scala. This fold operation may be applied to
    +   * partitions individually, and then those results will be folded into the final result.
    +   * If op is not commutative, then the result may differ from that of a fold applied to a
    +   * non-distributed collection.
    +   * @since 1.6.0
    +   */
    +  def fold(zeroValue: T)(op: (T, T) => T): T = rdd.fold(zeroValue)(op)
    +
    +  /**
    +   * Returns a [[GroupedDataset]] where the data is grouped by the given key function.
    +   * @since 1.6.0
    +   */
    +  def groupBy[K : Encoder](func: T => K): GroupedDataset[K, T] = {
    +    val inputPlan = queryExecution.analyzed
    +    val withGroupingKey = AppendColumn(func, inputPlan)
    +    val executed = sqlContext.executePlan(withGroupingKey)
    +
    +    new GroupedDataset(
    +      implicitly[Encoder[K]].bindOrdinals(withGroupingKey.newColumns),
    +      implicitly[Encoder[T]].bind(inputPlan.output),
    +      executed,
    +      inputPlan.output,
    +      withGroupingKey.newColumns)
    +  }
    +
    +  /* ****************** *
    +   *  Typed Relational  *
    +   * ****************** */
    +
    +  /**
    +   * Returns a new [[Dataset]] by computing the given [[Column]] expression for each element.
    +   *
    +   * {{{
    +   *   val ds = Seq(1, 2, 3).toDS()
    +   *   val newDS = ds.select(e[Int]("value + 1"))
    +   * }}}
    +   * @since 1.6.0
    +   */
    +  def select[U1: Encoder](c1: TypedColumn[U1]): Dataset[U1] = {
    +    new Dataset[U1](sqlContext, Project(Alias(c1.expr, "_1")() :: Nil, logicalPlan))
    +  }
    +
    +  // Codegen
    +  // scalastyle:off
    --- End diff --
    
    The next 5 functions are code-generated for variadic types.  It didn't seem worth making the codegen obey the 100-character line limit.
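    
    The `fold` documentation in the diff above notes that a partitioned fold can differ from a local fold when `op` is not commutative.  A minimal sketch of that effect, using plain Scala collections rather than Spark: `partitionedFold`, `mergeOrder`, and the nested `Seq` standing in for partitions are all hypothetical names made up for this illustration.
    
    ```scala
    object FoldSketch {
      // Fold every "partition" with `zero`, then fold the partial results
      // in the order given by `mergeOrder` (a stand-in for arrival order).
      def partitionedFold[T](partitions: Seq[Seq[T]], mergeOrder: Seq[Int], zero: T)(
          op: (T, T) => T): T = {
        val partials = partitions.map(_.fold(zero)(op))
        mergeOrder.map(i => partials(i)).fold(zero)(op)
      }

      def main(args: Array[String]): Unit = {
        // String concatenation is associative but not commutative.
        val parts = Seq(Seq("a", "b"), Seq("c", "d"))

        val local = parts.flatten.fold("")(_ + _)
        val inOrder = partitionedFold(parts, Seq(0, 1), "")(_ + _)
        val reversed = partitionedFold(parts, Seq(1, 0), "")(_ + _)

        println(local)    // abcd
        println(inOrder)  // abcd
        println(reversed) // cdab
      }
    }
    ```
    
    With a commutative `op` such as integer addition, all three results would agree regardless of the merge order.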


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9190#discussion_r42702889
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/Encoder.scala ---
    @@ -31,6 +31,7 @@ import org.apache.spark.sql.types.StructType
      * and reuse internal buffers to improve performance.
      */
     trait Encoder[T] {
    --- End diff --
    
    Is this a DeveloperApi for now?




[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/9190#issuecomment-150371978
  
    I'm going to merge this. We should continue to do review post-hoc.





[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9190#discussion_r42776824
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala ---
    @@ -0,0 +1,68 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.sql.catalyst.encoders.Encoder
    +import org.apache.spark.sql.catalyst.expressions.Attribute
    +import org.apache.spark.sql.catalyst.plans.logical._
    +import org.apache.spark.sql.execution.QueryExecution
    +
    +/**
    + * A [[Dataset]] that has been logically grouped by a user-specified grouping key.  Users should not
    + * construct a [[GroupedDataset]] directly, but should instead call `groupBy` on an existing
    + * [[Dataset]].
    + */
    +class GroupedDataset[K, T] private[sql](
    +    private val kEncoder: Encoder[K],
    +    private val tEncoder: Encoder[T],
    +    queryExecution: QueryExecution,
    +    private val dataAttributes: Seq[Attribute],
    +    private val groupingAttributes: Seq[Attribute]) extends Serializable {
    +
    +  private implicit def kEnc = kEncoder
    +  private implicit def tEnc = tEncoder
    +  private def logicalPlan = queryExecution.analyzed
    +  private def sqlContext = queryExecution.sqlContext
    +
    +  /**
    +   * Returns a [[Dataset]] that contains each unique key.
    +   */
    +  def keys: Dataset[K] = {
    --- End diff --
    
    We don't *need* it, but it's virtually free to implement efficiently. Additionally:
    - [Scalding has it](https://github.com/twitter/scalding/wiki/Type-safe-api-reference#creating-groups-and-joining-cogrouping)
    - [PairRDD has it](https://spark.apache.org/docs/0.7.2/api/core/spark/PairRDDFunctions.html)




[GitHub] spark pull request: [SPARK-11116] [SQL] First Draft of Dataset API

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9190#discussion_r42970368
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/tuples.scala ---
    @@ -0,0 +1,173 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.encoders
    +
    +
    +import scala.reflect.ClassTag
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.Attribute
    +import org.apache.spark.sql.types.{StructField, StructType}
    +
    +// Most of this file is codegen.
    +// scalastyle:off
    +
    +/**
    + * A set of composite encoders that take sub encoders and map each of their objects to a
    + * Scala tuple.  Note that currently the implementation is fairly limited and only supports going
    + * from an internal row to a tuple.
    + */
    +object TupleEncoder {
    --- End diff --
    
    That's true, but the thought was that we wanted some way to compose any encoder (e.g. one for Java POJOs) in a way that returns a tuple.  However, I'm in the process of rewriting all of this to use expression composition instead of imperative code.
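    
    As a rough illustration of composing sub-encoders into one that returns a tuple, here is a sketch in plain Scala.  It is not the `TupleEncoder` code from this PR: `RowDecoder`, `width`, `tuple2`, `Person`, and the use of `Seq[Any]` as a row are all assumptions invented for the example, and it shows the imperative style rather than expression composition.
    
    ```scala
    object TupleDecoderSketch {
      // Hypothetical read-side of an encoder: turns a flat row into a T.
      trait RowDecoder[T] {
        def width: Int                 // number of columns this decoder consumes
        def fromRow(row: Seq[Any]): T
      }

      val intDecoder: RowDecoder[Int] = new RowDecoder[Int] {
        val width = 1
        def fromRow(row: Seq[Any]): Int = row.head.asInstanceOf[Int]
      }

      // Stand-in for an arbitrary user type (for example a Java POJO).
      final case class Person(name: String, age: Int)

      val personDecoder: RowDecoder[Person] = new RowDecoder[Person] {
        val width = 2
        def fromRow(row: Seq[Any]): Person =
          Person(row(0).asInstanceOf[String], row(1).asInstanceOf[Int])
      }

      // Composite decoder: each sub-decoder reads its own slice of the row.
      def tuple2[A, B](a: RowDecoder[A], b: RowDecoder[B]): RowDecoder[(A, B)] =
        new RowDecoder[(A, B)] {
          val width = a.width + b.width
          def fromRow(row: Seq[Any]): (A, B) =
            (a.fromRow(row.take(a.width)), b.fromRow(row.drop(a.width)))
        }

      def main(args: Array[String]): Unit = {
        val decoder = tuple2(personDecoder, intDecoder)
        println(decoder.fromRow(Seq("Ann", 30, 7))) // (Person(Ann,30),7)
      }
    }
    ```
    
    Because `tuple2` only relies on the `RowDecoder` interface, the sub-decoders can be of any kind, which is the property the comment is after.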

