You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by liancheng <gi...@git.apache.org> on 2015/02/02 13:18:21 UTC

[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SQL] WIP: Parquet d...

GitHub user liancheng opened a pull request:

    https://github.com/apache/spark/pull/4308

    [SPARK-5182] [SPARK-5528] [SQL] WIP: Parquet data source improvements

    This PR adds three major improvements to Parquet data source:
    
    1.  Partition discovery
    
        While reading Parquet files resides in Hive style partition directories, `ParquetRelation2` automatically discovers partitioning information and infers partition column types.
    
        This is also a partial work for [SPARK-5182] [1], which aims to provide first class partitioning support for the data source API.  Related code in this PR can be easily extracted to the data source API level in future versions.
    
    1.  Schema merging
    
        When enabled, Parquet data source collects schema information from all Parquet part-files and tries to merge them.  Exceptions are thrown when incompatible schemas are detected.  This feature is controlled by data source option `parquet.mergeSchema`, and is enabled by default.
    
    1.  Metastore Parquet table conversion moved to analysis phase
    
        This greatly simplifies the conversion logic.  `ParquetConversion` strategy can be removed once the old Parquet implementation is removed in the future.
    
    This version of Parquet data source aims to entirely replace the old Parquet implementation.  However, the old version hasn't been removed yet.  Users can fall back to the old version by turning off SQL configuration `spark.sql.parquet.useDataSourceApi`.
    
    Other JIRA tickets fixed as side effects in this PR:
    
    - [SPARK-5509] [3]: `EqualTo` now uses a proper `Ordering` to compare binary types.
    
    - [SPARK-3575] [4]: Metastore schema is now preserved and passed to `ParquetRelation2` via data source option `parquet.metastoreSchema`.
    
    TODO:
    
    - [ ] More test cases for partition discovery
    - [ ] Fix write path after data source write support (#4294) is merged
    
          It turned out to be non-trivial to fall back to old Parquet implementation on the write path when Parquet data source is enabled.  Since we're planning to include data source write support in 1.3.0, I simply ignored two test cases involving Parquet insertion for now.
    
    - [ ] Fix outdated comments and documentations
    
    PS: More than a half of changed lines in this PR are trivial changes to test cases. To test Parquet with and without the new data source, almost all Parquet test cases are moved into wrapper driver functions. This introduces hundreds of lines of changes, etc.).
    
    [1]: https://issues.apache.org/jira/browse/SPARK-5182
    [2]: https://issues.apache.org/jira/browse/SPARK-5528
    [3]: https://issues.apache.org/jira/browse/SPARK-5509
    [4]: https://issues.apache.org/jira/browse/SPARK-3575


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/liancheng/spark parquet-partition-discovery

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/4308.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4308
    
----
commit c0f220f76b15eafecfa14cc7021a2472384c8d14
Author: Cheng Lian <li...@databricks.com>
Date:   2015-01-30T01:41:18Z

    Draft version of Parquet partition discovery and schema merging

commit 5c405a8f3f8c511454e268379ec2348bdcb8902e
Author: Cheng Lian <li...@databricks.com>
Date:   2015-02-01T00:23:27Z

    Fixes all existing Parquet test suites except for ParquetMetastoreSuite

commit 5a5e18ed2e213904525375643ef7a2e1e34a590e
Author: Cheng Lian <li...@databricks.com>
Date:   2015-02-02T04:34:09Z

    Fixes Metastore Parquet table conversion

