You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by DylanGuedes <gi...@git.apache.org> on 2018/06/04 12:53:14 UTC
[GitHub] spark pull request #21045: [SPARK-23931][SQL] Adds zip function to sparksql
Github user DylanGuedes commented on a diff in the pull request:
https://github.com/apache/spark/pull/21045#discussion_r192729662
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -127,6 +127,165 @@ case class MapKeys(child: Expression)
override def prettyName: String = "map_keys"
}
+@ExpressionDescription(
+ usage = """_FUNC_(a1, a2, ...) - Returns a merged array containing in the N-th position the
+ N-th value of each array given.""",
+ examples = """
+ Examples:
+ > SELECT _FUNC_(array(1, 2, 3), array(2, 3, 4));
+ [[1, 2], [2, 3], [3, 4]]
+ > SELECT _FUNC_(array(1, 2), array(2, 3), array(3, 4));
+ [[1, 2, 3], [2, 3, 4]]
+ """,
+ since = "2.4.0")
+case class Zip(children: Seq[Expression]) extends Expression with ExpectsInputTypes {
+
+ override def inputTypes: Seq[AbstractDataType] = Seq.fill(children.length)(ArrayType)
+
+ override def dataType: DataType = ArrayType(mountSchema)
+
+ override def nullable: Boolean = children.forall(_.nullable)
+
+ private lazy val arrayTypes = children.map(_.dataType.asInstanceOf[ArrayType])
+
+ private lazy val arrayElementTypes = arrayTypes.map(_.elementType)
+
+ def mountSchema: StructType = {
+ val fields = children.zip(arrayElementTypes).zipWithIndex.map {
+ case ((expr: NamedExpression, elementType), _) =>
+ StructField(expr.name, elementType, nullable = true)
+ case ((_, elementType), idx) =>
+ StructField(s"$idx", elementType, nullable = true)
+ }
+ StructType(fields)
+ }
+
+ override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+ val numberOfArrays: Int = children.length
+ val genericArrayData = classOf[GenericArrayData].getName
+ val genericInternalRow = classOf[GenericInternalRow].getName
+ val arrVals = ctx.freshName("arrVals")
+ val arrCardinality = ctx.freshName("arrCardinality")
+ val biggestCardinality = ctx.freshName("biggestCardinality")
+ val storedArrTypes = ctx.freshName("storedArrTypes")
+ val returnNull = ctx.freshName("returnNull")
+ val evals = children.map(_.genCode(ctx))
+
+ val inputs = evals.zipWithIndex.map { case (eval, index) =>
+ s"""
+ |${eval.code}
+ |if (!${eval.isNull}) {
+ | $arrVals[$index] = ${eval.value};
+ | $arrCardinality[$index] = ${eval.value}.numElements();
+ |} else {
+ | $arrVals[$index] = null;
+ | $arrCardinality[$index] = 0;
+ | $returnNull[0] = true;
+ |}
+ |$storedArrTypes[$index] = "${arrayElementTypes(index)}";
+ |$biggestCardinality = Math.max($biggestCardinality, $arrCardinality[$index]);
+ """.stripMargin
+ }
+
+ val inputsSplitted = ctx.splitExpressions(
+ expressions = inputs,
+ funcName = "getInputAndCardinality",
+ returnType = "int",
+ makeSplitFunction = body =>
+ s"""
+ |$body
+ |return $biggestCardinality;
+ """.stripMargin,
+ foldFunctions = _.map(funcCall => s"$biggestCardinality = $funcCall;").mkString("\n"),
+ arguments =
+ ("ArrayData[]", arrVals) ::
+ ("int[]", arrCardinality) ::
+ ("String[]", storedArrTypes) ::
+ ("int", biggestCardinality) ::
+ ("boolean[]", returnNull) :: Nil)
+
+ val myobject = ctx.freshName("myobject")
+ val j = ctx.freshName("j")
+ val i = ctx.freshName("i")
+ val args = ctx.freshName("args")
+
+ val cases = arrayElementTypes.distinct.map { elementType =>
+ val getArrValsItem = CodeGenerator.getValue(s"$arrVals[$j]", elementType, i)
+ s"""
+ |case "${elementType}":
+ | $myobject[$j] = $getArrValsItem;
+ | break;
+ """.stripMargin
+ }
+
+ ev.copy(s"""
+ |ArrayData[] $arrVals = new ArrayData[$numberOfArrays];
+ |int[] $arrCardinality = new int[$numberOfArrays];
+ |int $biggestCardinality = 0;
+ |String[] $storedArrTypes = new String[$numberOfArrays];
+ |boolean[] $returnNull = new boolean[1];
+ |$returnNull[0] = false;
+ |$inputsSplitted
+ |${CodeGenerator.javaType(dataType)} ${ev.value};
+ |boolean ${ev.isNull} = $returnNull[0];
+ |if (${ev.isNull}) {
+ | ${ev.value} = null;
+ |} else {
+ | if ($numberOfArrays == 0) {
+ | ${ev.value} = new $genericArrayData(new Object[0]);
+ | } else {
+ | Object[] $args = new Object[$biggestCardinality];
+ | for (int $i = 0; $i < $biggestCardinality; $i ++) {
+ | Object[] $myobject = new Object[$numberOfArrays];
+ | for (int $j = 0; $j < $numberOfArrays; $j ++) {
+ | if ($arrVals[$j] != null && $arrCardinality[$j] > $i && !$arrVals[$j].isNullAt($i)) {
+ | switch ($storedArrTypes[$j]) {
+ | ${cases.mkString("\n")}
+ | default:
+ | break;
+ | }
+ | } else {
+ | $myobject[$j] = null;
+ | }
+ | }
+ | $args[$i] = new $genericInternalRow($myobject);
--- End diff --
That's a great idea, thank you!
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org