Posted to reviews@spark.apache.org by AndreSchumacher <gi...@git.apache.org> on 2014/04/23 20:10:04 UTC

[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

GitHub user AndreSchumacher opened a pull request:

    https://github.com/apache/spark/pull/511

    SPARK-1487 [SQL] Support record filtering via predicate pushdown in Parquet

    Simple filter predicates such as LessThan, GreaterThan, etc., where one side is a literal and the other a NamedExpression, are now pushed down to the underlying ParquetTableScan. Here are some results from a microbenchmark with a simple schema of six fields of different types, where most records failed the test:
    
                | Uncompressed | Compressed
    ----------- | ------------ | ----------
    File size   | 10 GB        | 2 GB
    Speedup     | 2x           | 1.8x
    
    Since mileage may vary I added a new option to SparkConf:
    
    `org.apache.spark.sql.parquet.filter.pushdown`
    
    The default value would be `true`; setting it to `false` disables the pushdown. When most rows are expected to pass the filter, or when there are few fields, performance can be better with pushdown disabled. The default should fit situations with a reasonable number of (possibly nested) fields where, on average, not too many records pass the filter.
    
    Because of an issue with Parquet ([see here](https://github.com/Parquet/parquet-mr/issues/371)), currently only predicates on non-nullable attributes are pushed down. If one knew that no optional fields in a given table have missing values, one could also allow overriding this.
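
As a rough illustration of the rule described above (only comparisons between a literal and a non-nullable attribute qualify, and the whole mechanism can be switched off), here is a minimal, self-contained sketch. The `Expr` model, `PushdownSketch`, and a plain `Map` standing in for the configuration are all assumptions made for illustration; they are not the Catalyst classes or the code in this pull request. Only the option name is taken from the description.

```scala
// Hypothetical mini expression model; these are NOT the Catalyst classes.
sealed trait Expr
case class Attr(name: String, nullable: Boolean) extends Expr
case class Lit(value: Any) extends Expr
case class Cast(child: Expr) extends Expr
case class LessThan(left: Expr, right: Expr) extends Expr
case class GreaterThan(left: Expr, right: Expr) extends Expr

object PushdownSketch {
  // Option name quoted from the pull request description.
  val PushdownKey = "org.apache.spark.sql.parquet.filter.pushdown"

  // Pushdown is treated as enabled unless the option is explicitly "false".
  def pushdownEnabled(conf: Map[String, String]): Boolean =
    conf.getOrElse(PushdownKey, "true").toBoolean

  // A predicate qualifies only if one side is a literal and the other
  // side is a non-nullable attribute.
  def isSimpleComparison(e: Expr): Boolean = e match {
    case LessThan(Attr(_, false), Lit(_))    => true
    case LessThan(Lit(_), Attr(_, false))    => true
    case GreaterThan(Attr(_, false), Lit(_)) => true
    case GreaterThan(Lit(_), Attr(_, false)) => true
    case _                                   => false
  }

  // Select the predicates that would be handed to the scan.
  def selectPushable(filters: Seq[Expr], conf: Map[String, String]): Seq[Expr] =
    if (pushdownEnabled(conf)) filters.filter(isSimpleComparison) else Seq.empty
}
```

In this sketch a predicate on a nullable attribute, or one wrapped in a `Cast`, is simply left out of the pushed set and would still have to be evaluated by the regular filter operator.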

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/AndreSchumacher/spark parquet_filter

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/511.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #511
    
----
commit 25177c0b3625ae9cbae931f40974743e48e2dbc7
Author: Andre Schumacher <an...@iki.fi>
Date:   2014-04-21T15:04:07Z

    First commit Parquet record filtering

commit 26d9b62f63329151b9f9203bfb1dd26604cd11ba
Author: Andre Schumacher <an...@iki.fi>
Date:   2014-04-22T14:15:49Z

    Extending ParquetFilters

commit 314b20bbd9def8273910e64eda3f216681b7989c
Author: Andre Schumacher <an...@iki.fi>
Date:   2014-04-22T15:16:34Z

    Adding unit test for filtering

commit 6e9d03f20fe1c1cf98bb11ed40c9f1f89965f955
Author: Andre Schumacher <an...@iki.fi>
Date:   2014-04-22T17:13:00Z

    Adding disjunctive filter predicates

commit 4fbf636a9433f8717e0d6c5181e4937cb90526e7
Author: Andre Schumacher <an...@iki.fi>
Date:   2014-04-22T17:22:27Z

    Undoing changes not needed for this PR

commit 38a9453ac62c641eecc090c7f3a9ef30a2dfd596
Author: Andre Schumacher <an...@iki.fi>
Date:   2014-04-23T14:45:03Z

    Adding SparkConf setting to disable filter predicate pushdown

commit 15f0339cf056e4c814a3145c79b37da629f4235a
Author: Andre Schumacher <an...@iki.fi>
Date:   2014-04-23T17:50:23Z

    Optimizing imports in ParquetTestData

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by AndreSchumacher <gi...@git.apache.org>.
Github user AndreSchumacher commented on a diff in the pull request:

    https://github.com/apache/spark/pull/511#discussion_r12022986
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala ---
    @@ -0,0 +1,413 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.parquet
    +
    +import org.apache.hadoop.conf.Configuration
    +
    +import parquet.filter._
    +import parquet.filter.ColumnPredicates._
    +import parquet.column.ColumnReader
    +
    +import com.google.common.io.BaseEncoding
    +
    +import org.apache.spark.sql.catalyst.types._
    +import org.apache.spark.sql.catalyst.expressions.{Predicate => CatalystPredicate}
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.execution.SparkSqlSerializer
    +
    +object ParquetFilters {
    +  val PARQUET_FILTER_DATA = "org.apache.spark.sql.parquet.row.filter"
    +  // set this to false if pushdown should be disabled
    +  val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.hints.parquetFilterPushdown"
    +
    +  def createRecordFilter(filterExpressions: Seq[Expression]): UnboundRecordFilter = {
    +    val filters: Seq[CatalystFilter] = filterExpressions.collect {
    +      case (expression: Expression) if createFilter(expression).isDefined =>
    +        createFilter(expression).get
    +    }
    +    if (filters.length > 0) filters.reduce(AndRecordFilter.and) else null
    +  }
    +
    +  def createFilter(expression: Expression): Option[CatalystFilter] = {
    +    def createEqualityFilter(
    +        name: String,
    +        literal: Literal,
    +        predicate: CatalystPredicate) = literal.dataType match {
    +      case BooleanType =>
    +        ComparisonFilter.createBooleanFilter(name, literal.value.asInstanceOf[Boolean], predicate)
    +      case IntegerType =>
    +        ComparisonFilter.createIntFilter(
    +          name,
    +          (x: Int) => x == literal.value.asInstanceOf[Int],
    +          predicate)
    +      case LongType =>
    +        ComparisonFilter.createLongFilter(
    +          name,
    +          (x: Long) => x == literal.value.asInstanceOf[Long],
    +          predicate)
    +      case DoubleType =>
    +        ComparisonFilter.createDoubleFilter(
    +          name,
    +          (x: Double) => x == literal.value.asInstanceOf[Double],
    +          predicate)
    +      case FloatType =>
    +        ComparisonFilter.createFloatFilter(
    +          name,
    +          (x: Float) => x == literal.value.asInstanceOf[Float],
    +          predicate)
    +      case StringType =>
    +        ComparisonFilter.createStringFilter(name, literal.value.asInstanceOf[String], predicate)
    +    }
    +    def createLessThanFilter(
    +        name: String,
    +        literal: Literal,
    +        predicate: CatalystPredicate) = literal.dataType match {
    +      case IntegerType =>
    +        ComparisonFilter.createIntFilter(
    +          name,
    +          (x: Int) => x < literal.value.asInstanceOf[Int],
    +          predicate)
    +      case LongType =>
    +        ComparisonFilter.createLongFilter(
    +          name,
    +          (x: Long) => x < literal.value.asInstanceOf[Long],
    +          predicate)
    +      case DoubleType =>
    +        ComparisonFilter.createDoubleFilter(
    +          name,
    +          (x: Double) => x < literal.value.asInstanceOf[Double],
    +          predicate)
    +      case FloatType =>
    +        ComparisonFilter.createFloatFilter(
    +          name,
    +          (x: Float) => x < literal.value.asInstanceOf[Float],
    +          predicate)
    +    }
    +    def createLessThanOrEqualFilter(
    +        name: String,
    +        literal: Literal,
    +        predicate: CatalystPredicate) = literal.dataType match {
    +      case IntegerType =>
    +        ComparisonFilter.createIntFilter(
    +          name,
    +          (x: Int) => x <= literal.value.asInstanceOf[Int],
    +          predicate)
    +      case LongType =>
    +        ComparisonFilter.createLongFilter(
    +          name,
    +          (x: Long) => x <= literal.value.asInstanceOf[Long],
    +          predicate)
    +      case DoubleType =>
    +        ComparisonFilter.createDoubleFilter(
    +          name,
    +          (x: Double) => x <= literal.value.asInstanceOf[Double],
    +          predicate)
    +      case FloatType =>
    +        ComparisonFilter.createFloatFilter(
    +          name,
    +          (x: Float) => x <= literal.value.asInstanceOf[Float],
    +          predicate)
    +    }
    +    // TODO: combine these two types somehow?
    +    def createGreaterThanFilter(
    +        name: String,
    +        literal: Literal,
    +        predicate: CatalystPredicate) = literal.dataType match {
    +      case IntegerType =>
    +        ComparisonFilter.createIntFilter(
    +          name,
    +          (x: Int) => x > literal.value.asInstanceOf[Int],
    +          predicate)
    +      case LongType =>
    +        ComparisonFilter.createLongFilter(
    +          name,
    +          (x: Long) => x > literal.value.asInstanceOf[Long],
    +          predicate)
    +      case DoubleType =>
    +        ComparisonFilter.createDoubleFilter(
    +          name,
    +          (x: Double) => x > literal.value.asInstanceOf[Double],
    +          predicate)
    +      case FloatType =>
    +        ComparisonFilter.createFloatFilter(
    +          name,
    +          (x: Float) => x > literal.value.asInstanceOf[Float],
    +          predicate)
    +    }
    +    def createGreaterThanOrEqualFilter(
    +        name: String,
    +        literal: Literal,
    +        predicate: CatalystPredicate) = literal.dataType match {
    +      case IntegerType =>
    +        ComparisonFilter.createIntFilter(
    +          name, (x: Int) => x >= literal.value.asInstanceOf[Int],
    +          predicate)
    +      case LongType =>
    +        ComparisonFilter.createLongFilter(
    +          name,
    +          (x: Long) => x >= literal.value.asInstanceOf[Long],
    +          predicate)
    +      case DoubleType =>
    +        ComparisonFilter.createDoubleFilter(
    +          name,
    +          (x: Double) => x >= literal.value.asInstanceOf[Double],
    +          predicate)
    +      case FloatType =>
    +        ComparisonFilter.createFloatFilter(
    +          name,
    +          (x: Float) => x >= literal.value.asInstanceOf[Float],
    +          predicate)
    +    }
    +    // TODO: we currently only filter on non-nullable (Parquet REQUIRED) attributes until
    +    // https://github.com/Parquet/parquet-mr/issues/371
    +    // has been resolved
    +    expression match {
    +      case p @ Or(left: Expression, right: Expression)
    +          if createFilter(left).isDefined && createFilter(right).isDefined => {
    +        // If either side of this Or-predicate is empty then this means
    +        // it contains a more complex comparison than between attribute and literal
    +        // (e.g., it contained a CAST). The only safe thing to do is then to disregard
    +        // this disjunction, which could be contained in a conjunction. If it stands
    +        // alone then it is also safe to drop it, since a Null return value of this
    +        // function is interpreted as having no filters at all.
    +        val leftFilter = createFilter(left).get
    +        val rightFilter = createFilter(right).get
    +        Some(new OrFilter(leftFilter, rightFilter))
    +      }
    +      case p @ And(left: Expression, right: Expression) => {
    +        // This treats nested conjunctions; since either side of the conjunction
    +        // may contain more complex filter expressions we may actually generate
    +        // strictly weaker filter predicates in the process.
    +        val leftFilter = createFilter(left)
    +        val rightFilter = createFilter(right)
    +        (leftFilter, rightFilter) match {
    +          case (None, Some(filter)) => Some(filter)
    +          case (Some(filter), None) => Some(filter)
    +          case (Some(l), Some(r)) => Some(new AndFilter(l, r))
    +          // Neither side could be translated, so there is nothing to push
    +          case (None, None) => None
    +        }
    +      }
    +      case p @ Equals(left: Literal, right: NamedExpression) if !right.nullable =>
    +        Some(createEqualityFilter(right.name, left, p))
    +      case p @ Equals(left: NamedExpression, right: Literal) if !left.nullable =>
    +        Some(createEqualityFilter(left.name, right, p))
    +      case p @ LessThan(left: Literal, right: NamedExpression) if !right.nullable =>
    +        Some(createLessThanFilter(right.name, left, p))
    +      case p @ LessThan(left: NamedExpression, right: Literal) if !left.nullable =>
    +        Some(createLessThanFilter(left.name, right, p))
    +      case p @ LessThanOrEqual(left: Literal, right: NamedExpression) if !right.nullable =>
    +        Some(createLessThanOrEqualFilter(right.name, left, p))
    +      case p @ LessThanOrEqual(left: NamedExpression, right: Literal) if !left.nullable =>
    +        Some(createLessThanOrEqualFilter(left.name, right, p))
    +      case p @ GreaterThan(left: Literal, right: NamedExpression) if !right.nullable =>
    +        Some(createGreaterThanFilter(right.name, left, p))
    +      case p @ GreaterThan(left: NamedExpression, right: Literal) if !left.nullable =>
    +        Some(createGreaterThanFilter(left.name, right, p))
    +      case p @ GreaterThanOrEqual(left: Literal, right: NamedExpression) if !right.nullable =>
    +        Some(createGreaterThanOrEqualFilter(right.name, left, p))
    +      case p @ GreaterThanOrEqual(left: NamedExpression, right: Literal) if !left.nullable =>
    +        Some(createGreaterThanOrEqualFilter(left.name, right, p))
    +      case _ => None
    +    }
    +  }
    +  // Note: Inside the Hadoop API we only have access to `Configuration`, not to
    +  // [[SparkContext]], so we cannot use broadcasts to convey the actual filter
    +  // predicate.
    +  def serializeFilterExpressions(filters: Seq[Expression], conf: Configuration): Unit = {
    +    val serialized: Array[Byte] = SparkSqlSerializer.serialize(filters)
    +    val encoded: String = BaseEncoding.base64().encode(serialized)
    +    conf.set(PARQUET_FILTER_DATA, encoded)
    +  }
    +
    +  // Note: Inside the Hadoop API we only have access to `Configuration`, not to
    +  // [[SparkContext]], so we cannot use broadcasts to convey the actual filter
    +  // predicate.
    +  def deserializeFilterExpressions(conf: Configuration): Option[Seq[Expression]] = {
    +    val data = conf.get(PARQUET_FILTER_DATA)
    +    if (data != null) {
    +      val decoded: Array[Byte] = BaseEncoding.base64().decode(data)
    +      Some(SparkSqlSerializer.deserialize(decoded))
    +    } else {
    +      None
    +    }
    +  }
    +
    +  // Try to find the given expression in the tree of filters in order to
    --- End diff --
    
    Changed it in a few places now and will try to remember next time.
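
The diff above conveys the filter expressions through the Hadoop `Configuration` as a Base64 string, because broadcasts are not available at that point. A minimal sketch of that round trip follows. It assumes plain Java serialization and `java.util.Base64` in place of `SparkSqlSerializer` and Guava's `BaseEncoding`, a mutable `Map` in place of `Configuration`, and strings in place of Catalyst expressions; `FilterConfSketch` and its method names are hypothetical.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.io.{ObjectInputStream, ObjectOutputStream}
import java.util.Base64
import scala.collection.mutable

object FilterConfSketch {
  // Key name quoted from the diff above.
  val FilterKey = "org.apache.spark.sql.parquet.row.filter"

  // Serialize the filters to bytes, Base64-encode them, and store the
  // resulting string under a well-known key.
  def serializeFilters(filters: Seq[String], conf: mutable.Map[String, String]): Unit = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(filters.toArray) finally out.close()
    conf(FilterKey) = Base64.getEncoder.encodeToString(bytes.toByteArray)
  }

  // Reverse the steps; a missing key means no filters were stored.
  def deserializeFilters(conf: mutable.Map[String, String]): Option[Seq[String]] =
    conf.get(FilterKey).map { encoded =>
      val in = new ObjectInputStream(
        new ByteArrayInputStream(Base64.getDecoder.decode(encoded)))
      try in.readObject().asInstanceOf[Array[String]].toList finally in.close()
    }
}
```

Encoding to text is what makes this workable: the configuration only stores string values, so the binary payload has to be made string-safe first.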



[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-41195362
  
    Build started. 



[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-41460383
  
    Build finished. 



[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-41364498
  
    Build started. 



[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by AndreSchumacher <gi...@git.apache.org>.
Github user AndreSchumacher commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-41363813
  
    @marmbrus @mateiz Thanks a lot for the comments and the fast response.
    
    About the config setting: I would feel more comfortable setting a default after there has been some experience with realistic workloads and schemas. But I renamed it now, as suggested by Matei.
    
    The bigger changes in my last commit now keep track of what is actually pushed and why. The predicates that are "completely" pushed are then removed inside the Planner. Note that attempting to push "A & B" can result in only "A" being pushed, because B contains something other than a simple comparison of a column value. In this case "A & B" should be kept for now (IMHO). There is still an advantage in pushing A, since hopefully fewer records pass the filter to the higher level.
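
The bookkeeping described in this comment can be sketched roughly as follows. This is only an illustration under assumed names (`Pred`, `PushResult`, `PartialPushSketch`), not the code from the commit: each predicate yields the part that could be pushed plus a flag saying whether it was pushed completely, and only completely pushed predicates are dropped from the set that the higher-level filter still has to evaluate.

```scala
// Hypothetical predicate model; not the Catalyst classes.
sealed trait Pred
case class Simple(column: String) extends Pred  // column compared with a literal
case class Complex(text: String) extends Pred   // anything else, e.g. with a CAST
case class And(left: Pred, right: Pred) extends Pred

// What was pushed for one predicate, and whether that covers all of it.
case class PushResult(pushed: Option[Pred], complete: Boolean)

object PartialPushSketch {
  def push(p: Pred): PushResult = p match {
    case s: Simple  => PushResult(Some(s), complete = true)
    case _: Complex => PushResult(None, complete = false)
    case And(l, r) =>
      val (pl, pr) = (push(l), push(r))
      // Pushing only one side of a conjunction gives a weaker but safe filter.
      val pushed: Option[Pred] = (pl.pushed, pr.pushed) match {
        case (Some(a), Some(b)) => Some(And(a, b))
        case (Some(a), None)    => Some(a)
        case (None, Some(b))    => Some(b)
        case (None, None)       => None
      }
      PushResult(pushed, pl.complete && pr.complete)
  }

  // Predicates that must still be evaluated above the scan.
  def remaining(preds: Seq[Pred]): Seq[Pred] =
    preds.filterNot(p => push(p).complete)
}
```

With "A & B" where only A is simple, `push` returns A as the pushed part with `complete = false`, so the full conjunction stays in `remaining`, matching the behavior argued for above.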



[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/511#discussion_r12010474
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala ---
    @@ -46,7 +47,7 @@ case class ParquetTableScan(
         // https://issues.apache.org/jira/browse/SPARK-1367
         output: Seq[Attribute],
         relation: ParquetRelation,
    -    columnPruningPred: Option[Expression])(
    +    columnPruningPred: Option[Seq[Expression]])(
    --- End diff --
    
    Why have an `Option[Seq[...]]` when option is just a special `Seq` that has 0 or 1 elements?  Can we just model the `None` case with an empty Seq?
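
To illustrate the point of this review comment, here is a small sketch with hypothetical helper names and strings standing in for expressions. With `Option[Seq[...]]` there are two ways to say "no predicates" (`None` and `Some(Seq())`), and callers have to handle both; with a plain `Seq` the empty sequence is the only such case.

```scala
object EmptySeqSketch {
  // With Option[Seq[...]], "nothing to push" has two representations.
  def describeOptional(preds: Option[Seq[String]]): String = preds match {
    case None                   => "no predicates"
    case Some(ps) if ps.isEmpty => "no predicates"
    case Some(ps)               => ps.mkString(" AND ")
  }

  // With a plain Seq, the empty sequence is the single "nothing" case.
  def describe(preds: Seq[String]): String =
    if (preds.isEmpty) "no predicates" else preds.mkString(" AND ")
}
```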



[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/511#discussion_r11930738
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala ---
    @@ -195,5 +196,27 @@ class ParquetQuerySuite extends QueryTest with FunSuite with BeforeAndAfterAll {
         Utils.deleteRecursively(ParquetTestData.testDir)
         ParquetTestData.writeFile()
       }
    +
    +  test("test filter by predicate pushdown") {
    +    for(myval <- Seq("myint", "mylong", "mydouble", "myfloat")) {
    +      println(s"testing field $myval")
    +      val result1 = sql(s"SELECT * FROM testfiltersource WHERE $myval < 150 AND $myval >= 100").collect()
    --- End diff --
    
    It would be good to dive into the `queryExecution` here and somehow check that predicates are actually getting pushed down.



[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-41369689
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14481/



[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-43359201
  
     Merged build triggered. 



[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-42739114
  
    All automated tests passed.
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14868/



[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-43259522
  
    @AndreSchumacher this is no longer mergeable. Would you mind bringing it up to date so it can be merged cleanly? Thanks.



[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/511#discussion_r11930760
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala ---
    @@ -0,0 +1,252 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.parquet
    +
    +import org.apache.hadoop.conf.Configuration
    +
    +import parquet.filter._
    +import parquet.filter.ColumnPredicates._
    +import parquet.column.ColumnReader
    +
    +import com.google.common.io.BaseEncoding
    +
    +import org.apache.spark.sql.catalyst.types._
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.execution.SparkSqlSerializer
    +
    +object ParquetFilters {
    +  val PARQUET_FILTER_DATA = "org.apache.spark.sql.parquet.row.filter"
    +  // set this to false if pushdown should be disabled
    +  // Note: prefix is "spark.hadoop." so that it will be copied from SparkConf
    +  // to Hadoop configuration
    +  val PARQUET_FILTER_PUSHDOWN_ENABLED = "org.apache.spark.sql.parquet.filter.pushdown"
    +
    +  def createFilter(filterExpressions: Seq[Expression]): UnboundRecordFilter = {
    +    def createEqualityFilter(name: String, literal: Literal) = literal.dataType match {
    +      case BooleanType =>
    +        ComparisonFilter.createBooleanFilter(name, literal.value.asInstanceOf[Boolean])
    +      case IntegerType =>
    +        ComparisonFilter.createIntFilter(name, (x: Int) => x == literal.value.asInstanceOf[Int])
    +      case LongType =>
    +        ComparisonFilter.createLongFilter(name, (x: Long) => x == literal.value.asInstanceOf[Long])
    +      case DoubleType =>
    +        ComparisonFilter.createDoubleFilter(
    +          name,
    +          (x: Double) => x == literal.value.asInstanceOf[Double])
    +      case FloatType =>
    +        ComparisonFilter.createFloatFilter(
    +          name,
    +          (x: Float) => x == literal.value.asInstanceOf[Float])
    +      case StringType =>
    +        ComparisonFilter.createStringFilter(name, literal.value.asInstanceOf[String])
    +    }
    +    def createLessThanFilter(name: String, literal: Literal) = literal.dataType match {
    +      case IntegerType =>
    +        ComparisonFilter.createIntFilter(name, (x: Int) => x < literal.value.asInstanceOf[Int])
    +      case LongType =>
    +        ComparisonFilter.createLongFilter(name, (x: Long) => x < literal.value.asInstanceOf[Long])
    +      case DoubleType =>
    +        ComparisonFilter.createDoubleFilter(
    +          name,
    +          (x: Double) => x < literal.value.asInstanceOf[Double])
    +      case FloatType =>
    +        ComparisonFilter.createFloatFilter(
    +          name,
    +          (x: Float) => x < literal.value.asInstanceOf[Float])
    +    }
    +    def createLessThanOrEqualFilter(name: String, literal: Literal) = literal.dataType match {
    +      case IntegerType =>
    +        ComparisonFilter.createIntFilter(name, (x: Int) => x <= literal.value.asInstanceOf[Int])
    +      case LongType =>
    +        ComparisonFilter.createLongFilter(name, (x: Long) => x <= literal.value.asInstanceOf[Long])
    +      case DoubleType =>
    +        ComparisonFilter.createDoubleFilter(
    +          name,
    +          (x: Double) => x <= literal.value.asInstanceOf[Double])
    +      case FloatType =>
    +        ComparisonFilter.createFloatFilter(
    +          name,
    +          (x: Float) => x <= literal.value.asInstanceOf[Float])
    +    }
    +    // TODO: combine these two types somehow?
    +    def createGreaterThanFilter(name: String, literal: Literal) = literal.dataType match {
    +      case IntegerType =>
    +        ComparisonFilter.createIntFilter(name, (x: Int) => x > literal.value.asInstanceOf[Int])
    +      case LongType =>
    +        ComparisonFilter.createLongFilter(name, (x: Long) => x > literal.value.asInstanceOf[Long])
    +      case DoubleType =>
    +        ComparisonFilter.createDoubleFilter(
    +          name,
    +          (x: Double) => x > literal.value.asInstanceOf[Double])
    +      case FloatType =>
    +        ComparisonFilter.createFloatFilter(
    +          name,
    +          (x: Float) => x > literal.value.asInstanceOf[Float])
    +    }
    +    def createGreaterThanOrEqualFilter(name: String, literal: Literal) = literal.dataType match {
    +      case IntegerType =>
    +        ComparisonFilter.createIntFilter(name, (x: Int) => x >= literal.value.asInstanceOf[Int])
    +      case LongType =>
    +        ComparisonFilter.createLongFilter(name, (x: Long) => x >= literal.value.asInstanceOf[Long])
    +      case DoubleType =>
    +        ComparisonFilter.createDoubleFilter(
    +          name,
    +          (x: Double) => x >= literal.value.asInstanceOf[Double])
    +      case FloatType =>
    +        ComparisonFilter.createFloatFilter(
    +          name,
    +          (x: Float) => x >= literal.value.asInstanceOf[Float])
    +    }
    +    // TODO: can we actually rely on the predicate being normalized as in expression < literal?
    +    // That would simplify this pattern matching
    +    // TODO: we currently only filter on non-nullable (Parquet REQUIRED) attributes until
    +    // https://github.com/Parquet/parquet-mr/issues/371
    +    // has been resolved
    +    val filters: Seq[UnboundRecordFilter] = filterExpressions.collect {
    +      case Or(left: Expression, right: Expression)
    +          if createFilter(Seq(left)) != null && createFilter(Seq(right)) != null => {
    +        // Note: if either side of this Or-predicate is empty then this means
    +        // it contains a more complex comparison than between attribute and literal
    +        // (e.g., it contained a CAST). The only safe thing to do is then to disregard
    +        // this disjunction, which could be contained in a conjunction. If it stands
    +        // alone then it is also safe to drop it, since a Null return value of this
    +        // function is interpreted as having no filters at all.
    +        val leftFilter = createFilter(Seq(left))
    +        val rightFilter = createFilter(Seq(right))
    +        OrRecordFilter.or(leftFilter, rightFilter)
    +      }
    +      case Equals(left: Literal, right: NamedExpression) if !right.nullable =>
    +        createEqualityFilter(right.name, left)
    +      case Equals(left: NamedExpression, right: Literal) if !left.nullable =>
    +        createEqualityFilter(left.name, right)
    +      case LessThan(left: Literal, right: NamedExpression) if !right.nullable =>
    +        createLessThanFilter(right.name, left)
    +      case LessThan(left: NamedExpression, right: Literal) if !left.nullable =>
    +        createLessThanFilter(left.name, right)
    +      case LessThanOrEqual(left: Literal, right: NamedExpression) if !right.nullable =>
    +        createLessThanOrEqualFilter(right.name, left)
    +      case LessThanOrEqual(left: NamedExpression, right: Literal) if !left.nullable =>
    +        createLessThanOrEqualFilter(left.name, right)
    +      case GreaterThan(left: Literal, right: NamedExpression) if !right.nullable =>
    +        createGreaterThanFilter(right.name, left)
    +      case GreaterThan(left: NamedExpression, right: Literal) if !left.nullable =>
    +        createGreaterThanFilter(left.name, right)
    +      case GreaterThanOrEqual(left: Literal, right: NamedExpression) if !right.nullable =>
    +        createGreaterThanOrEqualFilter(right.name, left)
    +      case GreaterThanOrEqual(left: NamedExpression, right: Literal) if !left.nullable =>
    +        createGreaterThanOrEqualFilter(left.name, right)
    +    }
    +    if (filters.length > 0) filters.reduce(AndRecordFilter.and) else null
    +  }
    +
    +  // Note: Inside the Hadoop API we only have access to `Configuration`, not to
    +  // [[SparkContext]], so we cannot use broadcasts to convey the actual filter
    +  // predicate.
    +  def serializeFilterExpressions(filters: Seq[Expression], conf: Configuration): Unit = {
    +    val serialized: Array[Byte] = SparkSqlSerializer.serialize(filters)
    +    val encoded: String = BaseEncoding.base64().encode(serialized)
    +    conf.set(PARQUET_FILTER_DATA, encoded)
    +  }
    +
    +  // Note: Inside the Hadoop API we only have access to `Configuration`, not to
    +  // [[SparkContext]], so we cannot use broadcasts to convey the actual filter
    +  // predicate.
    +  def deserializeFilterExpressions(conf: Configuration): Option[Seq[Expression]] = {
    +    val data = conf.get(PARQUET_FILTER_DATA)
    +    if (data != null) {
    +      val decoded: Array[Byte] = BaseEncoding.base64().decode(data)
    +      Some(SparkSqlSerializer.deserialize(decoded))
    +    } else {
    +      None
    +    }
    +  }
    +}
    +
    +class ComparisonFilter(
    +    private val columnName: String,
    +    private var filter: UnboundRecordFilter)
    +  extends UnboundRecordFilter {
    +  override def bind(readers: java.lang.Iterable[ColumnReader]): RecordFilter = {
    +    filter.bind(readers)
    +  }
    +}
    +
    +object ComparisonFilter {
    +  def createBooleanFilter(columnName: String, value: Boolean): UnboundRecordFilter =
    +    new ComparisonFilter(
    +      columnName,
    +      ColumnRecordFilter.column(
    +        columnName,
    +        ColumnPredicates.applyFunctionToBoolean(
    +          new BooleanPredicateFunction {
    +            def functionToApply(input: Boolean): Boolean = input == value
    +          }
    +        )))
    +  def createStringFilter(columnName: String, value: String): UnboundRecordFilter =
    +    new ComparisonFilter(
    +      columnName,
    +      ColumnRecordFilter.column(
    +        columnName,
    +        ColumnPredicates.applyFunctionToString (
    +          new ColumnPredicates.PredicateFunction[String]  {
    +            def functionToApply(input: String): Boolean = input == value
    +          }
    +        )))
    +  def createIntFilter(columnName: String, func: Int => Boolean): UnboundRecordFilter =
    +    new ComparisonFilter(
    +      columnName,
    +      ColumnRecordFilter.column(
    +        columnName,
    +        ColumnPredicates.applyFunctionToInteger(
    +          new IntegerPredicateFunction {
    +            def functionToApply(input: Int) = func(input)
    +          }
    +        )))
    +  def createLongFilter(columnName: String, func: Long => Boolean): UnboundRecordFilter =
    +    new ComparisonFilter(
    +      columnName,
    +      ColumnRecordFilter.column(
    +        columnName,
    +        ColumnPredicates.applyFunctionToLong(
    +          new LongPredicateFunction {
    +            def functionToApply(input: Long) = func(input)
    +          }
    +        )))
    +  def createDoubleFilter(columnName: String, func: Double => Boolean): UnboundRecordFilter =
    +    new ComparisonFilter(
    +      columnName,
    +      ColumnRecordFilter.column(
    +        columnName,
    +        ColumnPredicates.applyFunctionToDouble(
    +          new DoublePredicateFunction {
    +            def functionToApply(input: Double) = func(input)
    +          }
    +        )))
    +  def createFloatFilter(columnName: String, func: Float => Boolean): UnboundRecordFilter =
    +    new ComparisonFilter(
    +      columnName,
    +      ColumnRecordFilter.column(
    +        columnName,
    +        ColumnPredicates.applyFunctionToFloat(
    +          new FloatPredicateFunction {
    +            def functionToApply(input: Float) = func(input)
    +          }
    +        )))
    +}
    +
    --- End diff --
    
    Extra spaces.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by AndreSchumacher <gi...@git.apache.org>.
Github user AndreSchumacher commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-43359217
  
    Jenkins, test this please



[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-41205566
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14381/



[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/511#discussion_r12010532
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala ---
    @@ -0,0 +1,413 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.parquet
    +
    +import org.apache.hadoop.conf.Configuration
    +
    +import parquet.filter._
    +import parquet.filter.ColumnPredicates._
    +import parquet.column.ColumnReader
    +
    +import com.google.common.io.BaseEncoding
    +
    +import org.apache.spark.sql.catalyst.types._
    +import org.apache.spark.sql.catalyst.expressions.{Predicate => CatalystPredicate}
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.execution.SparkSqlSerializer
    +
    +object ParquetFilters {
    +  val PARQUET_FILTER_DATA = "org.apache.spark.sql.parquet.row.filter"
    +  // set this to false if pushdown should be disabled
    +  val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.hints.parquetFilterPushdown"
    +
    +  def createRecordFilter(filterExpressions: Seq[Expression]): UnboundRecordFilter = {
    +    val filters: Seq[CatalystFilter] = filterExpressions.collect {
    +      case (expression: Expression) if createFilter(expression).isDefined =>
    +        createFilter(expression).get
    +    }
    +    if (filters.length > 0) filters.reduce(AndRecordFilter.and) else null
    +  }
    +
    +  def createFilter(expression: Expression): Option[CatalystFilter] = {
    +    def createEqualityFilter(
    +        name: String,
    +        literal: Literal,
    +        predicate: CatalystPredicate) = literal.dataType match {
    +      case BooleanType =>
    +        ComparisonFilter.createBooleanFilter(name, literal.value.asInstanceOf[Boolean], predicate)
    +      case IntegerType =>
    +        ComparisonFilter.createIntFilter(
    +          name,
    +          (x: Int) => x == literal.value.asInstanceOf[Int],
    +          predicate)
    +      case LongType =>
    +        ComparisonFilter.createLongFilter(
    +          name,
    +          (x: Long) => x == literal.value.asInstanceOf[Long],
    +          predicate)
    +      case DoubleType =>
    +        ComparisonFilter.createDoubleFilter(
    +          name,
    +          (x: Double) => x == literal.value.asInstanceOf[Double],
    +          predicate)
    +      case FloatType =>
    +        ComparisonFilter.createFloatFilter(
    +          name,
    +          (x: Float) => x == literal.value.asInstanceOf[Float],
    +          predicate)
    +      case StringType =>
    +        ComparisonFilter.createStringFilter(name, literal.value.asInstanceOf[String], predicate)
    +    }
    +    def createLessThanFilter(
    +        name: String,
    +        literal: Literal,
    +        predicate: CatalystPredicate) = literal.dataType match {
    +      case IntegerType =>
    +        ComparisonFilter.createIntFilter(
    +          name,
    +          (x: Int) => x < literal.value.asInstanceOf[Int],
    +          predicate)
    +      case LongType =>
    +        ComparisonFilter.createLongFilter(
    +          name,
    +          (x: Long) => x < literal.value.asInstanceOf[Long],
    +          predicate)
    +      case DoubleType =>
    +        ComparisonFilter.createDoubleFilter(
    +          name,
    +          (x: Double) => x < literal.value.asInstanceOf[Double],
    +          predicate)
    +      case FloatType =>
    +        ComparisonFilter.createFloatFilter(
    +          name,
    +          (x: Float) => x < literal.value.asInstanceOf[Float],
    +          predicate)
    +    }
    +    def createLessThanOrEqualFilter(
    +        name: String,
    +        literal: Literal,
    +        predicate: CatalystPredicate) = literal.dataType match {
    +      case IntegerType =>
    +        ComparisonFilter.createIntFilter(
    +          name,
    +          (x: Int) => x <= literal.value.asInstanceOf[Int],
    +          predicate)
    +      case LongType =>
    +        ComparisonFilter.createLongFilter(
    +          name,
    +          (x: Long) => x <= literal.value.asInstanceOf[Long],
    +          predicate)
    +      case DoubleType =>
    +        ComparisonFilter.createDoubleFilter(
    +          name,
    +          (x: Double) => x <= literal.value.asInstanceOf[Double],
    +          predicate)
    +      case FloatType =>
    +        ComparisonFilter.createFloatFilter(
    +          name,
    +          (x: Float) => x <= literal.value.asInstanceOf[Float],
    +          predicate)
    +    }
    +    // TODO: combine these two types somehow?
    +    def createGreaterThanFilter(
    +        name: String,
    +        literal: Literal,
    +        predicate: CatalystPredicate) = literal.dataType match {
    +      case IntegerType =>
    +        ComparisonFilter.createIntFilter(
    +          name,
    +          (x: Int) => x > literal.value.asInstanceOf[Int],
    +          predicate)
    +      case LongType =>
    +        ComparisonFilter.createLongFilter(
    +          name,
    +          (x: Long) => x > literal.value.asInstanceOf[Long],
    +          predicate)
    +      case DoubleType =>
    +        ComparisonFilter.createDoubleFilter(
    +          name,
    +          (x: Double) => x > literal.value.asInstanceOf[Double],
    +          predicate)
    +      case FloatType =>
    +        ComparisonFilter.createFloatFilter(
    +          name,
    +          (x: Float) => x > literal.value.asInstanceOf[Float],
    +          predicate)
    +    }
    +    def createGreaterThanOrEqualFilter(
    +        name: String,
    +        literal: Literal,
    +        predicate: CatalystPredicate) = literal.dataType match {
    +      case IntegerType =>
    +        ComparisonFilter.createIntFilter(
    +          name, (x: Int) => x >= literal.value.asInstanceOf[Int],
    +          predicate)
    +      case LongType =>
    +        ComparisonFilter.createLongFilter(
    +          name,
    +          (x: Long) => x >= literal.value.asInstanceOf[Long],
    +          predicate)
    +      case DoubleType =>
    +        ComparisonFilter.createDoubleFilter(
    +          name,
    +          (x: Double) => x >= literal.value.asInstanceOf[Double],
    +          predicate)
    +      case FloatType =>
    +        ComparisonFilter.createFloatFilter(
    +          name,
    +          (x: Float) => x >= literal.value.asInstanceOf[Float],
    +          predicate)
    +    }
    +    // TODO: we currently only filter on non-nullable (Parquet REQUIRED) attributes until
    +    // https://github.com/Parquet/parquet-mr/issues/371
    +    // has been resolved
    +    expression match {
    +      case p @ Or(left: Expression, right: Expression)
    +          if createFilter(left).isDefined && createFilter(right).isDefined => {
    +        // If either side of this Or-predicate is empty then this means
    +        // it contains a more complex comparison than between attribute and literal
    +        // (e.g., it contained a CAST). The only safe thing to do is then to disregard
    +        // this disjunction, which could be contained in a conjunction. If it stands
    +        // alone then it is also safe to drop it, since a Null return value of this
    +        // function is interpreted as having no filters at all.
    +        val leftFilter = createFilter(left).get
    +        val rightFilter = createFilter(right).get
    +        Some(new OrFilter(leftFilter, rightFilter))
    +      }
    +      case p @ And(left: Expression, right: Expression) => {
    +        // This treats nested conjunctions; since either side of the conjunction
    +        // may contain more complex filter expressions we may actually generate
    +        // strictly weaker filter predicates in the process.
    +        val leftFilter = createFilter(left)
    +        val rightFilter = createFilter(right)
    +        (leftFilter, rightFilter) match {
    +          case (None, Some(filter)) => Some(filter)
    +          case (Some(filter), None) => Some(filter)
    +          case (Some(l), Some(r)) => Some(new AndFilter(l, r))
    +          // Neither side could be converted, so there is nothing to push down.
    +          case (None, None) => None
    +        }
    +      }
    +      case p @ Equals(left: Literal, right: NamedExpression) if !right.nullable =>
    +        Some(createEqualityFilter(right.name, left, p))
    +      case p @ Equals(left: NamedExpression, right: Literal) if !left.nullable =>
    +        Some(createEqualityFilter(left.name, right, p))
    +      case p @ LessThan(left: Literal, right: NamedExpression) if !right.nullable =>
    +        Some(createLessThanFilter(right.name, left, p))
    +      case p @ LessThan(left: NamedExpression, right: Literal) if !left.nullable =>
    +        Some(createLessThanFilter(left.name, right, p))
    +      case p @ LessThanOrEqual(left: Literal, right: NamedExpression) if !right.nullable =>
    +        Some(createLessThanOrEqualFilter(right.name, left, p))
    +      case p @ LessThanOrEqual(left: NamedExpression, right: Literal) if !left.nullable =>
    +        Some(createLessThanOrEqualFilter(left.name, right, p))
    +      case p @ GreaterThan(left: Literal, right: NamedExpression) if !right.nullable =>
    +        Some(createGreaterThanFilter(right.name, left, p))
    +      case p @ GreaterThan(left: NamedExpression, right: Literal) if !left.nullable =>
    +        Some(createGreaterThanFilter(left.name, right, p))
    +      case p @ GreaterThanOrEqual(left: Literal, right: NamedExpression) if !right.nullable =>
    +        Some(createGreaterThanOrEqualFilter(right.name, left, p))
    +      case p @ GreaterThanOrEqual(left: NamedExpression, right: Literal) if !left.nullable =>
    +        Some(createGreaterThanOrEqualFilter(left.name, right, p))
    +      case _ => None
    +    }
    +  }
    +  // Note: Inside the Hadoop API we only have access to `Configuration`, not to
    +  // [[SparkContext]], so we cannot use broadcasts to convey the actual filter
    +  // predicate.
    +  def serializeFilterExpressions(filters: Seq[Expression], conf: Configuration): Unit = {
    +    val serialized: Array[Byte] = SparkSqlSerializer.serialize(filters)
    +    val encoded: String = BaseEncoding.base64().encode(serialized)
    +    conf.set(PARQUET_FILTER_DATA, encoded)
    +  }
    +
    +  // Note: Inside the Hadoop API we only have access to `Configuration`, not to
    +  // [[SparkContext]], so we cannot use broadcasts to convey the actual filter
    +  // predicate.
    +  def deserializeFilterExpressions(conf: Configuration): Option[Seq[Expression]] = {
    +    val data = conf.get(PARQUET_FILTER_DATA)
    +    if (data != null) {
    +      val decoded: Array[Byte] = BaseEncoding.base64().decode(data)
    +      Some(SparkSqlSerializer.deserialize(decoded))
    +    } else {
    +      None
    +    }
    +  }
    +
    +  // Try to find the given expression in the tree of filters in order to
    --- End diff --
    
    Class and method docs should probably be written as Scaladoc, in case we ever build developer docs that include private classes.
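As a rough illustration of the Scaladoc style suggested above, here is a minimal, self-contained sketch of the two configuration helpers from the diff. It makes several assumptions: a mutable `Map` stands in for Hadoop's `Configuration`, a plain `String` stands in for the serialized filter expressions, `java.util.Base64` replaces Guava's `BaseEncoding`, and the names `FilterConfSketch`, `serializeFilter`, and `deserializeFilter` are hypothetical. It is not the code from the pull request.

```scala
import java.nio.charset.StandardCharsets
import java.util.Base64
import scala.collection.mutable

/**
 * Illustrative stand-in for the (de)serialization helpers in the diff.
 * A mutable Map plays the role of Hadoop's `Configuration`, and a plain
 * String plays the role of the serialized filter expressions.
 */
object FilterConfSketch {
  /** Key under which the encoded filter is stored (same string as in the diff). */
  val FilterDataKey = "org.apache.spark.sql.parquet.row.filter"

  /**
   * Stores `payload` in `conf` as a Base64 string.
   *
   * @param payload text standing in for the serialized filter expressions
   * @param conf    string-to-string settings standing in for `Configuration`
   */
  def serializeFilter(payload: String, conf: mutable.Map[String, String]): Unit = {
    val bytes = payload.getBytes(StandardCharsets.UTF_8)
    conf(FilterDataKey) = Base64.getEncoder.encodeToString(bytes)
  }

  /**
   * Reads the payload back from `conf`.
   *
   * @return `Some(payload)` if a filter was stored, `None` otherwise
   */
  def deserializeFilter(conf: mutable.Map[String, String]): Option[String] =
    conf.get(FilterDataKey).map { encoded =>
      new String(Base64.getDecoder.decode(encoded), StandardCharsets.UTF_8)
    }
}
```

Written this way, the notes that currently sit in `//` comments would be picked up by a documentation build, which is the point of the review comment.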



[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by AndreSchumacher <gi...@git.apache.org>.
Github user AndreSchumacher commented on a diff in the pull request:

    https://github.com/apache/spark/pull/511#discussion_r12022983
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
    @@ -175,12 +175,35 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
             InsertIntoParquetTable(relation, planLater(child), overwrite=true)(sparkContext) :: Nil
           case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) =>
             InsertIntoParquetTable(table, planLater(child), overwrite)(sparkContext) :: Nil
    -      case PhysicalOperation(projectList, filters, relation: ParquetRelation) =>
    -        // TODO: Should be pushing down filters as well.
    +      case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) => {
    +        val remainingFilters =
    +          if (sparkContext.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) {
    +            filters.filter {
    +              // Note: filters cannot be pushed down to Parquet if they contain more complex
    +              // expressions than simple "Attribute cmp Literal" comparisons. Here we remove
    +              // all filters that have been pushed down. Note that a predicate such as
    +              // "A AND B" can result in "A" being pushed down.
    --- End diff --
    
    Good point, bad example. That's why I initially didn't handle ANDs at all when creating the filters from the expressions. But then I thought one could have expressions such as (A AND B) OR C, which should probably be handled in the planner and turned into (A OR C) AND (B OR C), but currently are not. Please correct me if I am wrong. It may be that the parser doesn't currently allow these kinds of filter expressions with '(' and ')', although I guess nothing speaks against them.
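The rewrite mentioned in the comment above, turning (A AND B) OR C into (A OR C) AND (B OR C), can be sketched on a toy expression tree. This is only an illustration under stated assumptions: `Expr`, `Atom`, `And`, `Or`, `toCnf`, and `conjuncts` are hypothetical names defined here, not Catalyst classes, and nothing in the thread confirms that the planner performs this rewrite.

```scala
object CnfSketch {
  // Toy expression tree; these are NOT the Catalyst expression classes.
  sealed trait Expr
  case class Atom(name: String) extends Expr
  case class And(left: Expr, right: Expr) extends Expr
  case class Or(left: Expr, right: Expr) extends Expr

  /** Rewrites an And/Or tree so that no And is nested below an Or. */
  def toCnf(e: Expr): Expr = e match {
    case And(l, r) => And(toCnf(l), toCnf(r))
    case Or(l, r)  => distribute(toCnf(l), toCnf(r))
    case atom      => atom
  }

  /** Builds `l OR r` for two sides already in CNF, pushing the Or below any And. */
  private def distribute(l: Expr, r: Expr): Expr = (l, r) match {
    case (And(a, b), c) => And(distribute(a, c), distribute(b, c))
    case (a, And(b, c)) => And(distribute(a, b), distribute(a, c))
    case _              => Or(l, r)
  }

  /** Splits an expression into its top-level conjuncts. */
  def conjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => conjuncts(l) ++ conjuncts(r)
    case other     => Seq(other)
  }
}
```

After such a rewrite, each top-level conjunct could be considered for pushdown on its own. Note that distributing OR over AND can grow the expression considerably for deeply nested inputs.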



[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-41195351
  
     Build triggered. 



[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-41460349
  
    Build started. 



[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/511#discussion_r11930428
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
    @@ -176,11 +176,15 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) =>
             InsertIntoParquetTable(table, planLater(child), overwrite)(sparkContext) :: Nil
           case PhysicalOperation(projectList, filters, relation: ParquetRelation) =>
    -        // TODO: Should be pushing down filters as well.
    -        pruneFilterProject(
    -          projectList,
    -          filters,
    -          ParquetTableScan(_, relation, None)(sparkContext)) :: Nil
    +        // Note: we do not actually remove the filters that were pushed down to Parquet from
    +        // the plan, in case that some of the predicates cannot be evaluated there because
    +        // they contain complex operations, such as CASTs.
    +        // TODO: rethink whether conjunctions that are handed down to Parquet should be removed
    --- End diff --
    
    How hard would it be to move the logic that determines what filters can be pushed down here, into the planner, so that we can avoid the double evaluation?
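One way to picture what the question above asks for is a planner-side split of the filter list into predicates that Parquet can evaluate and predicates that must stay in the plan, so that nothing is evaluated twice. The sketch below uses a toy expression model; `Attr`, `Lit`, `LessThan`, `Cast`, `canPushDown`, and `split` are hypothetical names for illustration and only cover the "attribute cmp literal on a non-nullable attribute" shape described in the diff.

```scala
object PushdownSplitSketch {
  // Toy expression model; these are NOT the Catalyst expression classes.
  sealed trait Expr
  case class Attr(name: String, nullable: Boolean) extends Expr
  case class Lit(value: Any) extends Expr
  case class Cast(child: Expr) extends Expr
  case class LessThan(left: Expr, right: Expr) extends Expr

  /** True only for a comparison between a non-nullable attribute and a literal. */
  def canPushDown(e: Expr): Boolean = e match {
    case LessThan(Attr(_, nullable), Lit(_)) => !nullable
    case LessThan(Lit(_), Attr(_, nullable)) => !nullable
    case _                                   => false
  }

  /** Returns (pushed, remaining): only `remaining` would be kept in the plan. */
  def split(filters: Seq[Expr]): (Seq[Expr], Seq[Expr]) =
    filters.partition(canPushDown)
}
```

With a split like this, the scan would receive the first sequence and the filter operator above it only the second, which is the double evaluation the comment wants to avoid.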



[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by AndreSchumacher <gi...@git.apache.org>.
Github user AndreSchumacher commented on a diff in the pull request:

    https://github.com/apache/spark/pull/511#discussion_r12022985
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala ---
    @@ -46,7 +47,7 @@ case class ParquetTableScan(
         // https://issues.apache.org/jira/browse/SPARK-1367
         output: Seq[Attribute],
         relation: ParquetRelation,
    -    columnPruningPred: Option[Expression])(
    +    columnPruningPred: Option[Seq[Expression]])(
    --- End diff --
    
    Thanks, excellent point. I changed it as suggested.



[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by AndreSchumacher <gi...@git.apache.org>.
Github user AndreSchumacher commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-42748583
  
    @marmbrus I guess this is ready to be merged to master? (I saw there is a separate 1.0 branch now)



[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/511#discussion_r11930769
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala ---
    @@ -27,15 +27,17 @@ import org.apache.hadoop.mapreduce._
     import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
     import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat, FileOutputCommitter}
     
    -import parquet.hadoop.{ParquetInputFormat, ParquetOutputFormat}
    +import parquet.hadoop.{ParquetRecordReader, BadConfigurationException, ParquetInputFormat, ParquetOutputFormat}
     import parquet.hadoop.util.ContextUtil
     import parquet.io.InvalidRecordException
     import parquet.schema.MessageType
     
    -import org.apache.spark.{SerializableWritable, SparkContext, TaskContext}
    +import org.apache.spark.{Logging, SerializableWritable, SparkContext, TaskContext}
     import org.apache.spark.rdd.RDD
    -import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row}
    +import org.apache.spark.sql.catalyst.expressions.{BinaryComparison, Attribute, Expression, Row}
     import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
    +import parquet.filter.UnboundRecordFilter
    --- End diff --
    
    Put these up with the other parquet imports.



[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by AndreSchumacher <gi...@git.apache.org>.
Github user AndreSchumacher commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-41195167
  
    @marmbrus would be great if you could have a look when you have some time. Thanks!



[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-41370693
  
    Build finished. 



[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-42893846
  
    Yeah, I think this is ready.  @pwendell, can you merge this into master?



[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-42739113
  
    Merged build finished. All automated tests passed.



[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-41369687
  
    Build finished. 



[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-42737714
  
     Merged build triggered. 



[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-41462847
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14510/


---

[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-43359730
  
    Merged build started. 


---

[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-41356635
  
    @mateiz good point.  I agree with you that, long-term, this decision should be up to the optimizer.  However, in this case I think the right thing is probably to create a section of the SQL config called `hints`.  We don't promise to obey or support hints long-term, but they can be used for immediate optimizations. This seems like the typical DB way to offset the cases where the optimization is currently poor.


---

[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by AndreSchumacher <gi...@git.apache.org>.
Github user AndreSchumacher commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-43370347
  
    @rxin Thanks for the note. I just rebased it.


---

[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-41363722
  
    Build started. 


---

[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-41361251
  
    But are there any realistic workloads where you'd want to turn this on all the time, or turn it off all the time? It seems that in an ad-hoc query workload, you'll have some queries that can use this, and some that can't. You should just pick whether you want it as a default. Personally I'd go for it unless the cost is super high in the cases where it doesn't work, because I imagine filtering is pretty common in large schemas and I hope Parquet itself optimizes this down the line.


---

[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-41428607
  
    Cool, thanks for renaming it.
    
    @mateiz I don't think we should even include these hints in the docs (unless we find particularly useful ones) as I agree presenting too much complexity to users is a bad idea.  However, even just for our own benchmarking, recompiling to change these settings is just not feasible and it's really hard to predict performance without actually running things.  Also when I've talked about building catalyst to experienced database people, basically everyone said, "No matter how good you think your optimizer is, always make sure you have knobs to control it because it is going to be wrong."
    
    Having these hints in the language could maybe be nice, but I really don't think that is worth the engineering effort of not only changing the parser, but also making sure they get threaded through analysis, optimization and planning correctly.  Language-based hints would also differ depending on whether you are using `sql`, `hql`, or the DSL.
    
    Having a special conf mechanism that lets you set them on a per query basis would be nice.  I'm not sure how flexible the SparkConf infrastructure is in this regard, but might be something to consider.  I can imagine cases where this might even be useful for standard spark jobs.


---

[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by AndreSchumacher <gi...@git.apache.org>.
Github user AndreSchumacher commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-41203659
  
    This PR will need to be revised depending on the outcome of https://github.com/apache/spark/pull/482


---

[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/511#discussion_r11930452
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala ---
    @@ -0,0 +1,252 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.parquet
    +
    +import org.apache.hadoop.conf.Configuration
    +
    +import parquet.filter._
    +import parquet.filter.ColumnPredicates._
    +import parquet.column.ColumnReader
    +
    +import com.google.common.io.BaseEncoding
    +
    +import org.apache.spark.sql.catalyst.types._
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.execution.SparkSqlSerializer
    +
    +object ParquetFilters {
    +  val PARQUET_FILTER_DATA = "org.apache.spark.sql.parquet.row.filter"
    +  // set this to false if pushdown should be disabled
    +  // Note: prefix is "spark.hadoop." so that it will be copied from SparkConf
    --- End diff --
    
    Is this comment up to date?  I don't see "spark.hadoop." anywhere else.


---

[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-43367615
  
    All automated tests passed.
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15047/


---

[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-41229199
  
    Very cool.  This should be a pretty big performance win. Only minor comments.
    
    Regarding the question about normalizing expressions, it could be done in a rule.  However, really I think we can probably greatly simplify all of that logic with code gen (hopefully coming in 1.1).  So, given that you have already written out all of the cases I don't think we need to do further simplification in this PR.
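
To make the "normalize in a rule" idea concrete, here is a minimal sketch. It assumes a toy expression tree: `NormalizeSketch`, `Expr`, `Attr`, `Lit`, `LessThan`, `GreaterThan`, and `normalize` are hypothetical names for illustration and are not Catalyst's classes or the code in this PR. The idea is only that a comparison with the literal on the left can be rewritten so the attribute always comes first, which would leave fewer cases to handle when building a filter.

```scala
object NormalizeSketch {
  // Hypothetical mini expression tree (not Catalyst's classes).
  sealed trait Expr
  case class Attr(name: String) extends Expr
  case class Lit(value: Int) extends Expr
  case class LessThan(left: Expr, right: Expr) extends Expr
  case class GreaterThan(left: Expr, right: Expr) extends Expr

  // Rewrite "Literal cmp Attribute" into the equivalent "Attribute cmp Literal"
  // by swapping the operands and flipping the operator. Anything else is
  // returned unchanged.
  def normalize(e: Expr): Expr = e match {
    case LessThan(l: Lit, a: Attr)    => GreaterThan(a, l) // 5 < a  ==>  a > 5
    case GreaterThan(l: Lit, a: Attr) => LessThan(a, l)    // 5 > a  ==>  a < 5
    case other                        => other
  }
}
```

With such a rule applied first, later code would only need to match the `Attr cmp Lit` shape.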


---

[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-41462846
  
    Build finished. 


---

[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-41461461
  
    Build started. 


---

[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-43368489
  
    All automated tests passed.
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15048/


---

[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-41364486
  
     Build triggered. 


---

[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-41370695
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14482/


---

[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-43368488
  
    Merged build finished. All automated tests passed.


---

[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by AndreSchumacher <gi...@git.apache.org>.
Github user AndreSchumacher commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-41491300
  
    I added a new issue about the nullability question: https://issues.apache.org/jira/browse/SPARK-1649


---

[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/511#discussion_r12010205
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
    @@ -175,12 +175,35 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
             InsertIntoParquetTable(relation, planLater(child), overwrite=true)(sparkContext) :: Nil
           case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) =>
             InsertIntoParquetTable(table, planLater(child), overwrite)(sparkContext) :: Nil
    -      case PhysicalOperation(projectList, filters, relation: ParquetRelation) =>
    -        // TODO: Should be pushing down filters as well.
    +      case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) => {
    +        val remainingFilters =
    +          if (sparkContext.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) {
    +            filters.filter {
    +              // Note: filters cannot be pushed down to Parquet if they contain more complex
    +              // expressions than simple "Attribute cmp Literal" comparisons. Here we remove
    +              // all filters that have been pushed down. Note that a predicate such as
    +              // "A AND B" can result in "A" being pushed down.
    --- End diff --
    
    However, we will never get `A AND B`, right? I think any conjunctive predicates will already have been split by `PhysicalOperation`.
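
As an illustration of the point about conjunctions, here is a minimal sketch under stated assumptions. It uses a toy expression tree; `PushdownSketch`, `Expr`, `splitConjuncts`, `isPushable`, and `plan` are hypothetical names and do not reproduce Catalyst's or this PR's code. It shows that once a condition has been flattened into its conjuncts, each conjunct can be checked on its own: simple "Attribute cmp Literal" comparisons go to the pushed-down set and everything else stays as a remaining filter, so no `A AND B` node is left to handle.

```scala
object PushdownSketch {
  // Hypothetical mini expression tree (not Catalyst's classes).
  sealed trait Expr
  case class Attr(name: String) extends Expr
  case class Lit(value: Int) extends Expr
  case class LessThan(left: Expr, right: Expr) extends Expr
  case class Add(left: Expr, right: Expr) extends Expr
  case class And(left: Expr, right: Expr) extends Expr

  // Flatten nested conjunctions: (A AND B) AND C  ==>  Seq(A, B, C).
  def splitConjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
    case other     => Seq(other)
  }

  // In this sketch only "Attribute < Literal" counts as pushable.
  def isPushable(e: Expr): Boolean = e match {
    case LessThan(_: Attr, _: Lit) => true
    case _                         => false
  }

  // Returns (filters to push down, filters that must still run afterwards).
  def plan(condition: Expr): (Seq[Expr], Seq[Expr]) =
    splitConjuncts(condition).partition(isPushable)
}
```

For a condition such as `x < 10 AND (x + y) < 3`, `plan` would put `x < 10` in the first sequence and keep `(x + y) < 3` in the second.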


---

[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-41205564
  
    Build finished. 


---

[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-43359714
  
     Merged build triggered. 


---

[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-41356364
  
    @marmbrus @AndreSchumacher do we really want a SparkConf option for this? I'd rather minimize the number of options and add rules in the optimizer later to decide when to do this. These kinds of options are esoteric and very hard for users to configure.


---

[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by AndreSchumacher <gi...@git.apache.org>.
Github user AndreSchumacher closed the pull request at:

    https://github.com/apache/spark/pull/511


---

[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-42737719
  
    Merged build started. 


---

[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-41460345
  
     Build triggered. 


---

[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-43367613
  
    Merged build finished. All automated tests passed.


---

[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-41361765
  
    BTW if you do add a config setting, a better name would be `spark.sql.hints.parquetFilterPushdown`; our other setting names don't start with `org.apache`. An even better option though might be to put it in the SQL statement itself, so users can do it on a per-query basis, though that probably requires nasty changes to the parser. But I'd still prefer to either always turn this on (if the penalty isn't huge) or leave it off for now and not introduce a new setting.
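
For reference, a minimal sketch of how such a flag could behave, assuming the suggested key name and a default of `true`. `HintConfSketch`, `getBoolean`, and `pushdownEnabled` are hypothetical helpers over a plain `Map`, standing in for a boolean lookup with a default on the real configuration object; they are not Spark's API.

```scala
object HintConfSketch {
  // Key name as suggested in this comment; whether it is the final name is
  // not confirmed here.
  val ParquetFilterPushdown = "spark.sql.hints.parquetFilterPushdown"

  // Stand-in for a boolean lookup with a default on a key-value config.
  def getBoolean(settings: Map[String, String], key: String, default: Boolean): Boolean =
    settings.get(key).map(_.toBoolean).getOrElse(default)

  // Pushdown stays enabled unless the flag is explicitly set to "false".
  def pushdownEnabled(settings: Map[String, String]): Boolean =
    getBoolean(settings, ParquetFilterPushdown, default = true)
}
```

With an empty map, `pushdownEnabled` returns `true`; a user would have to set the key to `false` to opt out.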


---

[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-41455058
  
    Okay, it sounds good then as a hidden parameter.


---

[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-43359218
  
    Merged build started. 


---

[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by AndreSchumacher <gi...@git.apache.org>.
Github user AndreSchumacher commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-43400641
  
    Closing this now since it got merged. Thanks everyone.


---

[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-41363714
  
     Build triggered. 


---

[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-41460384
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14509/


---

[GitHub] spark pull request: SPARK-1487 [SQL] Support record filtering via ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/511#issuecomment-41461458
  
     Build triggered. 


---