commit af3683ea68d3efe7c0368cb8d23fdd661fbfeffc
Author: Cheng Lian <li...@databricks.com>
Date:   2015-02-02T11:30:01Z

    Uses switch to control whether use Parquet data source or not

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SQL] WIP: Parquet d...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-72449188
  
      [Test build #26514 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26514/consoleFull) for   PR 4308 at commit [`af3683e`](https://github.com/apache/spark/commit/af3683ea68d3efe7c0368cb8d23fdd661fbfeffc).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-72595209
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26601/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4308#discussion_r24139528
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala ---
    @@ -490,4 +489,55 @@ private[parquet] object ParquetTypesConverter extends Logging {
           attributes
         }
       }
    +
    +  def mergeCatalystSchemas(left: StructType, right: StructType): StructType =
    +    mergeCatalystDataTypes(left, right).asInstanceOf[StructType]
    +
    +  def mergeCatalystDataTypes(left: DataType, right: DataType): DataType =
    --- End diff --
    
    also should this live in catalyst?  Seems generally useful.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-72575502
  
      [Test build #26572 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26572/consoleFull) for   PR 4308 at commit [`170a0f8`](https://github.com/apache/spark/commit/170a0f8619a0a0eed26ccef71815c31abd9f89ac).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SQL] WIP: Parquet d...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4308#discussion_r23921551
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala ---
    @@ -227,66 +294,302 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
             val cacheMetadata = useCache
     
             @transient
    -        val cachedStatus = selectedPartitions.flatMap(_.files)
    +        val cachedStatus = selectedFiles
     
             // Overridden so we can inject our own cached files statuses.
             override def getPartitions: Array[SparkPartition] = {
    -          val inputFormat =
    -            if (cacheMetadata) {
    -              new FilteringParquetRowInputFormat {
    -                override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatus
    -              }
    -            } else {
    -              new FilteringParquetRowInputFormat
    +          val inputFormat = if (cacheMetadata) {
    +            new FilteringParquetRowInputFormat {
    +              override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatus
                 }
    -
    -          inputFormat match {
    -            case configurable: Configurable =>
    -              configurable.setConf(getConf)
    -            case _ =>
    +          } else {
    +            new FilteringParquetRowInputFormat
               }
    +
               val jobContext = newJobContext(getConf, jobId)
    -          val rawSplits = inputFormat.getSplits(jobContext).toArray
    -          val result = new Array[SparkPartition](rawSplits.size)
    -          for (i <- 0 until rawSplits.size) {
    -            result(i) =
    -              new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
    +          val rawSplits = inputFormat.getSplits(jobContext)
    +
    +          Array.tabulate[SparkPartition](rawSplits.size) { i =>
    +            new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
               }
    -          result
             }
           }
     
    -    // The ordinal for the partition key in the result row, if requested.
    -    val partitionKeyLocation =
    -      partitionKeys
    -        .headOption
    -        .map(requiredColumns.indexOf(_))
    -        .getOrElse(-1)
    +    // The ordinals for partition keys in the result row, if requested.
    +    val partitionKeyLocations = partitionColumns.fieldNames.zipWithIndex.map {
    +      case (name, index) => index -> requiredColumns.indexOf(name)
    +    }.toMap.filter {
    +      case (_, index) => index >= 0
    +    }
     
         // When the data does not include the key and the key is requested then we must fill it in
         // based on information from the input split.
    -    if (!dataIncludesKey && partitionKeyLocation != -1) {
    -      baseRDD.mapPartitionsWithInputSplit { case (split, iter) =>
    -        val partValue = "([^=]+)=([^=]+)".r
    -        val partValues =
    -          split.asInstanceOf[parquet.hadoop.ParquetInputSplit]
    -            .getPath
    -            .toString
    -            .split("/")
    -            .flatMap {
    -            case partValue(key, value) => Some(key -> value)
    -            case _ => None
    -          }.toMap
    -
    -        val currentValue = partValues.values.head.toInt
    -        iter.map { pair =>
    -          val res = pair._2.asInstanceOf[SpecificMutableRow]
    -          res.setInt(partitionKeyLocation, currentValue)
    -          res
    +    if (!dataSchemaIncludesPartitionKeys && partitionKeyLocations.nonEmpty) {
    +      baseRDD.mapPartitionsWithInputSplit { case (split: ParquetInputSplit, iterator) =>
    +        val partValues = selectedPartitions.collectFirst {
    +          case p if split.getPath.getParent.toString == p.path => p.values
    +        }.get
    +
    +        iterator.map { pair =>
    +          val row = pair._2.asInstanceOf[SpecificMutableRow]
    +          var i = 0
    +          while (i < partValues.size) {
    +            // TODO Avoids boxing cost here!
    +            row.update(partitionKeyLocations(i), partValues(i))
    +            i += 1
    +          }
    +          row
             }
           }
         } else {
           baseRDD.map(_._2)
         }
       }
    +
    +  private def prunePartitions(
    +      predicates: Seq[Expression],
    +      partitions: Seq[Partition]): Seq[Partition] = {
    +    val partitionColumnNames = partitionColumns.map(_.name).toSet
    +    val partitionPruningPredicates = predicates.filter {
    +      _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
    +    }
    +
    +    val rawPredicate = partitionPruningPredicates.reduceOption(And).getOrElse(Literal(true))
    +    val boundPredicate = InterpretedPredicate(rawPredicate transform {
    +      case a: AttributeReference =>
    +        val index = partitionColumns.indexWhere(a.name == _.name)
    +        BoundReference(index, partitionColumns(index).dataType, nullable = true)
    +    })
    +
    +    if (isPartitioned && partitionPruningPredicates.nonEmpty) {
    +      partitions.filter(p => boundPredicate(p.values))
    +    } else {
    +      partitions
    +    }
    +  }
    +}
    +
    +object ParquetRelation2 {
    +  // Whether we should merge schemas collected from all Parquet part-files.
    +  val MERGE_SCHEMA = "parquet.mergeSchema"
    +
    +  // Hive Metastore schema, passed in when the Parquet relation is converted from Metastore
    +  val METASTORE_SCHEMA = "parquet.metastoreSchema"
    +
    +  // Default partition name to use when the partition column value is null or empty string
    +  val DEFAULT_PARTITION_NAME = "partition.defaultName"
    +
    +  private[parquet] def readSchema(footers: Seq[Footer], sqlContext: SQLContext): StructType = {
    +    footers.map { footer =>
    +      val metadata = footer.getParquetMetadata.getFileMetaData
    +      val parquetSchema = metadata.getSchema
    +      val maybeSparkSchema = metadata
    +        .getKeyValueMetaData
    +        .toMap
    +        .get(RowReadSupport.SPARK_METADATA_KEY)
    +        .map(DataType.fromJson(_).asInstanceOf[StructType])
    +
    +      maybeSparkSchema.getOrElse {
    +        // Falls back to Parquet schema if Spark SQL schema is absent.
    +        StructType.fromAttributes(
    +          // TODO Really no need to use `Attribute` here, we only need to know the data type.
    +          convertToAttributes(parquetSchema, sqlContext.conf.isParquetBinaryAsString))
    +      }
    +    }.reduce { (left, right) =>
    +      try mergeCatalystSchemas(left, right) catch { case e: Throwable =>
    +        throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
    +      }
    +    }
    +  }
    +
    +  private[parquet] def mergeMetastoreParquetSchema(
    +      metastoreSchema: StructType,
    +      parquetSchema: StructType): StructType = {
    +    def schemaConflictMessage =
    +      s"""Converting Hive Metastore Parquet, but detected conflicting schemas. Metastore schema:
    +         |${metastoreSchema.prettyJson}
    +         |
    +         |Parquet schema:
    +         |${parquetSchema.prettyJson}
    +       """.stripMargin
    +
    +    assert(metastoreSchema.size == parquetSchema.size, schemaConflictMessage)
    +
    +    val ordinalMap = metastoreSchema.zipWithIndex.map {
    +      case (field, index) => field.name.toLowerCase -> index
    +    }.toMap
    +    val reorderedParquetSchema = parquetSchema.sortBy(f => ordinalMap(f.name.toLowerCase))
    +
    +    StructType(metastoreSchema.zip(reorderedParquetSchema).map {
    +      // Uses Parquet field names but retains Metastore data types.
    +      case (mSchema, pSchema) if mSchema.name.toLowerCase == pSchema.name.toLowerCase =>
    +        mSchema.copy(name = pSchema.name)
    +      case _ =>
    +        throw new SparkException(schemaConflictMessage)
    +    })
    +  }
    +
    +  // TODO Data source implementations shouldn't touch Catalyst types (`Literal`).
    +  // However, we are already using Catalyst expressions for partition pruning and predicate
    +  // push-down here...
    +  private[parquet] case class PartitionValues(columnNames: Seq[String], literals: Seq[Literal]) {
    +    require(columnNames.size == literals.size)
    +  }
    +
    +  /**
    +   * Given a group of qualified paths, tries to parse them and returns a partition specification.
    +   * For example, given:
    +   * {{{
    +   *   hdfs://<host>:<port>/path/to/partition/a=1/b=hello/c=3.14
    +   *   hdfs://<host>:<port>/path/to/partition/a=2/b=world/c=6.28
    +   * }}}
    +   * it returns:
    +   * {{{
    +   *   PartitionSpec(
    +   *     partitionColumns = StructType(
    +   *       StructField(name = "a", dataType = IntegerType, nullable = true),
    +   *       StructField(name = "b", dataType = StringType, nullable = true),
    +   *       StructField(name = "c", dataType = DoubleType, nullable = true)),
    +   *     partitions = Seq(
    +   *       Partition(
    +   *         values = Row(1, "hello", 3.14),
    +   *         path = "hdfs://<host>:<port>/path/to/partition/a=1/b=hello/c=3.14"),
    +   *       Partition(
    +   *         values = Row(2, "world", 6.28),
    +   *         path = "hdfs://<host>:<port>/path/to/partition/a=2/b=world/c=6.28")))
    +   * }}}
    +   */
    +  private[parquet] def parsePartitions(
    +      paths: Seq[Path],
    +      defaultPartitionName: String): PartitionSpec = {
    +    val partitionValues = resolvePartitions(paths.map(parsePartition(_, defaultPartitionName)))
    +    val fields = {
    +      val (PartitionValues(columnNames, literals)) = partitionValues.head
    +      columnNames.zip(literals).map { case (name, Literal(_, dataType)) =>
    +        StructField(name, dataType, nullable = true)
    +      }
    +    }
    +
    +    val partitions = partitionValues.zip(paths).map {
    +      case (PartitionValues(_, literals), path) =>
    +        Partition(Row(literals.map(_.value): _*), path.toString)
    +    }
    +
    +    PartitionSpec(StructType(fields), partitions)
    +  }
    +
    +  /**
    +   * Parses a single partition, returns column names and values of each partition column.  For
    +   * example, given:
    +   * {{{
    +   *   basePath = hdfs://<host>:<port>/path/to/partition
    +   *   partitionPath = hdfs://<host>:<port>/path/to/partition/a=42/b=hello/c=3.14
    +   * }}}
    +   * it returns:
    +   * {{{
    +   *   PartitionDesc(
    +   *     Seq("a", "b", "c"),
    +   *     Seq(
    +   *       Literal(42, IntegerType),
    +   *       Literal("hello", StringType),
    +   *       Literal(3.14, FloatType)))
    +   * }}}
    +   */
    +  private[parquet] def parsePartition(
    +      path: Path,
    +      defaultPartitionName: String): PartitionValues = {
    +    val columns = ArrayBuffer.empty[(String, Literal)]
    +    var finished = path.isRoot
    +    var chopped = path
    +
    +    while (!finished) {
    +      val maybeColumn = parsePartitionColumn(chopped.getName, defaultPartitionName)
    +      maybeColumn.foreach(columns += _)
    +      chopped = chopped.getParent
    +      finished = maybeColumn.isEmpty || chopped.isRoot
    +    }
    +
    +    val (columnNames, values) = columns.unzip
    +    PartitionValues(columnNames, values)
    +  }
    +
    +  private def parsePartitionColumn(
    +      columnSpec: String,
    +      defaultPartitionName: String): Option[(String, Literal)] = {
    +    val equalSignIndex = columnSpec.indexOf('=')
    +    if (equalSignIndex == -1) {
    +      None
    +    } else {
    +      val columnName = columnSpec.take(equalSignIndex)
    +      val literal = inferPartitionColumnValue(
    +        columnSpec.drop(equalSignIndex + 1), defaultPartitionName)
    +      Some(columnName -> literal)
    +    }
    +  }
    +
    +  /**
    +   * Resolves possible type conflicts between partitions by up-casting "lower" types.  The up-
    +   * casting order is:
    +   * {{{
    +   *   NullType ->
    +   *   IntegerType -> LongType ->
    +   *   FloatType -> DoubleType -> DecimalType.Unlimited ->
    +   *   StringType
    +   * }}}
    +   */
    +  private[parquet] def resolvePartitions(descs: Seq[PartitionValues]): Seq[PartitionValues] = {
    +    val distinctColNamesOfPartitions = descs.map(_.columnNames).distinct
    +    val columnCount = descs.head.columnNames.size
    +
    +    // Column names of all partitions must match
    +    assert(distinctColNamesOfPartitions.size == 1, {
    +      val list = distinctColNamesOfPartitions.mkString("\t", "\n", "")
    +      s"Conflicting partition column names detected:\n$list"
    +    })
    +
    +    // Resolves possible type conflicts for each column
    +    val resolvedValues = (0 until columnCount).map { i =>
    +      resolveTypeConflicts(descs.map(_.literals(i)))
    +    }
    +
    +    // Fills resolved literals back to each partition
    +    descs.zipWithIndex.map { case (d, index) =>
    +      d.copy(literals = resolvedValues.map(_(index)))
    +    }
    +  }
    +
    +  /**
    +   * Converts a string to a `Literal` with automatic type inference.  Currently only supports
    +   * [[IntegerType]], [[LongType]], [[FloatType]], [[DoubleType]], [[DecimalType.Unlimited]], and
    +   * [[StringType]].
    +   */
    +  private[parquet] def inferPartitionColumnValue(
    +      raw: String,
    +      defaultPartitionName: String): Literal = {
    +    // First tries integral types
    +    Try(Literal(Integer.parseInt(raw), IntegerType))
    +      .orElse(Try(Literal(JLong.parseLong(raw), LongType)))
    +      // Then falls back to fractional types
    +      .orElse(Try(Literal(JFloat.parseFloat(raw), FloatType)))
    +      .orElse(Try(Literal(JDouble.parseDouble(raw), DoubleType)))
    +      .orElse(Try(Literal(new JBigDecimal(raw), DecimalType.Unlimited)))
    +      // Then falls back to string
    +      .getOrElse {
    +        if (raw == defaultPartitionName) Literal(null, NullType) else Literal(raw, StringType)
    --- End diff --
    
    For string, should we convert default partition name to `null` or empty string? Need to check Hive behaviour here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4308#discussion_r24140505
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala ---
    @@ -490,4 +489,55 @@ private[parquet] object ParquetTypesConverter extends Logging {
           attributes
         }
       }
    +
    +  def mergeCatalystSchemas(left: StructType, right: StructType): StructType =
    +    mergeCatalystDataTypes(left, right).asInstanceOf[StructType]
    +
    +  def mergeCatalystDataTypes(left: DataType, right: DataType): DataType =
    --- End diff --
    
    Yeah, will move it to Catalyst in follow-up PRs.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SQL] WIP: Parquet d...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4308#discussion_r23921094
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala ---
    @@ -175,7 +175,10 @@ case class EqualTo(left: Expression, right: Expression) extends BinaryComparison
           null
         } else {
           val r = right.eval(input)
    -      if (r == null) null else l == r
    +      if (r == null) null
    +      else if (left.dataType != BinaryType) l == r
    +      else BinaryType.ordering.compare(
    +        l.asInstanceOf[Array[Byte]], r.asInstanceOf[Array[Byte]]) == 0
    --- End diff --
    
    This fixes SPARK-5509. Hit this bug while testing Parquet filters for new data source implementation.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-73150568
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26858/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4308#discussion_r23987571
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
    @@ -262,8 +262,14 @@ class SQLContext(@transient val sparkContext: SparkContext)
        *
        * @group userf
        */
    -  def parquetFile(path: String): DataFrame =
    -    DataFrame(this, parquet.ParquetRelation(path, Some(sparkContext.hadoopConfiguration), this))
    +  @scala.annotation.varargs
    +  def parquetFile(paths: String*): DataFrame =
    --- End diff --
    
    Thanks. Makes sense.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-72921592
  
    retest this please


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4308#discussion_r24140729
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
    @@ -262,8 +262,14 @@ class SQLContext(@transient val sparkContext: SparkContext)
        *
        * @group userf
        */
    -  def parquetFile(path: String): DataFrame =
    -    DataFrame(this, parquet.ParquetRelation(path, Some(sparkContext.hadoopConfiguration), this))
    +  @scala.annotation.varargs
    +  def parquetFile(paths: String*): DataFrame =
    --- End diff --
    
    okay and i convinced @rxin too :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-73133148
  
    OK, rebased for the 9th time... Addressed all comments except for adding option to disable metadata caching, which I'd like to include in another PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4308#discussion_r23988104
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala ---
    @@ -175,7 +175,10 @@ case class EqualTo(left: Expression, right: Expression) extends BinaryComparison
           null
         } else {
           val r = right.eval(input)
    -      if (r == null) null else l == r
    +      if (r == null) null
    +      else if (left.dataType != BinaryType) l == r
    +      else BinaryType.ordering.compare(
    +        l.asInstanceOf[Array[Byte]], r.asInstanceOf[Array[Byte]]) == 0
    --- End diff --
    
    Filed SPARK-5553 to track this. I'd like to make sure equality comparison for binary types works properly in this PR. Also, we're already using `Ordering` to compare binary values in `LessThan` and `GreaterThan` etc., so at least this isn't a regression.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4308#discussion_r25603074
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala ---
    @@ -228,66 +355,400 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
             val cacheMetadata = useCache
     
             @transient
    -        val cachedStatus = selectedPartitions.flatMap(_.files)
    +        val cachedStatus = selectedFiles
     
             // Overridden so we can inject our own cached files statuses.
             override def getPartitions: Array[SparkPartition] = {
    -          val inputFormat =
    -            if (cacheMetadata) {
    -              new FilteringParquetRowInputFormat {
    -                override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatus
    -              }
    -            } else {
    -              new FilteringParquetRowInputFormat
    +          val inputFormat = if (cacheMetadata) {
    +            new FilteringParquetRowInputFormat {
    +              override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatus
                 }
    -
    -          inputFormat match {
    -            case configurable: Configurable =>
    -              configurable.setConf(getConf)
    -            case _ =>
    +          } else {
    +            new FilteringParquetRowInputFormat
               }
    +
               val jobContext = newJobContext(getConf, jobId)
    -          val rawSplits = inputFormat.getSplits(jobContext).toArray
    -          val result = new Array[SparkPartition](rawSplits.size)
    -          for (i <- 0 until rawSplits.size) {
    -            result(i) =
    -              new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
    +          val rawSplits = inputFormat.getSplits(jobContext)
    +
    +          Array.tabulate[SparkPartition](rawSplits.size) { i =>
    +            new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
               }
    -          result
             }
           }
     
    -    // The ordinal for the partition key in the result row, if requested.
    -    val partitionKeyLocation =
    -      partitionKeys
    -        .headOption
    -        .map(requiredColumns.indexOf(_))
    -        .getOrElse(-1)
    +    // The ordinals for partition keys in the result row, if requested.
    +    val partitionKeyLocations = partitionColumns.fieldNames.zipWithIndex.map {
    +      case (name, index) => index -> requiredColumns.indexOf(name)
    +    }.toMap.filter {
    +      case (_, index) => index >= 0
    +    }
     
         // When the data does not include the key and the key is requested then we must fill it in
         // based on information from the input split.
    -    if (!dataIncludesKey && partitionKeyLocation != -1) {
    -      baseRDD.mapPartitionsWithInputSplit { case (split, iter) =>
    -        val partValue = "([^=]+)=([^=]+)".r
    -        val partValues =
    -          split.asInstanceOf[parquet.hadoop.ParquetInputSplit]
    -            .getPath
    -            .toString
    -            .split("/")
    -            .flatMap {
    -            case partValue(key, value) => Some(key -> value)
    -            case _ => None
    -          }.toMap
    -
    -        val currentValue = partValues.values.head.toInt
    -        iter.map { pair =>
    -          val res = pair._2.asInstanceOf[SpecificMutableRow]
    -          res.setInt(partitionKeyLocation, currentValue)
    -          res
    +    if (!dataSchemaIncludesPartitionKeys && partitionKeyLocations.nonEmpty) {
    +      baseRDD.mapPartitionsWithInputSplit { case (split: ParquetInputSplit, iterator) =>
    +        val partValues = selectedPartitions.collectFirst {
    +          case p if split.getPath.getParent.toString == p.path => p.values
    +        }.get
    +
    +        iterator.map { pair =>
    +          val row = pair._2.asInstanceOf[SpecificMutableRow]
    +          var i = 0
    +          while (i < partValues.size) {
    +            // TODO Avoids boxing cost here!
    +            row.update(partitionKeyLocations(i), partValues(i))
    +            i += 1
    +          }
    +          row
             }
           }
         } else {
           baseRDD.map(_._2)
         }
       }
    +
    +  private def prunePartitions(
    +      predicates: Seq[Expression],
    +      partitions: Seq[Partition]): Seq[Partition] = {
    +    val partitionColumnNames = partitionColumns.map(_.name).toSet
    +    val partitionPruningPredicates = predicates.filter {
    +      _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
    +    }
    +
    +    val rawPredicate = partitionPruningPredicates.reduceOption(And).getOrElse(Literal(true))
    +    val boundPredicate = InterpretedPredicate(rawPredicate transform {
    +      case a: AttributeReference =>
    +        val index = partitionColumns.indexWhere(a.name == _.name)
    +        BoundReference(index, partitionColumns(index).dataType, nullable = true)
    +    })
    +
    +    if (isPartitioned && partitionPruningPredicates.nonEmpty) {
    +      partitions.filter(p => boundPredicate(p.values))
    +    } else {
    +      partitions
    +    }
    +  }
    +
    +  override def insert(data: DataFrame, overwrite: Boolean): Unit = {
    +    // TODO: currently we do not check whether the "schema"s are compatible
    +    // That means if one first creates a table and then INSERTs data with
    +    // and incompatible schema the execution will fail. It would be nice
    +    // to catch this early one, maybe having the planner validate the schema
    +    // before calling execute().
    +
    +    val job = new Job(sqlContext.sparkContext.hadoopConfiguration)
    +    val writeSupport = if (schema.map(_.dataType).forall(_.isPrimitive)) {
    +      log.debug("Initializing MutableRowWriteSupport")
    +      classOf[MutableRowWriteSupport]
    +    } else {
    +      classOf[RowWriteSupport]
    +    }
    +
    +    ParquetOutputFormat.setWriteSupportClass(job, writeSupport)
    +
    +    val conf = ContextUtil.getConfiguration(job)
    +    RowWriteSupport.setSchema(schema.toAttributes, conf)
    +
    +    val destinationPath = new Path(paths.head)
    +
    +    if (overwrite) {
    +      try {
    +        destinationPath.getFileSystem(conf).delete(destinationPath, true)
    +      } catch {
    +        case e: IOException =>
    +          throw new IOException(
    +            s"Unable to clear output directory ${destinationPath.toString} prior" +
    +              s" to writing to Parquet file:\n${e.toString}")
    +      }
    +    }
    +
    +    job.setOutputKeyClass(classOf[Void])
    +    job.setOutputValueClass(classOf[Row])
    +    FileOutputFormat.setOutputPath(job, destinationPath)
    +
    +    val wrappedConf = new SerializableWritable(job.getConfiguration)
    +    val jobTrackerId = new SimpleDateFormat("yyyyMMddHHmm").format(new Date())
    +    val stageId = sqlContext.sparkContext.newRddId()
    +
    +    val taskIdOffset = if (overwrite) {
    +      1
    +    } else {
    +      FileSystemHelper.findMaxTaskId(
    +        FileOutputFormat.getOutputPath(job).toString, job.getConfiguration) + 1
    +    }
    +
    +    def writeShard(context: TaskContext, iterator: Iterator[Row]): Unit = {
    +      /* "reduce task" <split #> <attempt # = spark task #> */
    +      val attemptId = newTaskAttemptID(
    +        jobTrackerId, stageId, isMap = false, context.partitionId(), context.attemptNumber())
    +      val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
    +      val format = new AppendingParquetOutputFormat(taskIdOffset)
    +      val committer = format.getOutputCommitter(hadoopContext)
    +      committer.setupTask(hadoopContext)
    +      val writer = format.getRecordWriter(hadoopContext)
    +      try {
    +        while (iterator.hasNext) {
    +          val row = iterator.next()
    +          writer.write(null, row)
    +        }
    +      } finally {
    +        writer.close(hadoopContext)
    +      }
    +      committer.commitTask(hadoopContext)
    +    }
    +    val jobFormat = new AppendingParquetOutputFormat(taskIdOffset)
    +    /* apparently we need a TaskAttemptID to construct an OutputCommitter;
    +     * however we're only going to use this local OutputCommitter for
    +     * setupJob/commitJob, so we just use a dummy "map" task.
    +     */
    +    val jobAttemptId = newTaskAttemptID(jobTrackerId, stageId, isMap = true, 0, 0)
    +    val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
    +    val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
    +
    +    jobCommitter.setupJob(jobTaskContext)
    +    sqlContext.sparkContext.runJob(data.queryExecution.executedPlan.execute(), writeShard _)
    +    jobCommitter.commitJob(jobTaskContext)
    +
    +    metadataCache.refresh()
    +  }
    +}
    +
    +object ParquetRelation2 {
    +  // Whether we should merge schemas collected from all Parquet part-files.
    +  val MERGE_SCHEMA = "mergeSchema"
    +
    +  // Hive Metastore schema, passed in when the Parquet relation is converted from Metastore
    +  val METASTORE_SCHEMA = "metastoreSchema"
    +
    +  // Default partition name to use when the partition column value is null or empty string
    +  val DEFAULT_PARTITION_NAME = "partition.defaultName"
    +
    +  // When true, the Parquet data source caches Parquet metadata for performance
    +  val CACHE_METADATA = "cacheMetadata"
    +
    +  private[parquet] def readSchema(footers: Seq[Footer], sqlContext: SQLContext): StructType = {
    +    footers.map { footer =>
    +      val metadata = footer.getParquetMetadata.getFileMetaData
    +      val parquetSchema = metadata.getSchema
    +      val maybeSparkSchema = metadata
    +        .getKeyValueMetaData
    +        .toMap
    +        .get(RowReadSupport.SPARK_METADATA_KEY)
    +        .map(DataType.fromJson(_).asInstanceOf[StructType])
    +
    +      maybeSparkSchema.getOrElse {
    +        // Falls back to Parquet schema if Spark SQL schema is absent.
    +        StructType.fromAttributes(
    +          // TODO Really no need to use `Attribute` here, we only need to know the data type.
    +          convertToAttributes(
    +            parquetSchema,
    +            sqlContext.conf.isParquetBinaryAsString,
    +            sqlContext.conf.isParquetINT96AsTimestamp))
    +      }
    +    }.reduce { (left, right) =>
    +      try left.merge(right) catch { case e: Throwable =>
    +        throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
    +      }
    +    }
    +  }
    +
    +  private[parquet] def mergeMetastoreParquetSchema(
    +      metastoreSchema: StructType,
    +      parquetSchema: StructType): StructType = {
    +    def schemaConflictMessage =
    +      s"""Converting Hive Metastore Parquet, but detected conflicting schemas. Metastore schema:
    +         |${metastoreSchema.prettyJson}
    +         |
    +         |Parquet schema:
    +         |${parquetSchema.prettyJson}
    +       """.stripMargin
    +
    +    assert(metastoreSchema.size == parquetSchema.size, schemaConflictMessage)
    +
    +    val ordinalMap = metastoreSchema.zipWithIndex.map {
    +      case (field, index) => field.name.toLowerCase -> index
    +    }.toMap
    +    val reorderedParquetSchema = parquetSchema.sortBy(f => ordinalMap(f.name.toLowerCase))
    +
    +    StructType(metastoreSchema.zip(reorderedParquetSchema).map {
    +      // Uses Parquet field names but retains Metastore data types.
    +      case (mSchema, pSchema) if mSchema.name.toLowerCase == pSchema.name.toLowerCase =>
    +        mSchema.copy(name = pSchema.name)
    +      case _ =>
    +        throw new SparkException(schemaConflictMessage)
    +    })
    +  }
    +
    +  // TODO Data source implementations shouldn't touch Catalyst types (`Literal`).
    +  // However, we are already using Catalyst expressions for partition pruning and predicate
    +  // push-down here...
    +  private[parquet] case class PartitionValues(columnNames: Seq[String], literals: Seq[Literal]) {
    +    require(columnNames.size == literals.size)
    +  }
    +
    +  /**
    +   * Given a group of qualified paths, tries to parse them and returns a partition specification.
    +   * For example, given:
    +   * {{{
    +   *   hdfs://<host>:<port>/path/to/partition/a=1/b=hello/c=3.14
    +   *   hdfs://<host>:<port>/path/to/partition/a=2/b=world/c=6.28
    +   * }}}
    +   * it returns:
    +   * {{{
    +   *   PartitionSpec(
    +   *     partitionColumns = StructType(
    +   *       StructField(name = "a", dataType = IntegerType, nullable = true),
    +   *       StructField(name = "b", dataType = StringType, nullable = true),
    +   *       StructField(name = "c", dataType = DoubleType, nullable = true)),
    +   *     partitions = Seq(
    +   *       Partition(
    +   *         values = Row(1, "hello", 3.14),
    +   *         path = "hdfs://<host>:<port>/path/to/partition/a=1/b=hello/c=3.14"),
    +   *       Partition(
    +   *         values = Row(2, "world", 6.28),
    +   *         path = "hdfs://<host>:<port>/path/to/partition/a=2/b=world/c=6.28")))
    +   * }}}
    +   */
    +  private[parquet] def parsePartitions(
    +      paths: Seq[Path],
    +      defaultPartitionName: String): PartitionSpec = {
    +    val partitionValues = resolvePartitions(paths.map(parsePartition(_, defaultPartitionName)))
    +    val fields = {
    +      val (PartitionValues(columnNames, literals)) = partitionValues.head
    +      columnNames.zip(literals).map { case (name, Literal(_, dataType)) =>
    +        StructField(name, dataType, nullable = true)
    +      }
    +    }
    +
    +    val partitions = partitionValues.zip(paths).map {
    +      case (PartitionValues(_, literals), path) =>
    +        Partition(Row(literals.map(_.value): _*), path.toString)
    +    }
    +
    +    PartitionSpec(StructType(fields), partitions)
    +  }
    +
    +  /**
    +   * Parses a single partition, returns column names and values of each partition column.  For
    +   * example, given:
    +   * {{{
    +   *   path = hdfs://<host>:<port>/path/to/partition/a=42/b=hello/c=3.14
    +   * }}}
    +   * it returns:
    +   * {{{
    +   *   PartitionValues(
    +   *     Seq("a", "b", "c"),
    +   *     Seq(
    +   *       Literal(42, IntegerType),
    +   *       Literal("hello", StringType),
    +   *       Literal(3.14, FloatType)))
    +   * }}}
    +   */
    +  private[parquet] def parsePartition(
    +      path: Path,
    +      defaultPartitionName: String): PartitionValues = {
    +    val columns = ArrayBuffer.empty[(String, Literal)]
    +    // Old Hadoop versions don't have `Path.isRoot`
    +    var finished = path.getParent == null
    +    var chopped = path
    +
    +    while (!finished) {
    +      val maybeColumn = parsePartitionColumn(chopped.getName, defaultPartitionName)
    +      maybeColumn.foreach(columns += _)
    +      chopped = chopped.getParent
    +      finished = maybeColumn.isEmpty || chopped.getParent == null
    +    }
    +
    +    val (columnNames, values) = columns.reverse.unzip
    +    PartitionValues(columnNames, values)
    +  }
    +
    +  private def parsePartitionColumn(
    +      columnSpec: String,
    +      defaultPartitionName: String): Option[(String, Literal)] = {
    +    val equalSignIndex = columnSpec.indexOf('=')
    +    if (equalSignIndex == -1) {
    +      None
    +    } else {
    +      val columnName = columnSpec.take(equalSignIndex)
    +      assert(columnName.nonEmpty, s"Empty partition column name in '$columnSpec'")
    +
    +      val rawColumnValue = columnSpec.drop(equalSignIndex + 1)
    +      assert(rawColumnValue.nonEmpty, s"Empty partition column value in '$columnSpec'")
    +
    +      val literal = inferPartitionColumnValue(rawColumnValue, defaultPartitionName)
    +      Some(columnName -> literal)
    +    }
    +  }
    +
    +  /**
    +   * Resolves possible type conflicts between partitions by up-casting "lower" types.  The up-
    +   * casting order is:
    +   * {{{
    +   *   NullType ->
    +   *   IntegerType -> LongType ->
    +   *   FloatType -> DoubleType -> DecimalType.Unlimited ->
    +   *   StringType
    +   * }}}
    +   */
    +  private[parquet] def resolvePartitions(values: Seq[PartitionValues]): Seq[PartitionValues] = {
    +    val distinctColNamesOfPartitions = values.map(_.columnNames).distinct
    +    val columnCount = values.head.columnNames.size
    +
    +    // Column names of all partitions must match
    +    assert(distinctColNamesOfPartitions.size == 1, {
    +      val list = distinctColNamesOfPartitions.mkString("\t", "\n", "")
    +      s"Conflicting partition column names detected:\n$list"
    +    })
    +
    +    // Resolves possible type conflicts for each column
    +    val resolvedValues = (0 until columnCount).map { i =>
    +      resolveTypeConflicts(values.map(_.literals(i)))
    +    }
    +
    +    // Fills resolved literals back to each partition
    +    values.zipWithIndex.map { case (d, index) =>
    +      d.copy(literals = resolvedValues.map(_(index)))
    +    }
    +  }
    +
    +  /**
    +   * Converts a string to a `Literal` with automatic type inference.  Currently only supports
    +   * [[IntegerType]], [[LongType]], [[FloatType]], [[DoubleType]], [[DecimalType.Unlimited]], and
    +   * [[StringType]].
    --- End diff --
    
    Good point. Trying `DataType` before `StringType` makes sense. I can add this. Thanks for the suggestion!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-72926497
  
    The last build failure was caused by a flaky ML test case, which is now fixed in master.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-72940628
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26769/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-72950506
  
    retest this please.
    
    The last build failure reports that `isFile` and `isRoot` are not member of `org.apache.hadoop.fs.FileStatus`, which doesn't make sense (the pull request builder uses Hadoop 2.3.0, and these methods are definitely defined in `FileStatus`).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4308#discussion_r24140596
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala ---
    @@ -228,66 +347,397 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
             val cacheMetadata = useCache
     
             @transient
    -        val cachedStatus = selectedPartitions.flatMap(_.files)
    +        val cachedStatus = selectedFiles
     
             // Overridden so we can inject our own cached files statuses.
             override def getPartitions: Array[SparkPartition] = {
    -          val inputFormat =
    -            if (cacheMetadata) {
    -              new FilteringParquetRowInputFormat {
    -                override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatus
    -              }
    -            } else {
    -              new FilteringParquetRowInputFormat
    +          val inputFormat = if (cacheMetadata) {
    +            new FilteringParquetRowInputFormat {
    +              override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatus
                 }
    -
    -          inputFormat match {
    -            case configurable: Configurable =>
    -              configurable.setConf(getConf)
    -            case _ =>
    +          } else {
    +            new FilteringParquetRowInputFormat
               }
    +
               val jobContext = newJobContext(getConf, jobId)
    -          val rawSplits = inputFormat.getSplits(jobContext).toArray
    -          val result = new Array[SparkPartition](rawSplits.size)
    -          for (i <- 0 until rawSplits.size) {
    -            result(i) =
    -              new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
    +          val rawSplits = inputFormat.getSplits(jobContext)
    +
    +          Array.tabulate[SparkPartition](rawSplits.size) { i =>
    +            new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
               }
    -          result
             }
           }
     
    -    // The ordinal for the partition key in the result row, if requested.
    -    val partitionKeyLocation =
    -      partitionKeys
    -        .headOption
    -        .map(requiredColumns.indexOf(_))
    -        .getOrElse(-1)
    +    // The ordinals for partition keys in the result row, if requested.
    +    val partitionKeyLocations = partitionColumns.fieldNames.zipWithIndex.map {
    +      case (name, index) => index -> requiredColumns.indexOf(name)
    +    }.toMap.filter {
    +      case (_, index) => index >= 0
    +    }
     
         // When the data does not include the key and the key is requested then we must fill it in
         // based on information from the input split.
    -    if (!dataIncludesKey && partitionKeyLocation != -1) {
    -      baseRDD.mapPartitionsWithInputSplit { case (split, iter) =>
    -        val partValue = "([^=]+)=([^=]+)".r
    -        val partValues =
    -          split.asInstanceOf[parquet.hadoop.ParquetInputSplit]
    -            .getPath
    -            .toString
    -            .split("/")
    -            .flatMap {
    -            case partValue(key, value) => Some(key -> value)
    -            case _ => None
    -          }.toMap
    -
    -        val currentValue = partValues.values.head.toInt
    -        iter.map { pair =>
    -          val res = pair._2.asInstanceOf[SpecificMutableRow]
    -          res.setInt(partitionKeyLocation, currentValue)
    -          res
    +    if (!dataSchemaIncludesPartitionKeys && partitionKeyLocations.nonEmpty) {
    +      baseRDD.mapPartitionsWithInputSplit { case (split: ParquetInputSplit, iterator) =>
    +        val partValues = selectedPartitions.collectFirst {
    +          case p if split.getPath.getParent.toString == p.path => p.values
    +        }.get
    +
    +        iterator.map { pair =>
    +          val row = pair._2.asInstanceOf[SpecificMutableRow]
    +          var i = 0
    +          while (i < partValues.size) {
    +            // TODO Avoids boxing cost here!
    +            row.update(partitionKeyLocations(i), partValues(i))
    +            i += 1
    +          }
    +          row
             }
           }
         } else {
           baseRDD.map(_._2)
         }
       }
    +
    +  private def prunePartitions(
    +      predicates: Seq[Expression],
    +      partitions: Seq[Partition]): Seq[Partition] = {
    +    val partitionColumnNames = partitionColumns.map(_.name).toSet
    +    val partitionPruningPredicates = predicates.filter {
    +      _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
    +    }
    +
    +    val rawPredicate = partitionPruningPredicates.reduceOption(And).getOrElse(Literal(true))
    +    val boundPredicate = InterpretedPredicate(rawPredicate transform {
    +      case a: AttributeReference =>
    +        val index = partitionColumns.indexWhere(a.name == _.name)
    +        BoundReference(index, partitionColumns(index).dataType, nullable = true)
    +    })
    +
    +    if (isPartitioned && partitionPruningPredicates.nonEmpty) {
    +      partitions.filter(p => boundPredicate(p.values))
    +    } else {
    +      partitions
    +    }
    +  }
    +
    +  override def insert(data: DataFrame, overwrite: Boolean): Unit = {
    +    // TODO: currently we do not check whether the "schema"s are compatible
    +    // That means if one first creates a table and then INSERTs data with
    +    // and incompatible schema the execution will fail. It would be nice
    +    // to catch this early one, maybe having the planner validate the schema
    +    // before calling execute().
    +
    +    val job = new Job(sqlContext.sparkContext.hadoopConfiguration)
    +    val writeSupport = if (schema.map(_.dataType).forall(_.isPrimitive)) {
    +      log.debug("Initializing MutableRowWriteSupport")
    +      classOf[MutableRowWriteSupport]
    +    } else {
    +      classOf[RowWriteSupport]
    +    }
    +
    +    ParquetOutputFormat.setWriteSupportClass(job, writeSupport)
    +
    +    val conf = ContextUtil.getConfiguration(job)
    +    RowWriteSupport.setSchema(schema.toAttributes, conf)
    +
    +    val destinationPath = new Path(paths.head)
    +
    +    if (overwrite) {
    +      try {
    +        destinationPath.getFileSystem(conf).delete(destinationPath, true)
    +      } catch {
    +        case e: IOException =>
    +          throw new IOException(
    +            s"Unable to clear output directory ${destinationPath.toString} prior" +
    +              s" to writing to Parquet file:\n${e.toString}")
    +      }
    +    }
    +
    +    job.setOutputKeyClass(classOf[Void])
    +    job.setOutputValueClass(classOf[Row])
    +    FileOutputFormat.setOutputPath(job, destinationPath)
    +
    +    val wrappedConf = new SerializableWritable(job.getConfiguration)
    +    val jobTrackerId = new SimpleDateFormat("yyyyMMddHHmm").format(new Date())
    +    val stageId = sqlContext.sparkContext.newRddId()
    +
    +    val taskIdOffset = if (overwrite) {
    +      1
    +    } else {
    +      FileSystemHelper.findMaxTaskId(
    +        FileOutputFormat.getOutputPath(job).toString, job.getConfiguration) + 1
    +    }
    +
    +    def writeShard(context: TaskContext, iterator: Iterator[Row]): Unit = {
    +      /* "reduce task" <split #> <attempt # = spark task #> */
    +      val attemptId = newTaskAttemptID(
    +        jobTrackerId, stageId, isMap = false, context.partitionId(), context.attemptNumber())
    +      val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
    +      val format = new AppendingParquetOutputFormat(taskIdOffset)
    +      val committer = format.getOutputCommitter(hadoopContext)
    +      committer.setupTask(hadoopContext)
    +      val writer = format.getRecordWriter(hadoopContext)
    +      try {
    +        while (iterator.hasNext) {
    +          val row = iterator.next()
    +          writer.write(null, row)
    +        }
    +      } finally {
    +        writer.close(hadoopContext)
    +      }
    +      committer.commitTask(hadoopContext)
    +    }
    +    val jobFormat = new AppendingParquetOutputFormat(taskIdOffset)
    +    /* apparently we need a TaskAttemptID to construct an OutputCommitter;
    +     * however we're only going to use this local OutputCommitter for
    +     * setupJob/commitJob, so we just use a dummy "map" task.
    +     */
    +    val jobAttemptId = newTaskAttemptID(jobTrackerId, stageId, isMap = true, 0, 0)
    +    val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
    +    val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
    +
    +    jobCommitter.setupJob(jobTaskContext)
    +    sqlContext.sparkContext.runJob(data.queryExecution.executedPlan.execute(), writeShard _)
    +    jobCommitter.commitJob(jobTaskContext)
    +
    +    metadataCache.refresh()
    +  }
    +}
    +
    +object ParquetRelation2 {
    +  // Whether we should merge schemas collected from all Parquet part-files.
    +  val MERGE_SCHEMA = "parquet.mergeSchema"
    --- End diff --
    
    should we also have an option to turn off caching?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-72984960
  
      [Test build #26797 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26797/consoleFull) for   PR 4308 at commit [`209f324`](https://github.com/apache/spark/commit/209f324932302c18d4d41160cf80e6026350a6f0).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `protected class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String]`
      * `trait CreatableRelationProvider `



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-72734610
  
      [Test build #26667 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26667/consoleFull) for   PR 4308 at commit [`5584e24`](https://github.com/apache/spark/commit/5584e240419953db488e832d969ede8f7452c995).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class DefaultSource extends RelationProvider with SchemaRelationProvider `



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-73138277
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26856/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4308#discussion_r24140650
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
    @@ -262,8 +262,14 @@ class SQLContext(@transient val sparkContext: SparkContext)
        *
        * @group userf
        */
    -  def parquetFile(path: String): DataFrame =
    -    DataFrame(this, parquet.ParquetRelation(path, Some(sparkContext.hadoopConfiguration), this))
    +  @scala.annotation.varargs
    +  def parquetFile(paths: String*): DataFrame =
    --- End diff --
    
    Are we actually ever going to do that for this function?  This makes it harder to do something like `parquetFile(listOfFiles: _*)` which I think is actually a common usecase.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-72822646
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26734/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4308#discussion_r24139597
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala ---
    @@ -81,117 +123,189 @@ private[parquet] case class Partition(partitionValues: Map[String, Any], files:
      * discovery.
      */
     @DeveloperApi
    -case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
    -  extends CatalystScan with Logging {
    +case class ParquetRelation2
    +    (paths: Seq[String], parameters: Map[String, String], maybeSchema: Option[StructType] = None)
    +    (@transient val sqlContext: SQLContext)
    +  extends CatalystScan
    +  with InsertableRelation
    +  with SparkHadoopMapReduceUtil
    +  with Logging {
    +
    +  // Should we merge schemas from all Parquet part-files?
    +  private val shouldMergeSchemas =
    +    parameters.getOrElse(ParquetRelation2.MERGE_SCHEMA, "true").toBoolean
    --- End diff --
    
    oh sorry, it is.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SQL] WIP: Parquet d...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4308#discussion_r23921444
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala ---
    @@ -227,66 +294,302 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
             val cacheMetadata = useCache
     
             @transient
    -        val cachedStatus = selectedPartitions.flatMap(_.files)
    +        val cachedStatus = selectedFiles
     
             // Overridden so we can inject our own cached files statuses.
             override def getPartitions: Array[SparkPartition] = {
    -          val inputFormat =
    -            if (cacheMetadata) {
    -              new FilteringParquetRowInputFormat {
    -                override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatus
    -              }
    -            } else {
    -              new FilteringParquetRowInputFormat
    +          val inputFormat = if (cacheMetadata) {
    +            new FilteringParquetRowInputFormat {
    +              override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatus
                 }
    -
    -          inputFormat match {
    -            case configurable: Configurable =>
    -              configurable.setConf(getConf)
    -            case _ =>
    +          } else {
    +            new FilteringParquetRowInputFormat
               }
    +
               val jobContext = newJobContext(getConf, jobId)
    -          val rawSplits = inputFormat.getSplits(jobContext).toArray
    -          val result = new Array[SparkPartition](rawSplits.size)
    -          for (i <- 0 until rawSplits.size) {
    -            result(i) =
    -              new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
    +          val rawSplits = inputFormat.getSplits(jobContext)
    +
    +          Array.tabulate[SparkPartition](rawSplits.size) { i =>
    +            new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
               }
    -          result
             }
           }
     
    -    // The ordinal for the partition key in the result row, if requested.
    -    val partitionKeyLocation =
    -      partitionKeys
    -        .headOption
    -        .map(requiredColumns.indexOf(_))
    -        .getOrElse(-1)
    +    // The ordinals for partition keys in the result row, if requested.
    +    val partitionKeyLocations = partitionColumns.fieldNames.zipWithIndex.map {
    +      case (name, index) => index -> requiredColumns.indexOf(name)
    +    }.toMap.filter {
    +      case (_, index) => index >= 0
    +    }
     
         // When the data does not include the key and the key is requested then we must fill it in
         // based on information from the input split.
    -    if (!dataIncludesKey && partitionKeyLocation != -1) {
    -      baseRDD.mapPartitionsWithInputSplit { case (split, iter) =>
    -        val partValue = "([^=]+)=([^=]+)".r
    -        val partValues =
    -          split.asInstanceOf[parquet.hadoop.ParquetInputSplit]
    -            .getPath
    -            .toString
    -            .split("/")
    -            .flatMap {
    -            case partValue(key, value) => Some(key -> value)
    -            case _ => None
    -          }.toMap
    -
    -        val currentValue = partValues.values.head.toInt
    -        iter.map { pair =>
    -          val res = pair._2.asInstanceOf[SpecificMutableRow]
    -          res.setInt(partitionKeyLocation, currentValue)
    -          res
    +    if (!dataSchemaIncludesPartitionKeys && partitionKeyLocations.nonEmpty) {
    +      baseRDD.mapPartitionsWithInputSplit { case (split: ParquetInputSplit, iterator) =>
    +        val partValues = selectedPartitions.collectFirst {
    +          case p if split.getPath.getParent.toString == p.path => p.values
    +        }.get
    +
    +        iterator.map { pair =>
    +          val row = pair._2.asInstanceOf[SpecificMutableRow]
    +          var i = 0
    +          while (i < partValues.size) {
    +            // TODO Avoids boxing cost here!
    +            row.update(partitionKeyLocations(i), partValues(i))
    +            i += 1
    +          }
    +          row
             }
           }
         } else {
           baseRDD.map(_._2)
         }
       }
    +
    +  private def prunePartitions(
    +      predicates: Seq[Expression],
    +      partitions: Seq[Partition]): Seq[Partition] = {
    +    val partitionColumnNames = partitionColumns.map(_.name).toSet
    +    val partitionPruningPredicates = predicates.filter {
    +      _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
    +    }
    +
    +    val rawPredicate = partitionPruningPredicates.reduceOption(And).getOrElse(Literal(true))
    +    val boundPredicate = InterpretedPredicate(rawPredicate transform {
    +      case a: AttributeReference =>
    +        val index = partitionColumns.indexWhere(a.name == _.name)
    +        BoundReference(index, partitionColumns(index).dataType, nullable = true)
    +    })
    +
    +    if (isPartitioned && partitionPruningPredicates.nonEmpty) {
    +      partitions.filter(p => boundPredicate(p.values))
    +    } else {
    +      partitions
    +    }
    +  }
    +}
    +
    +object ParquetRelation2 {
    +  // Whether we should merge schemas collected from all Parquet part-files.
    +  val MERGE_SCHEMA = "parquet.mergeSchema"
    +
    +  // Hive Metastore schema, passed in when the Parquet relation is converted from Metastore
    +  val METASTORE_SCHEMA = "parquet.metastoreSchema"
    +
    +  // Default partition name to use when the partition column value is null or empty string
    +  val DEFAULT_PARTITION_NAME = "partition.defaultName"
    +
    +  private[parquet] def readSchema(footers: Seq[Footer], sqlContext: SQLContext): StructType = {
    +    footers.map { footer =>
    +      val metadata = footer.getParquetMetadata.getFileMetaData
    +      val parquetSchema = metadata.getSchema
    +      val maybeSparkSchema = metadata
    +        .getKeyValueMetaData
    +        .toMap
    +        .get(RowReadSupport.SPARK_METADATA_KEY)
    +        .map(DataType.fromJson(_).asInstanceOf[StructType])
    +
    +      maybeSparkSchema.getOrElse {
    +        // Falls back to Parquet schema if Spark SQL schema is absent.
    +        StructType.fromAttributes(
    +          // TODO Really no need to use `Attribute` here, we only need to know the data type.
    +          convertToAttributes(parquetSchema, sqlContext.conf.isParquetBinaryAsString))
    +      }
    +    }.reduce { (left, right) =>
    +      try mergeCatalystSchemas(left, right) catch { case e: Throwable =>
    +        throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
    +      }
    +    }
    +  }
    +
    +  private[parquet] def mergeMetastoreParquetSchema(
    +      metastoreSchema: StructType,
    +      parquetSchema: StructType): StructType = {
    +    def schemaConflictMessage =
    +      s"""Converting Hive Metastore Parquet, but detected conflicting schemas. Metastore schema:
    +         |${metastoreSchema.prettyJson}
    +         |
    +         |Parquet schema:
    +         |${parquetSchema.prettyJson}
    +       """.stripMargin
    +
    +    assert(metastoreSchema.size == parquetSchema.size, schemaConflictMessage)
    +
    +    val ordinalMap = metastoreSchema.zipWithIndex.map {
    +      case (field, index) => field.name.toLowerCase -> index
    +    }.toMap
    +    val reorderedParquetSchema = parquetSchema.sortBy(f => ordinalMap(f.name.toLowerCase))
    +
    +    StructType(metastoreSchema.zip(reorderedParquetSchema).map {
    +      // Uses Parquet field names but retains Metastore data types.
    +      case (mSchema, pSchema) if mSchema.name.toLowerCase == pSchema.name.toLowerCase =>
    +        mSchema.copy(name = pSchema.name)
    +      case _ =>
    +        throw new SparkException(schemaConflictMessage)
    +    })
    +  }
    +
    +  // TODO Data source implementations shouldn't touch Catalyst types (`Literal`).
    +  // However, we are already using Catalyst expressions for partition pruning and predicate
    +  // push-down here...
    +  private[parquet] case class PartitionValues(columnNames: Seq[String], literals: Seq[Literal]) {
    +    require(columnNames.size == literals.size)
    +  }
    +
    +  /**
    +   * Given a group of qualified paths, tries to parse them and returns a partition specification.
    +   * For example, given:
    +   * {{{
    +   *   hdfs://<host>:<port>/path/to/partition/a=1/b=hello/c=3.14
    +   *   hdfs://<host>:<port>/path/to/partition/a=2/b=world/c=6.28
    +   * }}}
    +   * it returns:
    +   * {{{
    +   *   PartitionSpec(
    +   *     partitionColumns = StructType(
    +   *       StructField(name = "a", dataType = IntegerType, nullable = true),
    +   *       StructField(name = "b", dataType = StringType, nullable = true),
    +   *       StructField(name = "c", dataType = DoubleType, nullable = true)),
    +   *     partitions = Seq(
    +   *       Partition(
    +   *         values = Row(1, "hello", 3.14),
    +   *         path = "hdfs://<host>:<port>/path/to/partition/a=1/b=hello/c=3.14"),
    +   *       Partition(
    +   *         values = Row(2, "world", 6.28),
    +   *         path = "hdfs://<host>:<port>/path/to/partition/a=2/b=world/c=6.28")))
    +   * }}}
    +   */
    +  private[parquet] def parsePartitions(
    +      paths: Seq[Path],
    +      defaultPartitionName: String): PartitionSpec = {
    +    val partitionValues = resolvePartitions(paths.map(parsePartition(_, defaultPartitionName)))
    +    val fields = {
    +      val (PartitionValues(columnNames, literals)) = partitionValues.head
    +      columnNames.zip(literals).map { case (name, Literal(_, dataType)) =>
    +        StructField(name, dataType, nullable = true)
    +      }
    +    }
    +
    +    val partitions = partitionValues.zip(paths).map {
    +      case (PartitionValues(_, literals), path) =>
    +        Partition(Row(literals.map(_.value): _*), path.toString)
    +    }
    +
    +    PartitionSpec(StructType(fields), partitions)
    +  }
    +
    +  /**
    +   * Parses a single partition, returns column names and values of each partition column.  For
    +   * example, given:
    +   * {{{
    +   *   basePath = hdfs://<host>:<port>/path/to/partition
    +   *   partitionPath = hdfs://<host>:<port>/path/to/partition/a=42/b=hello/c=3.14
    --- End diff --
    
    Fix outdated comments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-72553615
  
      [Test build #26537 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26537/consoleFull) for   PR 4308 at commit [`0277e47`](https://github.com/apache/spark/commit/0277e473349c318fe765be41db5e860963587a5c).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class DefaultSource extends RelationProvider with SchemaRelationProvider `



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-72571757
  
      [Test build #26562 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26562/consoleFull) for   PR 4308 at commit [`87689d5`](https://github.com/apache/spark/commit/87689d54e082d4cda2743efc4aeb11df2336bdec).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SQL] WIP: Parquet d...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4308#discussion_r23921277
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala ---
    @@ -81,116 +101,158 @@ private[parquet] case class Partition(partitionValues: Map[String, Any], files:
      * discovery.
      */
     @DeveloperApi
    -case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
    +case class ParquetRelation2
    +    (paths: Seq[String], parameters: Map[String, String], maybeSchema: Option[StructType] = None)
    +    (@transient val sqlContext: SQLContext)
       extends CatalystScan with Logging {
     
    +  // Should we merge schemas from all Parquet part-files?
    +  private val shouldMergeSchemas =
    +    parameters.getOrElse(ParquetRelation2.MERGE_SCHEMA, "true").toBoolean
    +
       def sparkContext = sqlContext.sparkContext
     
    -  // Minor Hack: scala doesnt seem to respect @transient for vals declared via extraction
    -  @transient
    -  private var partitionKeys: Seq[String] = _
    -  @transient
    -  private var partitions: Seq[Partition] = _
    -  discoverPartitions()
    +  private val fs = FileSystem.get(sparkContext.hadoopConfiguration)
     
    -  // TODO: Only finds the first partition, assumes the key is of type Integer...
    -  private def discoverPartitions() = {
    -    val fs = FileSystem.get(new java.net.URI(path), sparkContext.hadoopConfiguration)
    -    val partValue = "([^=]+)=([^=]+)".r
    +  private val baseStatuses = {
    +    val statuses = paths.distinct.map(p => fs.getFileStatus(fs.makeQualified(new Path(p))))
    +    assert(statuses.forall(_.isFile) || statuses.forall(_.isDir))
    +    statuses
    +  }
     
    -    val childrenOfPath = fs.listStatus(new Path(path)).filterNot(_.getPath.getName.startsWith("_"))
    -    val childDirs = childrenOfPath.filter(s => s.isDir)
    +  private val leafStatuses = baseStatuses.flatMap { f =>
    +    val statuses = SparkHadoopUtil.get.listLeafStatuses(fs, f.getPath).filter { f =>
    +      isSummaryFile(f.getPath) ||
    +        !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
    +    }
    +    assert(statuses.nonEmpty, s"${f.getPath} is an empty folder.")
    +    statuses
    +  }
     
    -    if (childDirs.size > 0) {
    -      val partitionPairs = childDirs.map(_.getPath.getName).map {
    -        case partValue(key, value) => (key, value)
    -      }
    +  private val (dataStatuses, metadataStatuses, commonMetadataStatuses) = {
    +    (leafStatuses.filterNot(f => isSummaryFile(f.getPath)).toSeq,
    +      leafStatuses.filter(f => f.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE),
    +      leafStatuses.filter(f => f.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE))
    +  }
     
    -      val foundKeys = partitionPairs.map(_._1).distinct
    -      if (foundKeys.size > 1) {
    -        sys.error(s"Too many distinct partition keys: $foundKeys")
    -      }
    +  private val footers = {
    +    // TODO Issue a Spark job to gather footers if there are too many files
    +    (dataStatuses ++ metadataStatuses ++ commonMetadataStatuses).par.map { f =>
    +      val parquetMetadata = ParquetFileReader.readFooter(
    +        sparkContext.hadoopConfiguration, f, ParquetMetadataConverter.NO_FILTER)
    +      f -> new Footer(f.getPath, parquetMetadata)
    +    }.seq.toMap
    +  }
     
    -      // Do a parallel lookup of partition metadata.
    -      val partitionFiles =
    -        childDirs.par.map { d =>
    -          fs.listStatus(d.getPath)
    -            // TODO: Is there a standard hadoop function for this?
    -            .filterNot(_.getPath.getName.startsWith("_"))
    -            .filterNot(_.getPath.getName.startsWith("."))
    -        }.seq
    -
    -      partitionKeys = foundKeys.toSeq
    -      partitions = partitionFiles.zip(partitionPairs).map { case (files, (key, value)) =>
    -        Partition(Map(key -> value.toInt), files)
    -      }.toSeq
    +  private val partitionSpec = {
    +    val partitionDirs =
    +      dataStatuses
    +        .filterNot(baseStatuses.contains)
    +        .map(_.getPath.getParent)
    +        .distinct
    +
    +    // Hive uses this as part of the default partition name when the partition column value is null
    +    // or empty string
    +    val defaultPartitionName = parameters.getOrElse(
    +      ParquetRelation2.DEFAULT_PARTITION_NAME,
    +      "__HIVE_DEFAULT_PARTITION__")
    +
    +    if (partitionDirs.nonEmpty) {
    +      ParquetRelation2.parsePartitions(partitionDirs, defaultPartitionName)
         } else {
    -      partitionKeys = Nil
    -      partitions = Partition(Map.empty, childrenOfPath) :: Nil
    +      // No partition directories found, makes an empty specification
    +      PartitionSpec(StructType(Seq.empty[StructField]), Seq.empty[Partition])
         }
       }
     
    -  override val sizeInBytes = partitions.flatMap(_.files).map(_.getLen).sum
    +  private val PartitionSpec(partitionColumns, partitions) = partitionSpec
    +
    +  private def isPartitioned = partitionColumns.nonEmpty
     
    -  val dataSchema = StructType.fromAttributes( // TODO: Parquet code should not deal with attributes.
    -    ParquetTypesConverter.readSchemaFromFile(
    -      partitions.head.files.head.getPath,
    -      Some(sparkContext.hadoopConfiguration),
    -      sqlContext.conf.isParquetBinaryAsString))
    +  private val dataSchema = maybeSchema.getOrElse(readSchema())
     
    -  val dataIncludesKey =
    -    partitionKeys.headOption.map(dataSchema.fieldNames.contains(_)).getOrElse(true)
    +  private val dataSchemaIncludesPartitionKeys =
    +    isPartitioned && partitionColumns.forall(f => dataSchema.fieldNames.contains(f.name))
     
    -  override val schema =
    -    if (dataIncludesKey) {
    +  override val schema = {
    +    val fullParquetSchema = if (dataSchemaIncludesPartitionKeys) {
           dataSchema
         } else {
    -      StructType(dataSchema.fields :+ StructField(partitionKeys.head, IntegerType))
    +      StructType(dataSchema.fields ++ partitionColumns.fields)
         }
     
    -  override def buildScan(output: Seq[Attribute], predicates: Seq[Expression]): RDD[Row] = {
    -    // This is mostly a hack so that we can use the existing parquet filter code.
    -    val requiredColumns = output.map(_.name)
    +    val maybeMetastoreSchema =
    +      parameters
    +        .get(ParquetRelation2.METASTORE_SCHEMA)
    +        .map(s => DataType.fromJson(s).asInstanceOf[StructType])
     
    -    val job = new Job(sparkContext.hadoopConfiguration)
    -    ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport])
    -    val jobConf: Configuration = ContextUtil.getConfiguration(job)
    +    maybeMetastoreSchema
    +      .map(ParquetRelation2.mergeMetastoreParquetSchema(_, fullParquetSchema))
    +      .getOrElse(fullParquetSchema)
    +  }
     
    -    val requestedSchema = StructType(requiredColumns.map(schema(_)))
    +  private def readSchema(): StructType = {
    +    // Sees which file(s) we need to touch in order to figure out the schema.
    +    val filesToTouch =
    +      // Always tries the summary files first if users don't require a merged schema.  In this case,
    +      // "_common_metadata" is more preferable than "_metadata" because it doesn't contain row
    +      // groups information, and could be much smaller for large Parquet files with lots of row
    +      // groups.
    +      //
    +      // NOTE: Metadata stored in the summary files are merged from all part-files.  However, for
    +      // user defined key-value metadata (in which we store Spark SQL schema), Parquet doesn't know
    +      // how to merge them correctly if some key is associated with different values in different
    +      // part-files.  When this happens, Parquet simply gives up generating the summary file.  This
    +      // implies that if a summary file presents, then:
    +      //
    +      //   1. Either all part-files have exactly the same Spark SQL schema, or
    +      //   2. Some part-files don't contain Spark SQL schema in the key-value metadata at all (thus
    +      //      their schemas may differ from each other).
    +      //
    +      // Here we tend to be pessimistic and take the second case into account.  Basically this means
    +      // we can't trust the summary files if users require a merged schema, and must touch all part-
    +      // files to do the merge.
    +      if (shouldMergeSchemas) {
    +        dataStatuses.toSeq
    +      } else {
    +        commonMetadataStatuses.headOption
    +          .orElse(metadataStatuses.headOption)
    +          // Summary file(s) not found, falls back to the first part-file.
    +          .orElse(dataStatuses.headOption)
    +          .toSeq
    +      }
     
    -    val partitionKeySet = partitionKeys.toSet
    -    val rawPredicate =
    -      predicates
    -        .filter(_.references.map(_.name).toSet.subsetOf(partitionKeySet))
    -        .reduceOption(And)
    -        .getOrElse(Literal(true))
    +    ParquetRelation2.readSchema(filesToTouch.map(footers.apply), sqlContext)
    +  }
     
    -    // Translate the predicate so that it reads from the information derived from the
    -    // folder structure
    -    val castedPredicate = rawPredicate transform {
    -      case a: AttributeReference =>
    -        val idx = partitionKeys.indexWhere(a.name == _)
    -        BoundReference(idx, IntegerType, nullable = true)
    -    }
    +  private def isSummaryFile(file: Path): Boolean = {
    +    file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
    +      file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
    +  }
     
    -    val inputData = new GenericMutableRow(partitionKeys.size)
    -    val pruningCondition = InterpretedPredicate(castedPredicate)
    +  // TODO Should calculate per scan size
    +  // It's common that a query only scans a fraction of a large Parquet file.  Returning size of the
    +  // whole Parquet file disables some optimizations in this case (e.g. broadcast join).
    +  override val sizeInBytes = dataStatuses.map(_.getLen).sum
     
    -    val selectedPartitions =
    -      if (partitionKeys.nonEmpty && predicates.nonEmpty) {
    -        partitions.filter { part =>
    -          inputData(0) = part.partitionValues.values.head
    -          pruningCondition(inputData)
    -        }
    -      } else {
    -        partitions
    +  // This is mostly a hack so that we can use the existing parquet filter code.
    --- End diff --
    
    Should stop using Catalyst expressions and remove `CatalystScan` after lifting partition discovery and partition pruning to data source API level.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-72583721
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26572/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4308#discussion_r23983767
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala ---
    @@ -175,7 +175,10 @@ case class EqualTo(left: Expression, right: Expression) extends BinaryComparison
           null
         } else {
           val r = right.eval(input)
    -      if (r == null) null else l == r
    +      if (r == null) null
    +      else if (left.dataType != BinaryType) l == r
    +      else BinaryType.ordering.compare(
    +        l.asInstanceOf[Array[Byte]], r.asInstanceOf[Array[Byte]]) == 0
    --- End diff --
    
    btw this is really expensive. i'd use sth like this: http://docs.guava-libraries.googlecode.com/git/javadoc/com/google/common/primitives/UnsignedBytes.html
    
    If you don't want to change it as part of this PR, file a jira ticket to track it.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-72588587
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26595/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-72537898
  
      [Test build #26537 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26537/consoleFull) for   PR 4308 at commit [`0277e47`](https://github.com/apache/spark/commit/0277e473349c318fe765be41db5e860963587a5c).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4308#discussion_r24139496
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
    @@ -263,8 +263,14 @@ class SQLContext(@transient val sparkContext: SparkContext)
        *
        * @group userf
        */
    -  def parquetFile(path: String): DataFrame =
    -    DataFrame(this, parquet.ParquetRelation(path, Some(sparkContext.hadoopConfiguration), this))
    +  @scala.annotation.varargs
    +  def parquetFile(path: String, paths: String*): DataFrame =
    --- End diff --
    
    why not just `path: String*`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/4308


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-72588583
  
      [Test build #26595 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26595/consoleFull) for   PR 4308 at commit [`1b11851`](https://github.com/apache/spark/commit/1b11851d30fa0d6e8851475c1fa8e525b69d3c43).
     * This patch **fails to build**.
     * This patch **does not merge cleanly**.
     * This patch adds the following public classes _(experimental)_:
      * `class KafkaUtils(object):`
      * `trait Column extends DataFrame with ExpressionApi `
      * `class ColumnName(name: String) extends IncomputableColumn(name) `
      * `trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] `
      * `class GroupedDataFrame protected[sql](df: DataFrameImpl, groupingExprs: Seq[Expression])`
      * `  protected[sql] class QueryExecution(val logical: LogicalPlan) `
      * `class DefaultSource extends RelationProvider with SchemaRelationProvider `



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-72734625
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26667/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-72952037
  
      [Test build #26780 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26780/consoleFull) for   PR 4308 at commit [`ae1ee78`](https://github.com/apache/spark/commit/ae1ee783fd46e867f0ffd17161ad1f37292ebad6).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SQL] WIP: Parquet d...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4308#discussion_r23921130
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala ---
    @@ -89,7 +89,7 @@ trait ParquetTest {
           (data: Seq[T])
           (f: String => Unit): Unit = {
         withTempPath { file =>
    -      sparkContext.parallelize(data).saveAsParquetFile(file.getCanonicalPath)
    +      sparkContext.parallelize(data, 3).saveAsParquetFile(file.getCanonicalPath)
    --- End diff --
    
    Oops, debugging code, should revert this change.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-72951593
  
    retest this please.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4308#discussion_r24140664
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
    @@ -262,8 +262,14 @@ class SQLContext(@transient val sparkContext: SparkContext)
        *
        * @group userf
        */
    -  def parquetFile(path: String): DataFrame =
    -    DataFrame(this, parquet.ParquetRelation(path, Some(sparkContext.hadoopConfiguration), this))
    +  @scala.annotation.varargs
    +  def parquetFile(paths: String*): DataFrame =
    --- End diff --
    
    i'll add that if we ever do overload this we can do this disambiguation then.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-72553624
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26537/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-72720550
  
      [Test build #26667 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26667/consoleFull) for   PR 4308 at commit [`5584e24`](https://github.com/apache/spark/commit/5584e240419953db488e832d969ede8f7452c995).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4308#discussion_r24139573
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala ---
    @@ -81,117 +123,189 @@ private[parquet] case class Partition(partitionValues: Map[String, Any], files:
      * discovery.
      */
     @DeveloperApi
    -case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
    -  extends CatalystScan with Logging {
    +case class ParquetRelation2
    +    (paths: Seq[String], parameters: Map[String, String], maybeSchema: Option[StructType] = None)
    +    (@transient val sqlContext: SQLContext)
    +  extends CatalystScan
    +  with InsertableRelation
    +  with SparkHadoopMapReduceUtil
    +  with Logging {
    +
    +  // Should we merge schemas from all Parquet part-files?
    +  private val shouldMergeSchemas =
    +    parameters.getOrElse(ParquetRelation2.MERGE_SCHEMA, "true").toBoolean
    --- End diff --
    
    I think this should be a datasource option instead of a global configuration.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4308#discussion_r24140578
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
    @@ -263,8 +263,14 @@ class SQLContext(@transient val sparkContext: SparkContext)
        *
        * @group userf
        */
    -  def parquetFile(path: String): DataFrame =
    -    DataFrame(this, parquet.ParquetRelation(path, Some(sparkContext.hadoopConfiguration), this))
    +  @scala.annotation.varargs
    +  def parquetFile(path: String, paths: String*): DataFrame =
    --- End diff --
    
    This is per @rxin's [this comment] [1], which I think makes sense.
    
    [1]: https://github.com/apache/spark/pull/4308#discussion_r23983783


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-72592529
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26591/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-73150562
  
      [Test build #26858 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26858/consoleFull) for   PR 4308 at commit [`b6946e6`](https://github.com/apache/spark/commit/b6946e67c0bbf30534aa6ebfd6b5926400529a09).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `protected class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String]`
      * `trait CreatableRelationProvider `



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-72966421
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26780/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4308#discussion_r23983783
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
    @@ -262,8 +262,14 @@ class SQLContext(@transient val sparkContext: SparkContext)
        *
        * @group userf
        */
    -  def parquetFile(path: String): DataFrame =
    -    DataFrame(this, parquet.ParquetRelation(path, Some(sparkContext.hadoopConfiguration), this))
    +  @scala.annotation.varargs
    +  def parquetFile(paths: String*): DataFrame =
    --- End diff --
    
    as commented on the other pr, use
    ```scala
    def parquetFile(path: String, paths: String*): DataFrame
    ```
    
    to make sure this is not ambiguous if we overload the function with another varargs


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-72583717
  
      [Test build #26572 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26572/consoleFull) for   PR 4308 at commit [`170a0f8`](https://github.com/apache/spark/commit/170a0f8619a0a0eed26ccef71815c31abd9f89ac).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class DefaultSource extends RelationProvider with SchemaRelationProvider `



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-72580683
  
      [Test build #26562 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26562/consoleFull) for   PR 4308 at commit [`87689d5`](https://github.com/apache/spark/commit/87689d54e082d4cda2743efc4aeb11df2336bdec).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class DefaultSource extends RelationProvider with SchemaRelationProvider `



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4308#discussion_r23983233
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala ---
    @@ -490,4 +489,55 @@ private[parquet] object ParquetTypesConverter extends Logging {
           attributes
         }
       }
    +
    +  def mergeCatalystSchemas(left: StructType, right: StructType): StructType =
    +    mergeCatalystDataTypes(left, right).asInstanceOf[StructType]
    +
    +  def mergeCatalystDataTypes(left: DataType, right: DataType): DataType =
    --- End diff --
    
    would be great to add more comment explaining what's going on


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-72588320
  
      [Test build #26595 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26595/consoleFull) for   PR 4308 at commit [`1b11851`](https://github.com/apache/spark/commit/1b11851d30fa0d6e8851475c1fa8e525b69d3c43).
     * This patch **does not merge cleanly**.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-72595205
  
      [Test build #26601 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26601/consoleFull) for   PR 4308 at commit [`bcb3ad6`](https://github.com/apache/spark/commit/bcb3ad688d57f84725665a1204eb9e1dff340033).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class KafkaUtils(object):`
      * `public class JDBCUtils `
      * `trait Column extends DataFrame with ExpressionApi `
      * `class ColumnName(name: String) extends IncomputableColumn(name) `
      * `trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] `
      * `class GroupedDataFrame protected[sql](df: DataFrameImpl, groupingExprs: Seq[Expression])`
      * `  protected[sql] class QueryExecution(val logical: LogicalPlan) `
      * `          logWarning(s"Couldn't find class $driver", e);`
      * `  implicit class JDBCDataFrame(rdd: DataFrame) `
      * `class DefaultSource extends RelationProvider with SchemaRelationProvider `



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-72966411
  
      [Test build #26780 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26780/consoleFull) for   PR 4308 at commit [`ae1ee78`](https://github.com/apache/spark/commit/ae1ee783fd46e867f0ffd17161ad1f37292ebad6).
     * This patch **fails MiMa tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `protected class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String]`
      * `trait CreatableRelationProvider `



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-72580690
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26562/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SQL] WIP: Parquet d...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-72460484
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26514/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-73117929
  
      [Test build #26856 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26856/consoleFull) for   PR 4308 at commit [`1ad361e`](https://github.com/apache/spark/commit/1ad361e3ef57bd696859ef18d46fa419b3761986).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-73133259
  
      [Test build #26858 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26858/consoleFull) for   PR 4308 at commit [`b6946e6`](https://github.com/apache/spark/commit/b6946e67c0bbf30534aa6ebfd6b5926400529a09).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-72806615
  
      [Test build #26734 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26734/consoleFull) for   PR 4308 at commit [`ae1ee78`](https://github.com/apache/spark/commit/ae1ee783fd46e867f0ffd17161ad1f37292ebad6).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-72922034
  
      [Test build #26769 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26769/consoleFull) for   PR 4308 at commit [`ae1ee78`](https://github.com/apache/spark/commit/ae1ee783fd46e867f0ffd17161ad1f37292ebad6).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-72592525
  
      [Test build #26591 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26591/consoleFull) for   PR 4308 at commit [`a760555`](https://github.com/apache/spark/commit/a76055576ba67dc69441a1c8086a8dfe52add9c3).
     * This patch **fails PySpark unit tests**.
     * This patch **does not merge cleanly**.
     * This patch adds the following public classes _(experimental)_:
      * `class DefaultSource extends RelationProvider with SchemaRelationProvider `



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-73117116
  
    Rebased (for the 8th time during the last 72 hours), should be ready to go once Jenkins nods. Will address comments in follow-up PRs.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by nugend <gi...@git.apache.org>.
Github user nugend commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4308#discussion_r25485107
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala ---
    @@ -228,66 +355,400 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
             val cacheMetadata = useCache
     
             @transient
    -        val cachedStatus = selectedPartitions.flatMap(_.files)
    +        val cachedStatus = selectedFiles
     
             // Overridden so we can inject our own cached files statuses.
             override def getPartitions: Array[SparkPartition] = {
    -          val inputFormat =
    -            if (cacheMetadata) {
    -              new FilteringParquetRowInputFormat {
    -                override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatus
    -              }
    -            } else {
    -              new FilteringParquetRowInputFormat
    +          val inputFormat = if (cacheMetadata) {
    +            new FilteringParquetRowInputFormat {
    +              override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatus
                 }
    -
    -          inputFormat match {
    -            case configurable: Configurable =>
    -              configurable.setConf(getConf)
    -            case _ =>
    +          } else {
    +            new FilteringParquetRowInputFormat
               }
    +
               val jobContext = newJobContext(getConf, jobId)
    -          val rawSplits = inputFormat.getSplits(jobContext).toArray
    -          val result = new Array[SparkPartition](rawSplits.size)
    -          for (i <- 0 until rawSplits.size) {
    -            result(i) =
    -              new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
    +          val rawSplits = inputFormat.getSplits(jobContext)
    +
    +          Array.tabulate[SparkPartition](rawSplits.size) { i =>
    +            new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
               }
    -          result
             }
           }
     
    -    // The ordinal for the partition key in the result row, if requested.
    -    val partitionKeyLocation =
    -      partitionKeys
    -        .headOption
    -        .map(requiredColumns.indexOf(_))
    -        .getOrElse(-1)
    +    // The ordinals for partition keys in the result row, if requested.
    +    val partitionKeyLocations = partitionColumns.fieldNames.zipWithIndex.map {
    +      case (name, index) => index -> requiredColumns.indexOf(name)
    +    }.toMap.filter {
    +      case (_, index) => index >= 0
    +    }
     
         // When the data does not include the key and the key is requested then we must fill it in
         // based on information from the input split.
    -    if (!dataIncludesKey && partitionKeyLocation != -1) {
    -      baseRDD.mapPartitionsWithInputSplit { case (split, iter) =>
    -        val partValue = "([^=]+)=([^=]+)".r
    -        val partValues =
    -          split.asInstanceOf[parquet.hadoop.ParquetInputSplit]
    -            .getPath
    -            .toString
    -            .split("/")
    -            .flatMap {
    -            case partValue(key, value) => Some(key -> value)
    -            case _ => None
    -          }.toMap
    -
    -        val currentValue = partValues.values.head.toInt
    -        iter.map { pair =>
    -          val res = pair._2.asInstanceOf[SpecificMutableRow]
    -          res.setInt(partitionKeyLocation, currentValue)
    -          res
    +    if (!dataSchemaIncludesPartitionKeys && partitionKeyLocations.nonEmpty) {
    +      baseRDD.mapPartitionsWithInputSplit { case (split: ParquetInputSplit, iterator) =>
    +        val partValues = selectedPartitions.collectFirst {
    +          case p if split.getPath.getParent.toString == p.path => p.values
    +        }.get
    +
    +        iterator.map { pair =>
    +          val row = pair._2.asInstanceOf[SpecificMutableRow]
    +          var i = 0
    +          while (i < partValues.size) {
    +            // TODO Avoids boxing cost here!
    +            row.update(partitionKeyLocations(i), partValues(i))
    +            i += 1
    +          }
    +          row
             }
           }
         } else {
           baseRDD.map(_._2)
         }
       }
    +
    +  private def prunePartitions(
    +      predicates: Seq[Expression],
    +      partitions: Seq[Partition]): Seq[Partition] = {
    +    val partitionColumnNames = partitionColumns.map(_.name).toSet
    +    val partitionPruningPredicates = predicates.filter {
    +      _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
    +    }
    +
    +    val rawPredicate = partitionPruningPredicates.reduceOption(And).getOrElse(Literal(true))
    +    val boundPredicate = InterpretedPredicate(rawPredicate transform {
    +      case a: AttributeReference =>
    +        val index = partitionColumns.indexWhere(a.name == _.name)
    +        BoundReference(index, partitionColumns(index).dataType, nullable = true)
    +    })
    +
    +    if (isPartitioned && partitionPruningPredicates.nonEmpty) {
    +      partitions.filter(p => boundPredicate(p.values))
    +    } else {
    +      partitions
    +    }
    +  }
    +
    +  override def insert(data: DataFrame, overwrite: Boolean): Unit = {
    +    // TODO: currently we do not check whether the "schema"s are compatible
    +    // That means if one first creates a table and then INSERTs data with
    +    // and incompatible schema the execution will fail. It would be nice
    +    // to catch this early one, maybe having the planner validate the schema
    +    // before calling execute().
    +
    +    val job = new Job(sqlContext.sparkContext.hadoopConfiguration)
    +    val writeSupport = if (schema.map(_.dataType).forall(_.isPrimitive)) {
    +      log.debug("Initializing MutableRowWriteSupport")
    +      classOf[MutableRowWriteSupport]
    +    } else {
    +      classOf[RowWriteSupport]
    +    }
    +
    +    ParquetOutputFormat.setWriteSupportClass(job, writeSupport)
    +
    +    val conf = ContextUtil.getConfiguration(job)
    +    RowWriteSupport.setSchema(schema.toAttributes, conf)
    +
    +    val destinationPath = new Path(paths.head)
    +
    +    if (overwrite) {
    +      try {
    +        destinationPath.getFileSystem(conf).delete(destinationPath, true)
    +      } catch {
    +        case e: IOException =>
    +          throw new IOException(
    +            s"Unable to clear output directory ${destinationPath.toString} prior" +
    +              s" to writing to Parquet file:\n${e.toString}")
    +      }
    +    }
    +
    +    job.setOutputKeyClass(classOf[Void])
    +    job.setOutputValueClass(classOf[Row])
    +    FileOutputFormat.setOutputPath(job, destinationPath)
    +
    +    val wrappedConf = new SerializableWritable(job.getConfiguration)
    +    val jobTrackerId = new SimpleDateFormat("yyyyMMddHHmm").format(new Date())
    +    val stageId = sqlContext.sparkContext.newRddId()
    +
    +    val taskIdOffset = if (overwrite) {
    +      1
    +    } else {
    +      FileSystemHelper.findMaxTaskId(
    +        FileOutputFormat.getOutputPath(job).toString, job.getConfiguration) + 1
    +    }
    +
    +    def writeShard(context: TaskContext, iterator: Iterator[Row]): Unit = {
    +      /* "reduce task" <split #> <attempt # = spark task #> */
    +      val attemptId = newTaskAttemptID(
    +        jobTrackerId, stageId, isMap = false, context.partitionId(), context.attemptNumber())
    +      val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
    +      val format = new AppendingParquetOutputFormat(taskIdOffset)
    +      val committer = format.getOutputCommitter(hadoopContext)
    +      committer.setupTask(hadoopContext)
    +      val writer = format.getRecordWriter(hadoopContext)
    +      try {
    +        while (iterator.hasNext) {
    +          val row = iterator.next()
    +          writer.write(null, row)
    +        }
    +      } finally {
    +        writer.close(hadoopContext)
    +      }
    +      committer.commitTask(hadoopContext)
    +    }
    +    val jobFormat = new AppendingParquetOutputFormat(taskIdOffset)
    +    /* apparently we need a TaskAttemptID to construct an OutputCommitter;
    +     * however we're only going to use this local OutputCommitter for
    +     * setupJob/commitJob, so we just use a dummy "map" task.
    +     */
    +    val jobAttemptId = newTaskAttemptID(jobTrackerId, stageId, isMap = true, 0, 0)
    +    val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
    +    val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
    +
    +    jobCommitter.setupJob(jobTaskContext)
    +    sqlContext.sparkContext.runJob(data.queryExecution.executedPlan.execute(), writeShard _)
    +    jobCommitter.commitJob(jobTaskContext)
    +
    +    metadataCache.refresh()
    +  }
    +}
    +
    +object ParquetRelation2 {
    +  // Whether we should merge schemas collected from all Parquet part-files.
    +  val MERGE_SCHEMA = "mergeSchema"
    +
    +  // Hive Metastore schema, passed in when the Parquet relation is converted from Metastore
    +  val METASTORE_SCHEMA = "metastoreSchema"
    +
    +  // Default partition name to use when the partition column value is null or empty string
    +  val DEFAULT_PARTITION_NAME = "partition.defaultName"
    +
    +  // When true, the Parquet data source caches Parquet metadata for performance
    +  val CACHE_METADATA = "cacheMetadata"
    +
    +  private[parquet] def readSchema(footers: Seq[Footer], sqlContext: SQLContext): StructType = {
    +    footers.map { footer =>
    +      val metadata = footer.getParquetMetadata.getFileMetaData
    +      val parquetSchema = metadata.getSchema
    +      val maybeSparkSchema = metadata
    +        .getKeyValueMetaData
    +        .toMap
    +        .get(RowReadSupport.SPARK_METADATA_KEY)
    +        .map(DataType.fromJson(_).asInstanceOf[StructType])
    +
    +      maybeSparkSchema.getOrElse {
    +        // Falls back to Parquet schema if Spark SQL schema is absent.
    +        StructType.fromAttributes(
    +          // TODO Really no need to use `Attribute` here, we only need to know the data type.
    +          convertToAttributes(
    +            parquetSchema,
    +            sqlContext.conf.isParquetBinaryAsString,
    +            sqlContext.conf.isParquetINT96AsTimestamp))
    +      }
    +    }.reduce { (left, right) =>
    +      try left.merge(right) catch { case e: Throwable =>
    +        throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
    +      }
    +    }
    +  }
    +
    +  private[parquet] def mergeMetastoreParquetSchema(
    +      metastoreSchema: StructType,
    +      parquetSchema: StructType): StructType = {
    +    def schemaConflictMessage =
    +      s"""Converting Hive Metastore Parquet, but detected conflicting schemas. Metastore schema:
    +         |${metastoreSchema.prettyJson}
    +         |
    +         |Parquet schema:
    +         |${parquetSchema.prettyJson}
    +       """.stripMargin
    +
    +    assert(metastoreSchema.size == parquetSchema.size, schemaConflictMessage)
    +
    +    val ordinalMap = metastoreSchema.zipWithIndex.map {
    +      case (field, index) => field.name.toLowerCase -> index
    +    }.toMap
    +    val reorderedParquetSchema = parquetSchema.sortBy(f => ordinalMap(f.name.toLowerCase))
    +
    +    StructType(metastoreSchema.zip(reorderedParquetSchema).map {
    +      // Uses Parquet field names but retains Metastore data types.
    +      case (mSchema, pSchema) if mSchema.name.toLowerCase == pSchema.name.toLowerCase =>
    +        mSchema.copy(name = pSchema.name)
    +      case _ =>
    +        throw new SparkException(schemaConflictMessage)
    +    })
    +  }
    +
    +  // TODO Data source implementations shouldn't touch Catalyst types (`Literal`).
    +  // However, we are already using Catalyst expressions for partition pruning and predicate
    +  // push-down here...
    +  private[parquet] case class PartitionValues(columnNames: Seq[String], literals: Seq[Literal]) {
    +    require(columnNames.size == literals.size)
    +  }
    +
    +  /**
    +   * Given a group of qualified paths, tries to parse them and returns a partition specification.
    +   * For example, given:
    +   * {{{
    +   *   hdfs://<host>:<port>/path/to/partition/a=1/b=hello/c=3.14
    +   *   hdfs://<host>:<port>/path/to/partition/a=2/b=world/c=6.28
    +   * }}}
    +   * it returns:
    +   * {{{
    +   *   PartitionSpec(
    +   *     partitionColumns = StructType(
    +   *       StructField(name = "a", dataType = IntegerType, nullable = true),
    +   *       StructField(name = "b", dataType = StringType, nullable = true),
    +   *       StructField(name = "c", dataType = DoubleType, nullable = true)),
    +   *     partitions = Seq(
    +   *       Partition(
    +   *         values = Row(1, "hello", 3.14),
    +   *         path = "hdfs://<host>:<port>/path/to/partition/a=1/b=hello/c=3.14"),
    +   *       Partition(
    +   *         values = Row(2, "world", 6.28),
    +   *         path = "hdfs://<host>:<port>/path/to/partition/a=2/b=world/c=6.28")))
    +   * }}}
    +   */
    +  private[parquet] def parsePartitions(
    +      paths: Seq[Path],
    +      defaultPartitionName: String): PartitionSpec = {
    +    val partitionValues = resolvePartitions(paths.map(parsePartition(_, defaultPartitionName)))
    +    val fields = {
    +      val (PartitionValues(columnNames, literals)) = partitionValues.head
    +      columnNames.zip(literals).map { case (name, Literal(_, dataType)) =>
    +        StructField(name, dataType, nullable = true)
    +      }
    +    }
    +
    +    val partitions = partitionValues.zip(paths).map {
    +      case (PartitionValues(_, literals), path) =>
    +        Partition(Row(literals.map(_.value): _*), path.toString)
    +    }
    +
    +    PartitionSpec(StructType(fields), partitions)
    +  }
    +
    +  /**
    +   * Parses a single partition, returns column names and values of each partition column.  For
    +   * example, given:
    +   * {{{
    +   *   path = hdfs://<host>:<port>/path/to/partition/a=42/b=hello/c=3.14
    +   * }}}
    +   * it returns:
    +   * {{{
    +   *   PartitionValues(
    +   *     Seq("a", "b", "c"),
    +   *     Seq(
    +   *       Literal(42, IntegerType),
    +   *       Literal("hello", StringType),
    +   *       Literal(3.14, FloatType)))
    +   * }}}
    +   */
    +  private[parquet] def parsePartition(
    +      path: Path,
    +      defaultPartitionName: String): PartitionValues = {
    +    val columns = ArrayBuffer.empty[(String, Literal)]
    +    // Old Hadoop versions don't have `Path.isRoot`
    +    var finished = path.getParent == null
    +    var chopped = path
    +
    +    while (!finished) {
    +      val maybeColumn = parsePartitionColumn(chopped.getName, defaultPartitionName)
    +      maybeColumn.foreach(columns += _)
    +      chopped = chopped.getParent
    +      finished = maybeColumn.isEmpty || chopped.getParent == null
    +    }
    +
    +    val (columnNames, values) = columns.reverse.unzip
    +    PartitionValues(columnNames, values)
    +  }
    +
    +  private def parsePartitionColumn(
    +      columnSpec: String,
    +      defaultPartitionName: String): Option[(String, Literal)] = {
    +    val equalSignIndex = columnSpec.indexOf('=')
    +    if (equalSignIndex == -1) {
    +      None
    +    } else {
    +      val columnName = columnSpec.take(equalSignIndex)
    +      assert(columnName.nonEmpty, s"Empty partition column name in '$columnSpec'")
    +
    +      val rawColumnValue = columnSpec.drop(equalSignIndex + 1)
    +      assert(rawColumnValue.nonEmpty, s"Empty partition column value in '$columnSpec'")
    +
    +      val literal = inferPartitionColumnValue(rawColumnValue, defaultPartitionName)
    +      Some(columnName -> literal)
    +    }
    +  }
    +
    +  /**
    +   * Resolves possible type conflicts between partitions by up-casting "lower" types.  The up-
    +   * casting order is:
    +   * {{{
    +   *   NullType ->
    +   *   IntegerType -> LongType ->
    +   *   FloatType -> DoubleType -> DecimalType.Unlimited ->
    +   *   StringType
    +   * }}}
    +   */
    +  private[parquet] def resolvePartitions(values: Seq[PartitionValues]): Seq[PartitionValues] = {
    +    val distinctColNamesOfPartitions = values.map(_.columnNames).distinct
    +    val columnCount = values.head.columnNames.size
    +
    +    // Column names of all partitions must match
    +    assert(distinctColNamesOfPartitions.size == 1, {
    +      val list = distinctColNamesOfPartitions.mkString("\t", "\n", "")
    +      s"Conflicting partition column names detected:\n$list"
    +    })
    +
    +    // Resolves possible type conflicts for each column
    +    val resolvedValues = (0 until columnCount).map { i =>
    +      resolveTypeConflicts(values.map(_.literals(i)))
    +    }
    +
    +    // Fills resolved literals back to each partition
    +    values.zipWithIndex.map { case (d, index) =>
    +      d.copy(literals = resolvedValues.map(_(index)))
    +    }
    +  }
    +
    +  /**
    +   * Converts a string to a `Literal` with automatic type inference.  Currently only supports
    +   * [[IntegerType]], [[LongType]], [[FloatType]], [[DoubleType]], [[DecimalType.Unlimited]], and
    +   * [[StringType]].
    --- End diff --
    
    Would it be reasonable to support DateType and *then* StringType. In my experience, breaking data down by a date partition is pretty common and useful. My thinking is that if you see a string in the format YYYY-MM-DD (for example, I do recommend that format given its alphabetical sorting, personally, but it doesn't have to be that), then you can *probably* safely assume that the partition is intended to be a date.
    
    I'm not super familiar with this code though, so I'll have to defer to others' expertise.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4308#discussion_r24192566
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala ---
    @@ -228,66 +347,397 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
             val cacheMetadata = useCache
     
             @transient
    -        val cachedStatus = selectedPartitions.flatMap(_.files)
    +        val cachedStatus = selectedFiles
     
             // Overridden so we can inject our own cached files statuses.
             override def getPartitions: Array[SparkPartition] = {
    -          val inputFormat =
    -            if (cacheMetadata) {
    -              new FilteringParquetRowInputFormat {
    -                override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatus
    -              }
    -            } else {
    -              new FilteringParquetRowInputFormat
    +          val inputFormat = if (cacheMetadata) {
    +            new FilteringParquetRowInputFormat {
    +              override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatus
                 }
    -
    -          inputFormat match {
    -            case configurable: Configurable =>
    -              configurable.setConf(getConf)
    -            case _ =>
    +          } else {
    +            new FilteringParquetRowInputFormat
               }
    +
               val jobContext = newJobContext(getConf, jobId)
    -          val rawSplits = inputFormat.getSplits(jobContext).toArray
    -          val result = new Array[SparkPartition](rawSplits.size)
    -          for (i <- 0 until rawSplits.size) {
    -            result(i) =
    -              new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
    +          val rawSplits = inputFormat.getSplits(jobContext)
    +
    +          Array.tabulate[SparkPartition](rawSplits.size) { i =>
    +            new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
               }
    -          result
             }
           }
     
    -    // The ordinal for the partition key in the result row, if requested.
    -    val partitionKeyLocation =
    -      partitionKeys
    -        .headOption
    -        .map(requiredColumns.indexOf(_))
    -        .getOrElse(-1)
    +    // The ordinals for partition keys in the result row, if requested.
    +    val partitionKeyLocations = partitionColumns.fieldNames.zipWithIndex.map {
    +      case (name, index) => index -> requiredColumns.indexOf(name)
    +    }.toMap.filter {
    +      case (_, index) => index >= 0
    +    }
     
         // When the data does not include the key and the key is requested then we must fill it in
         // based on information from the input split.
    -    if (!dataIncludesKey && partitionKeyLocation != -1) {
    -      baseRDD.mapPartitionsWithInputSplit { case (split, iter) =>
    -        val partValue = "([^=]+)=([^=]+)".r
    -        val partValues =
    -          split.asInstanceOf[parquet.hadoop.ParquetInputSplit]
    -            .getPath
    -            .toString
    -            .split("/")
    -            .flatMap {
    -            case partValue(key, value) => Some(key -> value)
    -            case _ => None
    -          }.toMap
    -
    -        val currentValue = partValues.values.head.toInt
    -        iter.map { pair =>
    -          val res = pair._2.asInstanceOf[SpecificMutableRow]
    -          res.setInt(partitionKeyLocation, currentValue)
    -          res
    +    if (!dataSchemaIncludesPartitionKeys && partitionKeyLocations.nonEmpty) {
    +      baseRDD.mapPartitionsWithInputSplit { case (split: ParquetInputSplit, iterator) =>
    +        val partValues = selectedPartitions.collectFirst {
    +          case p if split.getPath.getParent.toString == p.path => p.values
    +        }.get
    +
    +        iterator.map { pair =>
    +          val row = pair._2.asInstanceOf[SpecificMutableRow]
    +          var i = 0
    +          while (i < partValues.size) {
    +            // TODO Avoids boxing cost here!
    +            row.update(partitionKeyLocations(i), partValues(i))
    +            i += 1
    +          }
    +          row
             }
           }
         } else {
           baseRDD.map(_._2)
         }
       }
    +
    +  private def prunePartitions(
    +      predicates: Seq[Expression],
    +      partitions: Seq[Partition]): Seq[Partition] = {
    +    val partitionColumnNames = partitionColumns.map(_.name).toSet
    +    val partitionPruningPredicates = predicates.filter {
    +      _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
    +    }
    +
    +    val rawPredicate = partitionPruningPredicates.reduceOption(And).getOrElse(Literal(true))
    +    val boundPredicate = InterpretedPredicate(rawPredicate transform {
    +      case a: AttributeReference =>
    +        val index = partitionColumns.indexWhere(a.name == _.name)
    +        BoundReference(index, partitionColumns(index).dataType, nullable = true)
    +    })
    +
    +    if (isPartitioned && partitionPruningPredicates.nonEmpty) {
    +      partitions.filter(p => boundPredicate(p.values))
    +    } else {
    +      partitions
    +    }
    +  }
    +
    +  override def insert(data: DataFrame, overwrite: Boolean): Unit = {
    +    // TODO: currently we do not check whether the "schema"s are compatible
    +    // That means if one first creates a table and then INSERTs data with
    +    // and incompatible schema the execution will fail. It would be nice
    +    // to catch this early one, maybe having the planner validate the schema
    +    // before calling execute().
    +
    +    val job = new Job(sqlContext.sparkContext.hadoopConfiguration)
    +    val writeSupport = if (schema.map(_.dataType).forall(_.isPrimitive)) {
    +      log.debug("Initializing MutableRowWriteSupport")
    +      classOf[MutableRowWriteSupport]
    +    } else {
    +      classOf[RowWriteSupport]
    +    }
    +
    +    ParquetOutputFormat.setWriteSupportClass(job, writeSupport)
    +
    +    val conf = ContextUtil.getConfiguration(job)
    +    RowWriteSupport.setSchema(schema.toAttributes, conf)
    +
    +    val destinationPath = new Path(paths.head)
    +
    +    if (overwrite) {
    +      try {
    +        destinationPath.getFileSystem(conf).delete(destinationPath, true)
    +      } catch {
    +        case e: IOException =>
    +          throw new IOException(
    +            s"Unable to clear output directory ${destinationPath.toString} prior" +
    +              s" to writing to Parquet file:\n${e.toString}")
    +      }
    +    }
    +
    +    job.setOutputKeyClass(classOf[Void])
    +    job.setOutputValueClass(classOf[Row])
    +    FileOutputFormat.setOutputPath(job, destinationPath)
    +
    +    val wrappedConf = new SerializableWritable(job.getConfiguration)
    +    val jobTrackerId = new SimpleDateFormat("yyyyMMddHHmm").format(new Date())
    +    val stageId = sqlContext.sparkContext.newRddId()
    +
    +    val taskIdOffset = if (overwrite) {
    +      1
    +    } else {
    +      FileSystemHelper.findMaxTaskId(
    +        FileOutputFormat.getOutputPath(job).toString, job.getConfiguration) + 1
    +    }
    +
    +    def writeShard(context: TaskContext, iterator: Iterator[Row]): Unit = {
    +      /* "reduce task" <split #> <attempt # = spark task #> */
    +      val attemptId = newTaskAttemptID(
    +        jobTrackerId, stageId, isMap = false, context.partitionId(), context.attemptNumber())
    +      val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
    +      val format = new AppendingParquetOutputFormat(taskIdOffset)
    +      val committer = format.getOutputCommitter(hadoopContext)
    +      committer.setupTask(hadoopContext)
    +      val writer = format.getRecordWriter(hadoopContext)
    +      try {
    +        while (iterator.hasNext) {
    +          val row = iterator.next()
    +          writer.write(null, row)
    +        }
    +      } finally {
    +        writer.close(hadoopContext)
    +      }
    +      committer.commitTask(hadoopContext)
    +    }
    +    val jobFormat = new AppendingParquetOutputFormat(taskIdOffset)
    +    /* apparently we need a TaskAttemptID to construct an OutputCommitter;
    +     * however we're only going to use this local OutputCommitter for
    +     * setupJob/commitJob, so we just use a dummy "map" task.
    +     */
    +    val jobAttemptId = newTaskAttemptID(jobTrackerId, stageId, isMap = true, 0, 0)
    +    val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
    +    val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
    +
    +    jobCommitter.setupJob(jobTaskContext)
    +    sqlContext.sparkContext.runJob(data.queryExecution.executedPlan.execute(), writeShard _)
    +    jobCommitter.commitJob(jobTaskContext)
    +
    +    metadataCache.refresh()
    +  }
    +}
    +
    +object ParquetRelation2 {
    +  // Whether we should merge schemas collected from all Parquet part-files.
    +  val MERGE_SCHEMA = "parquet.mergeSchema"
    --- End diff --
    
    Thanks. Will address these in follow-up PR(s).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-72940617
  
      [Test build #26769 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26769/consoleFull) for   PR 4308 at commit [`ae1ee78`](https://github.com/apache/spark/commit/ae1ee783fd46e867f0ffd17161ad1f37292ebad6).
     * This patch **fails MiMa tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `protected class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String]`
      * `trait CreatableRelationProvider `



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-72822582
  
      [Test build #26734 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26734/consoleFull) for   PR 4308 at commit [`ae1ee78`](https://github.com/apache/spark/commit/ae1ee783fd46e867f0ffd17161ad1f37292ebad6).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SQL] WIP: Parquet d...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4308#discussion_r23921434
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala ---
    @@ -227,66 +294,302 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
             val cacheMetadata = useCache
     
             @transient
    -        val cachedStatus = selectedPartitions.flatMap(_.files)
    +        val cachedStatus = selectedFiles
     
             // Overridden so we can inject our own cached files statuses.
             override def getPartitions: Array[SparkPartition] = {
    -          val inputFormat =
    -            if (cacheMetadata) {
    -              new FilteringParquetRowInputFormat {
    -                override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatus
    -              }
    -            } else {
    -              new FilteringParquetRowInputFormat
    +          val inputFormat = if (cacheMetadata) {
    +            new FilteringParquetRowInputFormat {
    +              override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatus
                 }
    -
    -          inputFormat match {
    -            case configurable: Configurable =>
    -              configurable.setConf(getConf)
    -            case _ =>
    +          } else {
    +            new FilteringParquetRowInputFormat
               }
    +
               val jobContext = newJobContext(getConf, jobId)
    -          val rawSplits = inputFormat.getSplits(jobContext).toArray
    -          val result = new Array[SparkPartition](rawSplits.size)
    -          for (i <- 0 until rawSplits.size) {
    -            result(i) =
    -              new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
    +          val rawSplits = inputFormat.getSplits(jobContext)
    +
    +          Array.tabulate[SparkPartition](rawSplits.size) { i =>
    +            new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
               }
    -          result
             }
           }
     
    -    // The ordinal for the partition key in the result row, if requested.
    -    val partitionKeyLocation =
    -      partitionKeys
    -        .headOption
    -        .map(requiredColumns.indexOf(_))
    -        .getOrElse(-1)
    +    // The ordinals for partition keys in the result row, if requested.
    +    val partitionKeyLocations = partitionColumns.fieldNames.zipWithIndex.map {
    +      case (name, index) => index -> requiredColumns.indexOf(name)
    +    }.toMap.filter {
    +      case (_, index) => index >= 0
    +    }
     
         // When the data does not include the key and the key is requested then we must fill it in
         // based on information from the input split.
    -    if (!dataIncludesKey && partitionKeyLocation != -1) {
    -      baseRDD.mapPartitionsWithInputSplit { case (split, iter) =>
    -        val partValue = "([^=]+)=([^=]+)".r
    -        val partValues =
    -          split.asInstanceOf[parquet.hadoop.ParquetInputSplit]
    -            .getPath
    -            .toString
    -            .split("/")
    -            .flatMap {
    -            case partValue(key, value) => Some(key -> value)
    -            case _ => None
    -          }.toMap
    -
    -        val currentValue = partValues.values.head.toInt
    -        iter.map { pair =>
    -          val res = pair._2.asInstanceOf[SpecificMutableRow]
    -          res.setInt(partitionKeyLocation, currentValue)
    -          res
    +    if (!dataSchemaIncludesPartitionKeys && partitionKeyLocations.nonEmpty) {
    +      baseRDD.mapPartitionsWithInputSplit { case (split: ParquetInputSplit, iterator) =>
    +        val partValues = selectedPartitions.collectFirst {
    +          case p if split.getPath.getParent.toString == p.path => p.values
    +        }.get
    +
    +        iterator.map { pair =>
    +          val row = pair._2.asInstanceOf[SpecificMutableRow]
    +          var i = 0
    +          while (i < partValues.size) {
    +            // TODO Avoids boxing cost here!
    +            row.update(partitionKeyLocations(i), partValues(i))
    +            i += 1
    +          }
    +          row
             }
           }
         } else {
           baseRDD.map(_._2)
         }
       }
    +
    +  private def prunePartitions(
    +      predicates: Seq[Expression],
    +      partitions: Seq[Partition]): Seq[Partition] = {
    +    val partitionColumnNames = partitionColumns.map(_.name).toSet
    +    val partitionPruningPredicates = predicates.filter {
    +      _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
    +    }
    +
    +    val rawPredicate = partitionPruningPredicates.reduceOption(And).getOrElse(Literal(true))
    +    val boundPredicate = InterpretedPredicate(rawPredicate transform {
    +      case a: AttributeReference =>
    +        val index = partitionColumns.indexWhere(a.name == _.name)
    +        BoundReference(index, partitionColumns(index).dataType, nullable = true)
    +    })
    +
    +    if (isPartitioned && partitionPruningPredicates.nonEmpty) {
    +      partitions.filter(p => boundPredicate(p.values))
    +    } else {
    +      partitions
    +    }
    +  }
    +}
    +
    +object ParquetRelation2 {
    +  // Whether we should merge schemas collected from all Parquet part-files.
    +  val MERGE_SCHEMA = "parquet.mergeSchema"
    +
    +  // Hive Metastore schema, passed in when the Parquet relation is converted from Metastore
    +  val METASTORE_SCHEMA = "parquet.metastoreSchema"
    +
    +  // Default partition name to use when the partition column value is null or empty string
    +  val DEFAULT_PARTITION_NAME = "partition.defaultName"
    +
    +  private[parquet] def readSchema(footers: Seq[Footer], sqlContext: SQLContext): StructType = {
    +    footers.map { footer =>
    +      val metadata = footer.getParquetMetadata.getFileMetaData
    +      val parquetSchema = metadata.getSchema
    +      val maybeSparkSchema = metadata
    +        .getKeyValueMetaData
    +        .toMap
    +        .get(RowReadSupport.SPARK_METADATA_KEY)
    +        .map(DataType.fromJson(_).asInstanceOf[StructType])
    +
    +      maybeSparkSchema.getOrElse {
    +        // Falls back to Parquet schema if Spark SQL schema is absent.
    +        StructType.fromAttributes(
    +          // TODO Really no need to use `Attribute` here, we only need to know the data type.
    +          convertToAttributes(parquetSchema, sqlContext.conf.isParquetBinaryAsString))
    +      }
    +    }.reduce { (left, right) =>
    +      try mergeCatalystSchemas(left, right) catch { case e: Throwable =>
    +        throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
    +      }
    +    }
    +  }
    +
    +  private[parquet] def mergeMetastoreParquetSchema(
    +      metastoreSchema: StructType,
    +      parquetSchema: StructType): StructType = {
    +    def schemaConflictMessage =
    +      s"""Converting Hive Metastore Parquet, but detected conflicting schemas. Metastore schema:
    +         |${metastoreSchema.prettyJson}
    +         |
    +         |Parquet schema:
    +         |${parquetSchema.prettyJson}
    +       """.stripMargin
    +
    +    assert(metastoreSchema.size == parquetSchema.size, schemaConflictMessage)
    +
    +    val ordinalMap = metastoreSchema.zipWithIndex.map {
    +      case (field, index) => field.name.toLowerCase -> index
    +    }.toMap
    +    val reorderedParquetSchema = parquetSchema.sortBy(f => ordinalMap(f.name.toLowerCase))
    +
    +    StructType(metastoreSchema.zip(reorderedParquetSchema).map {
    +      // Uses Parquet field names but retains Metastore data types.
    +      case (mSchema, pSchema) if mSchema.name.toLowerCase == pSchema.name.toLowerCase =>
    +        mSchema.copy(name = pSchema.name)
    +      case _ =>
    +        throw new SparkException(schemaConflictMessage)
    +    })
    +  }
    +
    +  // TODO Data source implementations shouldn't touch Catalyst types (`Literal`).
    +  // However, we are already using Catalyst expressions for partition pruning and predicate
    +  // push-down here...
    +  private[parquet] case class PartitionValues(columnNames: Seq[String], literals: Seq[Literal]) {
    +    require(columnNames.size == literals.size)
    +  }
    +
    +  /**
    +   * Given a group of qualified paths, tries to parse them and returns a partition specification.
    +   * For example, given:
    +   * {{{
    +   *   hdfs://<host>:<port>/path/to/partition/a=1/b=hello/c=3.14
    +   *   hdfs://<host>:<port>/path/to/partition/a=2/b=world/c=6.28
    +   * }}}
    +   * it returns:
    +   * {{{
    +   *   PartitionSpec(
    +   *     partitionColumns = StructType(
    +   *       StructField(name = "a", dataType = IntegerType, nullable = true),
    +   *       StructField(name = "b", dataType = StringType, nullable = true),
    +   *       StructField(name = "c", dataType = DoubleType, nullable = true)),
    +   *     partitions = Seq(
    +   *       Partition(
    +   *         values = Row(1, "hello", 3.14),
    +   *         path = "hdfs://<host>:<port>/path/to/partition/a=1/b=hello/c=3.14"),
    +   *       Partition(
    +   *         values = Row(2, "world", 6.28),
    +   *         path = "hdfs://<host>:<port>/path/to/partition/a=2/b=world/c=6.28")))
    +   * }}}
    +   */
    +  private[parquet] def parsePartitions(
    +      paths: Seq[Path],
    +      defaultPartitionName: String): PartitionSpec = {
    +    val partitionValues = resolvePartitions(paths.map(parsePartition(_, defaultPartitionName)))
    +    val fields = {
    +      val (PartitionValues(columnNames, literals)) = partitionValues.head
    +      columnNames.zip(literals).map { case (name, Literal(_, dataType)) =>
    +        StructField(name, dataType, nullable = true)
    +      }
    +    }
    +
    +    val partitions = partitionValues.zip(paths).map {
    +      case (PartitionValues(_, literals), path) =>
    +        Partition(Row(literals.map(_.value): _*), path.toString)
    +    }
    +
    +    PartitionSpec(StructType(fields), partitions)
    +  }
    +
    +  /**
    +   * Parses a single partition, returns column names and values of each partition column.  For
    +   * example, given:
    +   * {{{
    +   *   basePath = hdfs://<host>:<port>/path/to/partition
    +   *   partitionPath = hdfs://<host>:<port>/path/to/partition/a=42/b=hello/c=3.14
    +   * }}}
    +   * it returns:
    +   * {{{
    +   *   PartitionDesc(
    --- End diff --
    
    Typo, should be `PartitionValues`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-72588697
  
      [Test build #26596 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26596/consoleFull) for   PR 4308 at commit [`07599a7`](https://github.com/apache/spark/commit/07599a79fc39c463fa13057b9aaea84b0dc6c51d).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-72975433
  
      [Test build #26797 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26797/consoleFull) for   PR 4308 at commit [`209f324`](https://github.com/apache/spark/commit/209f324932302c18d4d41160cf80e6026350a6f0).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-72588947
  
      [Test build #26596 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26596/consoleFull) for   PR 4308 at commit [`07599a7`](https://github.com/apache/spark/commit/07599a79fc39c463fa13057b9aaea84b0dc6c51d).
     * This patch **fails to build**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class KafkaUtils(object):`
      * `public class JDBCUtils `
      * `trait Column extends DataFrame with ExpressionApi `
      * `class ColumnName(name: String) extends IncomputableColumn(name) `
      * `trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] `
      * `class GroupedDataFrame protected[sql](df: DataFrameImpl, groupingExprs: Seq[Expression])`
      * `  protected[sql] class QueryExecution(val logical: LogicalPlan) `
      * `          logWarning(s"Couldn't find class $driver", e);`
      * `  implicit class JDBCDataFrame(rdd: DataFrame) `
      * `class DefaultSource extends RelationProvider with SchemaRelationProvider `



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SQL] WIP: Parquet d...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4308#discussion_r23921498
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala ---
    @@ -227,66 +294,302 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
             val cacheMetadata = useCache
     
             @transient
    -        val cachedStatus = selectedPartitions.flatMap(_.files)
    +        val cachedStatus = selectedFiles
     
             // Overridden so we can inject our own cached files statuses.
             override def getPartitions: Array[SparkPartition] = {
    -          val inputFormat =
    -            if (cacheMetadata) {
    -              new FilteringParquetRowInputFormat {
    -                override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatus
    -              }
    -            } else {
    -              new FilteringParquetRowInputFormat
    +          val inputFormat = if (cacheMetadata) {
    +            new FilteringParquetRowInputFormat {
    +              override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatus
                 }
    -
    -          inputFormat match {
    -            case configurable: Configurable =>
    -              configurable.setConf(getConf)
    -            case _ =>
    +          } else {
    +            new FilteringParquetRowInputFormat
               }
    +
               val jobContext = newJobContext(getConf, jobId)
    -          val rawSplits = inputFormat.getSplits(jobContext).toArray
    -          val result = new Array[SparkPartition](rawSplits.size)
    -          for (i <- 0 until rawSplits.size) {
    -            result(i) =
    -              new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
    +          val rawSplits = inputFormat.getSplits(jobContext)
    +
    +          Array.tabulate[SparkPartition](rawSplits.size) { i =>
    +            new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
               }
    -          result
             }
           }
     
    -    // The ordinal for the partition key in the result row, if requested.
    -    val partitionKeyLocation =
    -      partitionKeys
    -        .headOption
    -        .map(requiredColumns.indexOf(_))
    -        .getOrElse(-1)
    +    // The ordinals for partition keys in the result row, if requested.
    +    val partitionKeyLocations = partitionColumns.fieldNames.zipWithIndex.map {
    +      case (name, index) => index -> requiredColumns.indexOf(name)
    +    }.toMap.filter {
    +      case (_, index) => index >= 0
    +    }
     
         // When the data does not include the key and the key is requested then we must fill it in
         // based on information from the input split.
    -    if (!dataIncludesKey && partitionKeyLocation != -1) {
    -      baseRDD.mapPartitionsWithInputSplit { case (split, iter) =>
    -        val partValue = "([^=]+)=([^=]+)".r
    -        val partValues =
    -          split.asInstanceOf[parquet.hadoop.ParquetInputSplit]
    -            .getPath
    -            .toString
    -            .split("/")
    -            .flatMap {
    -            case partValue(key, value) => Some(key -> value)
    -            case _ => None
    -          }.toMap
    -
    -        val currentValue = partValues.values.head.toInt
    -        iter.map { pair =>
    -          val res = pair._2.asInstanceOf[SpecificMutableRow]
    -          res.setInt(partitionKeyLocation, currentValue)
    -          res
    +    if (!dataSchemaIncludesPartitionKeys && partitionKeyLocations.nonEmpty) {
    +      baseRDD.mapPartitionsWithInputSplit { case (split: ParquetInputSplit, iterator) =>
    +        val partValues = selectedPartitions.collectFirst {
    +          case p if split.getPath.getParent.toString == p.path => p.values
    +        }.get
    +
    +        iterator.map { pair =>
    +          val row = pair._2.asInstanceOf[SpecificMutableRow]
    +          var i = 0
    +          while (i < partValues.size) {
    +            // TODO Avoids boxing cost here!
    +            row.update(partitionKeyLocations(i), partValues(i))
    +            i += 1
    +          }
    +          row
             }
           }
         } else {
           baseRDD.map(_._2)
         }
       }
    +
    +  private def prunePartitions(
    +      predicates: Seq[Expression],
    +      partitions: Seq[Partition]): Seq[Partition] = {
    +    val partitionColumnNames = partitionColumns.map(_.name).toSet
    +    val partitionPruningPredicates = predicates.filter {
    +      _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
    +    }
    +
    +    val rawPredicate = partitionPruningPredicates.reduceOption(And).getOrElse(Literal(true))
    +    val boundPredicate = InterpretedPredicate(rawPredicate transform {
    +      case a: AttributeReference =>
    +        val index = partitionColumns.indexWhere(a.name == _.name)
    +        BoundReference(index, partitionColumns(index).dataType, nullable = true)
    +    })
    +
    +    if (isPartitioned && partitionPruningPredicates.nonEmpty) {
    +      partitions.filter(p => boundPredicate(p.values))
    +    } else {
    +      partitions
    +    }
    +  }
    +}
    +
    +object ParquetRelation2 {
    +  // Whether we should merge schemas collected from all Parquet part-files.
    +  val MERGE_SCHEMA = "parquet.mergeSchema"
    +
    +  // Hive Metastore schema, passed in when the Parquet relation is converted from Metastore
    +  val METASTORE_SCHEMA = "parquet.metastoreSchema"
    +
    +  // Default partition name to use when the partition column value is null or empty string
    +  val DEFAULT_PARTITION_NAME = "partition.defaultName"
    +
    +  private[parquet] def readSchema(footers: Seq[Footer], sqlContext: SQLContext): StructType = {
    +    footers.map { footer =>
    +      val metadata = footer.getParquetMetadata.getFileMetaData
    +      val parquetSchema = metadata.getSchema
    +      val maybeSparkSchema = metadata
    +        .getKeyValueMetaData
    +        .toMap
    +        .get(RowReadSupport.SPARK_METADATA_KEY)
    +        .map(DataType.fromJson(_).asInstanceOf[StructType])
    +
    +      maybeSparkSchema.getOrElse {
    +        // Falls back to Parquet schema if Spark SQL schema is absent.
    +        StructType.fromAttributes(
    +          // TODO Really no need to use `Attribute` here, we only need to know the data type.
    +          convertToAttributes(parquetSchema, sqlContext.conf.isParquetBinaryAsString))
    +      }
    +    }.reduce { (left, right) =>
    +      try mergeCatalystSchemas(left, right) catch { case e: Throwable =>
    +        throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
    +      }
    +    }
    +  }
    +
    +  private[parquet] def mergeMetastoreParquetSchema(
    +      metastoreSchema: StructType,
    +      parquetSchema: StructType): StructType = {
    +    def schemaConflictMessage =
    +      s"""Converting Hive Metastore Parquet, but detected conflicting schemas. Metastore schema:
    +         |${metastoreSchema.prettyJson}
    +         |
    +         |Parquet schema:
    +         |${parquetSchema.prettyJson}
    +       """.stripMargin
    +
    +    assert(metastoreSchema.size == parquetSchema.size, schemaConflictMessage)
    +
    +    val ordinalMap = metastoreSchema.zipWithIndex.map {
    +      case (field, index) => field.name.toLowerCase -> index
    +    }.toMap
    +    val reorderedParquetSchema = parquetSchema.sortBy(f => ordinalMap(f.name.toLowerCase))
    +
    +    StructType(metastoreSchema.zip(reorderedParquetSchema).map {
    +      // Uses Parquet field names but retains Metastore data types.
    +      case (mSchema, pSchema) if mSchema.name.toLowerCase == pSchema.name.toLowerCase =>
    +        mSchema.copy(name = pSchema.name)
    +      case _ =>
    +        throw new SparkException(schemaConflictMessage)
    +    })
    +  }
    +
    +  // TODO Data source implementations shouldn't touch Catalyst types (`Literal`).
    +  // However, we are already using Catalyst expressions for partition pruning and predicate
    +  // push-down here...
    +  private[parquet] case class PartitionValues(columnNames: Seq[String], literals: Seq[Literal]) {
    +    require(columnNames.size == literals.size)
    +  }
    +
    +  /**
    +   * Given a group of qualified paths, tries to parse them and returns a partition specification.
    +   * For example, given:
    +   * {{{
    +   *   hdfs://<host>:<port>/path/to/partition/a=1/b=hello/c=3.14
    +   *   hdfs://<host>:<port>/path/to/partition/a=2/b=world/c=6.28
    +   * }}}
    +   * it returns:
    +   * {{{
    +   *   PartitionSpec(
    +   *     partitionColumns = StructType(
    +   *       StructField(name = "a", dataType = IntegerType, nullable = true),
    +   *       StructField(name = "b", dataType = StringType, nullable = true),
    +   *       StructField(name = "c", dataType = DoubleType, nullable = true)),
    +   *     partitions = Seq(
    +   *       Partition(
    +   *         values = Row(1, "hello", 3.14),
    +   *         path = "hdfs://<host>:<port>/path/to/partition/a=1/b=hello/c=3.14"),
    +   *       Partition(
    +   *         values = Row(2, "world", 6.28),
    +   *         path = "hdfs://<host>:<port>/path/to/partition/a=2/b=world/c=6.28")))
    +   * }}}
    +   */
    +  private[parquet] def parsePartitions(
    +      paths: Seq[Path],
    +      defaultPartitionName: String): PartitionSpec = {
    +    val partitionValues = resolvePartitions(paths.map(parsePartition(_, defaultPartitionName)))
    +    val fields = {
    +      val (PartitionValues(columnNames, literals)) = partitionValues.head
    +      columnNames.zip(literals).map { case (name, Literal(_, dataType)) =>
    +        StructField(name, dataType, nullable = true)
    +      }
    +    }
    +
    +    val partitions = partitionValues.zip(paths).map {
    +      case (PartitionValues(_, literals), path) =>
    +        Partition(Row(literals.map(_.value): _*), path.toString)
    +    }
    +
    +    PartitionSpec(StructType(fields), partitions)
    +  }
    +
    +  /**
    +   * Parses a single partition, returns column names and values of each partition column.  For
    +   * example, given:
    +   * {{{
    +   *   basePath = hdfs://<host>:<port>/path/to/partition
    +   *   partitionPath = hdfs://<host>:<port>/path/to/partition/a=42/b=hello/c=3.14
    +   * }}}
    +   * it returns:
    +   * {{{
    +   *   PartitionDesc(
    +   *     Seq("a", "b", "c"),
    +   *     Seq(
    +   *       Literal(42, IntegerType),
    +   *       Literal("hello", StringType),
    +   *       Literal(3.14, FloatType)))
    +   * }}}
    +   */
    +  private[parquet] def parsePartition(
    +      path: Path,
    +      defaultPartitionName: String): PartitionValues = {
    +    val columns = ArrayBuffer.empty[(String, Literal)]
    +    var finished = path.isRoot
    +    var chopped = path
    +
    +    while (!finished) {
    +      val maybeColumn = parsePartitionColumn(chopped.getName, defaultPartitionName)
    +      maybeColumn.foreach(columns += _)
    +      chopped = chopped.getParent
    +      finished = maybeColumn.isEmpty || chopped.isRoot
    +    }
    +
    +    val (columnNames, values) = columns.unzip
    +    PartitionValues(columnNames, values)
    +  }
    +
    +  private def parsePartitionColumn(
    +      columnSpec: String,
    +      defaultPartitionName: String): Option[(String, Literal)] = {
    +    val equalSignIndex = columnSpec.indexOf('=')
    +    if (equalSignIndex == -1) {
    +      None
    +    } else {
    +      val columnName = columnSpec.take(equalSignIndex)
    +      val literal = inferPartitionColumnValue(
    +        columnSpec.drop(equalSignIndex + 1), defaultPartitionName)
    +      Some(columnName -> literal)
    +    }
    +  }
    +
    +  /**
    +   * Resolves possible type conflicts between partitions by up-casting "lower" types.  The up-
    +   * casting order is:
    +   * {{{
    +   *   NullType ->
    +   *   IntegerType -> LongType ->
    +   *   FloatType -> DoubleType -> DecimalType.Unlimited ->
    +   *   StringType
    +   * }}}
    +   */
    +  private[parquet] def resolvePartitions(descs: Seq[PartitionValues]): Seq[PartitionValues] = {
    +    val distinctColNamesOfPartitions = descs.map(_.columnNames).distinct
    +    val columnCount = descs.head.columnNames.size
    +
    +    // Column names of all partitions must match
    +    assert(distinctColNamesOfPartitions.size == 1, {
    +      val list = distinctColNamesOfPartitions.mkString("\t", "\n", "")
    +      s"Conflicting partition column names detected:\n$list"
    +    })
    +
    +    // Resolves possible type conflicts for each column
    +    val resolvedValues = (0 until columnCount).map { i =>
    +      resolveTypeConflicts(descs.map(_.literals(i)))
    +    }
    --- End diff --
    
    Should check and throw if some column data type ends up with `NullType`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4308#discussion_r24140570
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala ---
    @@ -228,66 +347,397 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
             val cacheMetadata = useCache
     
             @transient
    -        val cachedStatus = selectedPartitions.flatMap(_.files)
    +        val cachedStatus = selectedFiles
     
             // Overridden so we can inject our own cached files statuses.
             override def getPartitions: Array[SparkPartition] = {
    -          val inputFormat =
    -            if (cacheMetadata) {
    -              new FilteringParquetRowInputFormat {
    -                override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatus
    -              }
    -            } else {
    -              new FilteringParquetRowInputFormat
    +          val inputFormat = if (cacheMetadata) {
    +            new FilteringParquetRowInputFormat {
    +              override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatus
                 }
    -
    -          inputFormat match {
    -            case configurable: Configurable =>
    -              configurable.setConf(getConf)
    -            case _ =>
    +          } else {
    +            new FilteringParquetRowInputFormat
               }
    +
               val jobContext = newJobContext(getConf, jobId)
    -          val rawSplits = inputFormat.getSplits(jobContext).toArray
    -          val result = new Array[SparkPartition](rawSplits.size)
    -          for (i <- 0 until rawSplits.size) {
    -            result(i) =
    -              new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
    +          val rawSplits = inputFormat.getSplits(jobContext)
    +
    +          Array.tabulate[SparkPartition](rawSplits.size) { i =>
    +            new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
               }
    -          result
             }
           }
     
    -    // The ordinal for the partition key in the result row, if requested.
    -    val partitionKeyLocation =
    -      partitionKeys
    -        .headOption
    -        .map(requiredColumns.indexOf(_))
    -        .getOrElse(-1)
    +    // The ordinals for partition keys in the result row, if requested.
    +    val partitionKeyLocations = partitionColumns.fieldNames.zipWithIndex.map {
    +      case (name, index) => index -> requiredColumns.indexOf(name)
    +    }.toMap.filter {
    +      case (_, index) => index >= 0
    +    }
     
         // When the data does not include the key and the key is requested then we must fill it in
         // based on information from the input split.
    -    if (!dataIncludesKey && partitionKeyLocation != -1) {
    -      baseRDD.mapPartitionsWithInputSplit { case (split, iter) =>
    -        val partValue = "([^=]+)=([^=]+)".r
    -        val partValues =
    -          split.asInstanceOf[parquet.hadoop.ParquetInputSplit]
    -            .getPath
    -            .toString
    -            .split("/")
    -            .flatMap {
    -            case partValue(key, value) => Some(key -> value)
    -            case _ => None
    -          }.toMap
    -
    -        val currentValue = partValues.values.head.toInt
    -        iter.map { pair =>
    -          val res = pair._2.asInstanceOf[SpecificMutableRow]
    -          res.setInt(partitionKeyLocation, currentValue)
    -          res
    +    if (!dataSchemaIncludesPartitionKeys && partitionKeyLocations.nonEmpty) {
    +      baseRDD.mapPartitionsWithInputSplit { case (split: ParquetInputSplit, iterator) =>
    +        val partValues = selectedPartitions.collectFirst {
    +          case p if split.getPath.getParent.toString == p.path => p.values
    +        }.get
    +
    +        iterator.map { pair =>
    +          val row = pair._2.asInstanceOf[SpecificMutableRow]
    +          var i = 0
    +          while (i < partValues.size) {
    +            // TODO Avoids boxing cost here!
    +            row.update(partitionKeyLocations(i), partValues(i))
    +            i += 1
    +          }
    +          row
             }
           }
         } else {
           baseRDD.map(_._2)
         }
       }
    +
    +  private def prunePartitions(
    +      predicates: Seq[Expression],
    +      partitions: Seq[Partition]): Seq[Partition] = {
    +    val partitionColumnNames = partitionColumns.map(_.name).toSet
    +    val partitionPruningPredicates = predicates.filter {
    +      _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
    +    }
    +
    +    val rawPredicate = partitionPruningPredicates.reduceOption(And).getOrElse(Literal(true))
    +    val boundPredicate = InterpretedPredicate(rawPredicate transform {
    +      case a: AttributeReference =>
    +        val index = partitionColumns.indexWhere(a.name == _.name)
    +        BoundReference(index, partitionColumns(index).dataType, nullable = true)
    +    })
    +
    +    if (isPartitioned && partitionPruningPredicates.nonEmpty) {
    +      partitions.filter(p => boundPredicate(p.values))
    +    } else {
    +      partitions
    +    }
    +  }
    +
    +  override def insert(data: DataFrame, overwrite: Boolean): Unit = {
    +    // TODO: currently we do not check whether the "schema"s are compatible
    +    // That means if one first creates a table and then INSERTs data with
    +    // and incompatible schema the execution will fail. It would be nice
    +    // to catch this early one, maybe having the planner validate the schema
    +    // before calling execute().
    +
    +    val job = new Job(sqlContext.sparkContext.hadoopConfiguration)
    +    val writeSupport = if (schema.map(_.dataType).forall(_.isPrimitive)) {
    +      log.debug("Initializing MutableRowWriteSupport")
    +      classOf[MutableRowWriteSupport]
    +    } else {
    +      classOf[RowWriteSupport]
    +    }
    +
    +    ParquetOutputFormat.setWriteSupportClass(job, writeSupport)
    +
    +    val conf = ContextUtil.getConfiguration(job)
    +    RowWriteSupport.setSchema(schema.toAttributes, conf)
    +
    +    val destinationPath = new Path(paths.head)
    +
    +    if (overwrite) {
    +      try {
    +        destinationPath.getFileSystem(conf).delete(destinationPath, true)
    +      } catch {
    +        case e: IOException =>
    +          throw new IOException(
    +            s"Unable to clear output directory ${destinationPath.toString} prior" +
    +              s" to writing to Parquet file:\n${e.toString}")
    +      }
    +    }
    +
    +    job.setOutputKeyClass(classOf[Void])
    +    job.setOutputValueClass(classOf[Row])
    +    FileOutputFormat.setOutputPath(job, destinationPath)
    +
    +    val wrappedConf = new SerializableWritable(job.getConfiguration)
    +    val jobTrackerId = new SimpleDateFormat("yyyyMMddHHmm").format(new Date())
    +    val stageId = sqlContext.sparkContext.newRddId()
    +
    +    val taskIdOffset = if (overwrite) {
    +      1
    +    } else {
    +      FileSystemHelper.findMaxTaskId(
    +        FileOutputFormat.getOutputPath(job).toString, job.getConfiguration) + 1
    +    }
    +
    +    def writeShard(context: TaskContext, iterator: Iterator[Row]): Unit = {
    +      /* "reduce task" <split #> <attempt # = spark task #> */
    +      val attemptId = newTaskAttemptID(
    +        jobTrackerId, stageId, isMap = false, context.partitionId(), context.attemptNumber())
    +      val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
    +      val format = new AppendingParquetOutputFormat(taskIdOffset)
    +      val committer = format.getOutputCommitter(hadoopContext)
    +      committer.setupTask(hadoopContext)
    +      val writer = format.getRecordWriter(hadoopContext)
    +      try {
    +        while (iterator.hasNext) {
    +          val row = iterator.next()
    +          writer.write(null, row)
    +        }
    +      } finally {
    +        writer.close(hadoopContext)
    +      }
    +      committer.commitTask(hadoopContext)
    +    }
    +    val jobFormat = new AppendingParquetOutputFormat(taskIdOffset)
    +    /* apparently we need a TaskAttemptID to construct an OutputCommitter;
    +     * however we're only going to use this local OutputCommitter for
    +     * setupJob/commitJob, so we just use a dummy "map" task.
    +     */
    +    val jobAttemptId = newTaskAttemptID(jobTrackerId, stageId, isMap = true, 0, 0)
    +    val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
    +    val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
    +
    +    jobCommitter.setupJob(jobTaskContext)
    +    sqlContext.sparkContext.runJob(data.queryExecution.executedPlan.execute(), writeShard _)
    +    jobCommitter.commitJob(jobTaskContext)
    +
    +    metadataCache.refresh()
    +  }
    +}
    +
    +object ParquetRelation2 {
    +  // Whether we should merge schemas collected from all Parquet part-files.
    +  val MERGE_SCHEMA = "parquet.mergeSchema"
    --- End diff --
    
    why prefix these with `parquet`? that seems redudant since you can only use them after specifying `USING org.apache.spark.sql.parquet`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-72589451
  
      [Test build #26601 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26601/consoleFull) for   PR 4308 at commit [`bcb3ad6`](https://github.com/apache/spark/commit/bcb3ad688d57f84725665a1204eb9e1dff340033).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-73138264
  
      [Test build #26856 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26856/consoleFull) for   PR 4308 at commit [`1ad361e`](https://github.com/apache/spark/commit/1ad361e3ef57bd696859ef18d46fa419b3761986).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `protected class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String]`
      * `trait CreatableRelationProvider `



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-72588948
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26596/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SQL] WIP: Parquet d...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-72460477
  
      [Test build #26514 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26514/consoleFull) for   PR 4308 at commit [`af3683e`](https://github.com/apache/spark/commit/af3683ea68d3efe7c0368cb8d23fdd661fbfeffc).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class DefaultSource extends RelationProvider with SchemaRelationProvider `



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-72586456
  
      [Test build #26591 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26591/consoleFull) for   PR 4308 at commit [`a760555`](https://github.com/apache/spark/commit/a76055576ba67dc69441a1c8086a8dfe52add9c3).
     * This patch **does not merge cleanly**.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5182] [SPARK-5528] [SPARK-5509] [SPARK-...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/4308#issuecomment-72984970
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26797/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org