You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/07/20 19:56:14 UTC

spark git commit: Revert "[SPARK-24811][SQL] Avro: add new function from_avro and to_avro"

Repository: spark
Updated Branches:
  refs/heads/master 3cb1b5780 -> 9ad77b303


Revert "[SPARK-24811][SQL] Avro: add new function from_avro and to_avro"

This reverts commit 244bcff19463d82ec72baf15bc0a5209f21f2ef3.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9ad77b30
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9ad77b30
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9ad77b30

Branch: refs/heads/master
Commit: 9ad77b3037b476b726b773c38d1cd264d89d51e2
Parents: 3cb1b57
Author: Xiao Li <ga...@gmail.com>
Authored: Fri Jul 20 12:55:38 2018 -0700
Committer: Xiao Li <ga...@gmail.com>
Committed: Fri Jul 20 12:55:38 2018 -0700

----------------------------------------------------------------------
 .../spark/sql/avro/AvroDataToCatalyst.scala     |  68 -------
 .../spark/sql/avro/CatalystDataToAvro.scala     |  69 --------
 .../org/apache/spark/sql/avro/package.scala     |  31 ----
 .../avro/AvroCatalystDataConversionSuite.scala  | 175 -------------------
 .../spark/sql/avro/AvroFunctionsSuite.scala     |  83 ---------
 .../expressions/ExpressionEvalHelper.scala      |   6 -
 6 files changed, 432 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9ad77b30/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala
deleted file mode 100644
index 6671b3f..0000000
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import org.apache.avro.Schema
-import org.apache.avro.generic.GenericDatumReader
-import org.apache.avro.io.{BinaryDecoder, DecoderFactory}
-
-import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters}
-import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, UnaryExpression}
-import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode}
-import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType}
-
-case class AvroDataToCatalyst(child: Expression, jsonFormatSchema: String)
-  extends UnaryExpression with ExpectsInputTypes {
-
-  override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType)
-
-  override lazy val dataType: DataType = SchemaConverters.toSqlType(avroSchema).dataType
-
-  override def nullable: Boolean = true
-
-  @transient private lazy val avroSchema = new Schema.Parser().parse(jsonFormatSchema)
-
-  @transient private lazy val reader = new GenericDatumReader[Any](avroSchema)
-
-  @transient private lazy val deserializer = new AvroDeserializer(avroSchema, dataType)
-
-  @transient private var decoder: BinaryDecoder = _
-
-  @transient private var result: Any = _
-
-  override def nullSafeEval(input: Any): Any = {
-    val binary = input.asInstanceOf[Array[Byte]]
-    decoder = DecoderFactory.get().binaryDecoder(binary, 0, binary.length, decoder)
-    result = reader.read(result, decoder)
-    deserializer.deserialize(result)
-  }
-
-  override def simpleString: String = {
-    s"from_avro(${child.sql}, ${dataType.simpleString})"
-  }
-
-  override def sql: String = {
-    s"from_avro(${child.sql}, ${dataType.catalogString})"
-  }
-
-  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-    val expr = ctx.addReferenceObj("this", this)
-    defineCodeGen(ctx, ev, input =>
-      s"(${CodeGenerator.boxedType(dataType)})$expr.nullSafeEval($input)")
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/9ad77b30/external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala
deleted file mode 100644
index a669388..0000000
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import java.io.ByteArrayOutputStream
-
-import org.apache.avro.generic.GenericDatumWriter
-import org.apache.avro.io.{BinaryEncoder, EncoderFactory}
-
-import org.apache.spark.sql.avro.{AvroSerializer, SchemaConverters}
-import org.apache.spark.sql.catalyst.expressions.{Expression, UnaryExpression}
-import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode}
-import org.apache.spark.sql.types.{BinaryType, DataType}
-
-case class CatalystDataToAvro(child: Expression) extends UnaryExpression {
-
-  override def dataType: DataType = BinaryType
-
-  @transient private lazy val avroType =
-    SchemaConverters.toAvroType(child.dataType, child.nullable)
-
-  @transient private lazy val serializer =
-    new AvroSerializer(child.dataType, avroType, child.nullable)
-
-  @transient private lazy val writer =
-    new GenericDatumWriter[Any](avroType)
-
-  @transient private var encoder: BinaryEncoder = _
-
-  @transient private lazy val out = new ByteArrayOutputStream
-
-  override def nullSafeEval(input: Any): Any = {
-    out.reset()
-    encoder = EncoderFactory.get().directBinaryEncoder(out, encoder)
-    val avroData = serializer.serialize(input)
-    writer.write(avroData, encoder)
-    encoder.flush()
-    out.toByteArray
-  }
-
-  override def simpleString: String = {
-    s"to_avro(${child.sql}, ${child.dataType.simpleString})"
-  }
-
-  override def sql: String = {
-    s"to_avro(${child.sql}, ${child.dataType.catalogString})"
-  }
-
-  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-    val expr = ctx.addReferenceObj("this", this)
-    defineCodeGen(ctx, ev, input =>
-      s"(byte[]) $expr.nullSafeEval($input)")
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/9ad77b30/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala
index e82651d..b3c8a66 100755
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala
@@ -17,10 +17,6 @@
 
 package org.apache.spark.sql
 
-import org.apache.avro.Schema
-
-import org.apache.spark.annotation.Experimental
-
 package object avro {
   /**
    * Adds a method, `avro`, to DataFrameWriter that allows you to write avro files using
@@ -40,31 +36,4 @@ package object avro {
     @scala.annotation.varargs
     def avro(sources: String*): DataFrame = reader.format("avro").load(sources: _*)
   }
-
-  /**
-   * Converts a binary column of avro format into its corresponding catalyst value. The specified
-   * schema must match the read data, otherwise the behavior is undefined: it may fail or return
-   * an arbitrary result.
-   *
-   * @param data the binary column.
-   * @param jsonFormatSchema the avro schema in JSON string format.
-   *
-   * @since 2.4.0
-   */
-  @Experimental
-  def from_avro(data: Column, jsonFormatSchema: String): Column = {
-    new Column(AvroDataToCatalyst(data.expr, jsonFormatSchema))
-  }
-
-  /**
-   * Converts a column into binary of avro format.
-   *
-   * @param data the data column.
-   *
-   * @since 2.4.0
-   */
-  @Experimental
-  def to_avro(data: Column): Column = {
-    new Column(CatalystDataToAvro(data.expr))
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9ad77b30/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
deleted file mode 100644
index 06d5477..0000000
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.avro
-
-import org.apache.avro.Schema
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.{AvroDataToCatalyst, CatalystDataToAvro, RandomDataGenerator}
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
-import org.apache.spark.sql.catalyst.expressions.{ExpressionEvalHelper, GenericInternalRow, Literal}
-import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, MapData}
-import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.UTF8String
-
-class AvroCatalystDataConversionSuite extends SparkFunSuite with ExpressionEvalHelper {
-
-  private def roundTripTest(data: Literal): Unit = {
-    val avroType = SchemaConverters.toAvroType(data.dataType, data.nullable)
-    checkResult(data, avroType.toString, data.eval())
-  }
-
-  private def checkResult(data: Literal, schema: String, expected: Any): Unit = {
-    checkEvaluation(
-      AvroDataToCatalyst(CatalystDataToAvro(data), schema),
-      prepareExpectedResult(expected))
-  }
-
-  private def assertFail(data: Literal, schema: String): Unit = {
-    intercept[java.io.EOFException] {
-      AvroDataToCatalyst(CatalystDataToAvro(data), schema).eval()
-    }
-  }
-
-  private val testingTypes = Seq(
-    BooleanType,
-    ByteType,
-    ShortType,
-    IntegerType,
-    LongType,
-    FloatType,
-    DoubleType,
-    DecimalType(8, 0),   // 32 bits decimal without fraction
-    DecimalType(8, 4),   // 32 bits decimal
-    DecimalType(16, 0),  // 64 bits decimal without fraction
-    DecimalType(16, 11), // 64 bits decimal
-    DecimalType(38, 0),
-    DecimalType(38, 38),
-    StringType,
-    BinaryType)
-
-  protected def prepareExpectedResult(expected: Any): Any = expected match {
-    // Spark decimal is converted to avro string
-    case d: Decimal => UTF8String.fromString(d.toString)
-    // Spark byte and short both map to avro int
-    case b: Byte => b.toInt
-    case s: Short => s.toInt
-    case row: GenericInternalRow => InternalRow.fromSeq(row.values.map(prepareExpectedResult))
-    case array: GenericArrayData => new GenericArrayData(array.array.map(prepareExpectedResult))
-    case map: MapData =>
-      val keys = new GenericArrayData(
-        map.keyArray().asInstanceOf[GenericArrayData].array.map(prepareExpectedResult))
-      val values = new GenericArrayData(
-        map.valueArray().asInstanceOf[GenericArrayData].array.map(prepareExpectedResult))
-      new ArrayBasedMapData(keys, values)
-    case other => other
-  }
-
-  testingTypes.foreach { dt =>
-    val seed = scala.util.Random.nextLong()
-    test(s"single $dt with seed $seed") {
-      val rand = new scala.util.Random(seed)
-      val data = RandomDataGenerator.forType(dt, rand = rand).get.apply()
-      val converter = CatalystTypeConverters.createToCatalystConverter(dt)
-      val input = Literal.create(converter(data), dt)
-      roundTripTest(input)
-    }
-  }
-
-  for (_ <- 1 to 5) {
-    val seed = scala.util.Random.nextLong()
-    val rand = new scala.util.Random(seed)
-    val schema = RandomDataGenerator.randomSchema(rand, 5, testingTypes)
-    test(s"flat schema ${schema.catalogString} with seed $seed") {
-      val data = RandomDataGenerator.randomRow(rand, schema)
-      val converter = CatalystTypeConverters.createToCatalystConverter(schema)
-      val input = Literal.create(converter(data), schema)
-      roundTripTest(input)
-    }
-  }
-
-  for (_ <- 1 to 5) {
-    val seed = scala.util.Random.nextLong()
-    val rand = new scala.util.Random(seed)
-    val schema = RandomDataGenerator.randomNestedSchema(rand, 10, testingTypes)
-    test(s"nested schema ${schema.catalogString} with seed $seed") {
-      val data = RandomDataGenerator.randomRow(rand, schema)
-      val converter = CatalystTypeConverters.createToCatalystConverter(schema)
-      val input = Literal.create(converter(data), schema)
-      roundTripTest(input)
-    }
-  }
-
-  test("read int as string") {
-    val data = Literal(1)
-    val avroTypeJson =
-      s"""
-         |{
-         |  "type": "string",
-         |  "name": "my_string"
-         |}
-       """.stripMargin
-
-    // When an int is read as a string, the avro reader cannot parse the binary and fails.
-    assertFail(data, avroTypeJson)
-  }
-
-  test("read string as int") {
-    val data = Literal("abc")
-    val avroTypeJson =
-      s"""
-         |{
-         |  "type": "int",
-         |  "name": "my_int"
-         |}
-       """.stripMargin
-
-    // When string data is read as int, the avro reader cannot detect the type mismatch and reads
-    // the string length as the int value.
-    checkResult(data, avroTypeJson, 3)
-  }
-
-  test("read float as double") {
-    val data = Literal(1.23f)
-    val avroTypeJson =
-      s"""
-         |{
-         |  "type": "double",
-         |  "name": "my_double"
-         |}
-       """.stripMargin
-
-    // When float data is read as double, the avro reader fails (it tries to read 8 bytes while
-    // the data has only 4 bytes).
-    assertFail(data, avroTypeJson)
-  }
-
-  test("read double as float") {
-    val data = Literal(1.23)
-    val avroTypeJson =
-      s"""
-         |{
-         |  "type": "float",
-         |  "name": "my_float"
-         |}
-       """.stripMargin
-
-    // The avro reader reads the first 4 bytes of the double as a float; the result is arbitrary.
-    checkResult(data, avroTypeJson, 5.848603E35f)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/9ad77b30/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
deleted file mode 100644
index 90a4cd6..0000000
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.avro
-
-import org.apache.avro.Schema
-
-import org.apache.spark.sql.QueryTest
-import org.apache.spark.sql.functions.struct
-import org.apache.spark.sql.test.SharedSQLContext
-
-class AvroFunctionsSuite extends QueryTest with SharedSQLContext {
-  import testImplicits._
-
-  test("roundtrip in to_avro and from_avro - int and string") {
-    val df = spark.range(10).select('id, 'id.cast("string").as("str"))
-
-    val avroDF = df.select(to_avro('id).as("a"), to_avro('str).as("b"))
-    val avroTypeLong = s"""
-      |{
-      |  "type": "int",
-      |  "name": "id"
-      |}
-    """.stripMargin
-    val avroTypeStr = s"""
-      |{
-      |  "type": "string",
-      |  "name": "str"
-      |}
-    """.stripMargin
-    checkAnswer(avroDF.select(from_avro('a, avroTypeLong), from_avro('b, avroTypeStr)), df)
-  }
-
-  test("roundtrip in to_avro and from_avro - struct") {
-    val df = spark.range(10).select(struct('id, 'id.cast("string").as("str")).as("struct"))
-    val avroStructDF = df.select(to_avro('struct).as("avro"))
-    val avroTypeStruct = s"""
-      |{
-      |  "type": "record",
-      |  "name": "struct",
-      |  "fields": [
-      |    {"name": "col1", "type": "long"},
-      |    {"name": "col2", "type": "string"}
-      |  ]
-      |}
-    """.stripMargin
-    checkAnswer(avroStructDF.select(from_avro('avro, avroTypeStruct)), df)
-  }
-
-  test("roundtrip in to_avro and from_avro - array with null") {
-    val dfOne = Seq(Tuple1(Tuple1(1) :: Nil), Tuple1(null :: Nil)).toDF("array")
-    val avroTypeArrStruct = s"""
-      |[ {
-      |  "type" : "array",
-      |  "items" : [ {
-      |    "type" : "record",
-      |    "name" : "x",
-      |    "fields" : [ {
-      |      "name" : "y",
-      |      "type" : "int"
-      |    } ]
-      |  }, "null" ]
-      |}, "null" ]
-    """.stripMargin
-    val readBackOne = dfOne.select(to_avro($"array").as("avro"))
-      .select(from_avro($"avro", avroTypeArrStruct).as("array"))
-    checkAnswer(dfOne, readBackOne)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/9ad77b30/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
index d045267..14bfa21 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
@@ -79,12 +79,6 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks with PlanTestBa
         java.util.Arrays.equals(result, expected)
       case (result: Double, expected: Spread[Double @unchecked]) =>
         expected.asInstanceOf[Spread[Double]].isWithin(result)
-      case (result: InternalRow, expected: InternalRow) =>
-        val st = dataType.asInstanceOf[StructType]
-        assert(result.numFields == st.length && expected.numFields == st.length)
-        st.zipWithIndex.forall { case (f, i) =>
-          checkResult(result.get(i, f.dataType), expected.get(i, f.dataType), f.dataType)
-        }
       case (result: ArrayData, expected: ArrayData) =>
         result.numElements == expected.numElements && {
           val et = dataType.asInstanceOf[ArrayType].elementType


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org