Posted to commits@flink.apache.org by mx...@apache.org on 2015/10/06 17:21:46 UTC

[1/2] flink git commit: Revert "[FLINK-2203] handling null values for RowSerializer"

Repository: flink
Updated Branches:
  refs/heads/master b08669abf -> 48791c347


Revert "[FLINK-2203] handling null values for RowSerializer"

This reverts commit f8e12b20d925c3f6f24769327d1da5d98affa679.

The commit had to be reverted because the RowSerializer is not in sync
with other comparators and serializers. See FLINK-2236.
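
[Editor's context: the reverted commit made the serializer prefix every
field with a boolean null marker, while the surrounding comparators and
serializers still used the unmarked layout (see the RowSerializer.scala
diff below). The following is a minimal, self-contained Scala sketch of
the two layouts; the object and method names are hypothetical, and plain
java.io streams stand in for Flink's DataOutputView.]

    import java.io.DataOutputStream

    object WireFormatSketch {
      // Layout of the reverted serializer: a boolean marker precedes
      // each field, so nulls can be represented on the wire.
      def writeMarked(out: DataOutputStream, fields: Seq[Integer]): Unit =
        fields.foreach { f =>
          if (f == null) out.writeBoolean(true)              // null: marker only
          else { out.writeBoolean(false); out.writeInt(f) }  // marker + value
        }

      // Layout the other comparators and serializers still assumed:
      // values only, no markers, nulls unsupported.
      def writeUnmarked(out: DataOutputStream, fields: Seq[Integer]): Unit =
        fields.foreach(f => out.writeInt(f))
    }

[The extra marker byte per field makes the two layouts
binary-incompatible, which is the kind of mismatch the commit message
refers to and FLINK-2236 tracks.]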


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/48791c34
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/48791c34
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/48791c34

Branch: refs/heads/master
Commit: 48791c34776fe10373ef3abbc35d9b0fcfbda1e4
Parents: ff0a1a0
Author: Maximilian Michels <mx...@apache.org>
Authored: Tue Oct 6 11:25:56 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Tue Oct 6 17:16:55 2015 +0200

----------------------------------------------------------------------
 flink-staging/flink-table/pom.xml               |  8 ---
 .../api/table/typeinfo/RowSerializer.scala      | 67 ++++++-------------
 .../api/table/typeinfo/RowSerializerTest.scala  | 70 --------------------
 3 files changed, 19 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/48791c34/flink-staging/flink-table/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/pom.xml b/flink-staging/flink-table/pom.xml
index 1a622aa..358e116 100644
--- a/flink-staging/flink-table/pom.xml
+++ b/flink-staging/flink-table/pom.xml
@@ -94,14 +94,6 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-core</artifactId>
-			<version>${project.version}</version>
-			<type>test-jar</type>
-			<scope>test</scope>
-		</dependency>
-
 	</dependencies>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/flink/blob/48791c34/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala
index 02219c7..5e9613d 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala
@@ -17,13 +17,9 @@
  */
 package org.apache.flink.api.table.typeinfo
 
-import java.util
-
-import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.api.common.typeutils.base.BooleanSerializer
 import org.apache.flink.api.table.Row
-import org.apache.flink.core.memory.{DataInputView, DataOutputView}
-
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.core.memory.{DataOutputView, DataInputView}
 
 /**
  * Serializer for [[Row]].
@@ -31,8 +27,6 @@ import org.apache.flink.core.memory.{DataInputView, DataOutputView}
 class RowSerializer(val fieldSerializers: Array[TypeSerializer[Any]])
   extends TypeSerializer[Row] {
 
-  private def getFieldSerializers = fieldSerializers
-
   override def isImmutableType: Boolean = false
 
   override def getLength: Int = -1
@@ -79,17 +73,11 @@ class RowSerializer(val fieldSerializers: Array[TypeSerializer[Any]])
 
   override def serialize(value: Row, target: DataOutputView) {
     val len = fieldSerializers.length
-    var index = 0
-    while (index < len) {
-      val o: AnyRef = value.productElement(index).asInstanceOf[AnyRef]
-      if (o == null) {
-        target.writeBoolean(true)
-      } else {
-        target.writeBoolean(false)
-        val serializer = fieldSerializers(index)
-        serializer.serialize(value.productElement(index), target)
-      }
-      index += 1
+    var i = 0
+    while (i < len) {
+      val serializer = fieldSerializers(i)
+      serializer.serialize(value.productElement(i), target)
+      i += 1
     }
   }
 
@@ -100,17 +88,11 @@ class RowSerializer(val fieldSerializers: Array[TypeSerializer[Any]])
       throw new RuntimeException("Row arity of reuse and fields do not match.")
     }
 
-    var index = 0
-    while (index < len) {
-      val isNull: Boolean = source.readBoolean
-      if (isNull) {
-        reuse.setField(index, null)
-      } else {
-        val field = reuse.productElement(index).asInstanceOf[AnyRef]
-        val serializer: TypeSerializer[Any] = fieldSerializers(index)
-        reuse.setField(index, serializer.deserialize(field, source))
-      }
-      index += 1
+    var i = 0
+    while (i < len) {
+      val field = reuse.productElement(i).asInstanceOf[AnyRef]
+      reuse.setField(i, fieldSerializers(i).deserialize(field, source))
+      i += 1
     }
     reuse
   }
@@ -119,17 +101,10 @@ class RowSerializer(val fieldSerializers: Array[TypeSerializer[Any]])
     val len = fieldSerializers.length
 
     val result = new Row(len)
-
-    var index = 0
-    while (index < len) {
-      val isNull: Boolean = source.readBoolean()
-      if (isNull) {
-        result.setField(index, null)
-      } else {
-        val serializer: TypeSerializer[Any] = fieldSerializers(index)
-        result.setField(index, serializer.deserialize(source))
-      }
-      index += 1
+    var i = 0
+    while (i < len) {
+      result.setField(i, fieldSerializers(i).deserialize(source))
+      i += 1
     }
     result
   }
@@ -138,11 +113,7 @@ class RowSerializer(val fieldSerializers: Array[TypeSerializer[Any]])
     val len = fieldSerializers.length
     var i = 0
     while (i < len) {
-      val isNull = source.readBoolean()
-      target.writeBoolean(isNull)
-      if (!isNull) {
-        fieldSerializers(i).copy(source, target)
-      }
+      fieldSerializers(i).copy(source, target)
       i += 1
     }
   }
@@ -151,7 +122,7 @@ class RowSerializer(val fieldSerializers: Array[TypeSerializer[Any]])
     any match {
       case otherRS: RowSerializer =>
         otherRS.canEqual(this) &&
-        fieldSerializers.sameElements(otherRS.fieldSerializers)
+          fieldSerializers.sameElements(otherRS.fieldSerializers)
       case _ => false
     }
   }
@@ -161,6 +132,6 @@ class RowSerializer(val fieldSerializers: Array[TypeSerializer[Any]])
   }
 
   override def hashCode(): Int = {
-    util.Arrays.hashCode(fieldSerializers.asInstanceOf[Array[AnyRef]])
+    java.util.Arrays.hashCode(fieldSerializers.asInstanceOf[Array[AnyRef]])
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/48791c34/flink-staging/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RowSerializerTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RowSerializerTest.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RowSerializerTest.scala
deleted file mode 100644
index cff276a..0000000
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RowSerializerTest.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.typeinfo
-
-import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.common.typeutils.{SerializerTestInstance, TypeSerializer}
-import org.apache.flink.api.table.Row
-import org.junit.Assert._
-import org.junit.Test
-
-class RowSerializerTest {
-
-  class RowSerializerTestInstance(serializer: TypeSerializer[Row],
-                                  testData: Array[Row])
-    extends SerializerTestInstance(serializer, classOf[Row], -1, testData: _*) {
-
-    override protected def deepEquals(message: String, should: Row, is: Row): Unit = {
-      val arity = should.productArity
-      assertEquals(message, arity, is.productArity)
-      var index = 0
-      while (index < arity) {
-        val copiedValue: Any = should.productElement(index)
-        val element: Any = is.productElement(index)
-        assertEquals(message, element, copiedValue)
-        index += 1
-      }
-    }
-  }
-
-  @Test
-  def testRowSerializer(): Unit ={
-
-    val rowInfo: TypeInformation[Row] = new RowTypeInfo(
-      Seq(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), Seq("id", "name"))
-
-    val row1 = new Row(2)
-    row1.setField(0, 1)
-    row1.setField(1, "a")
-
-    val row2 = new Row(2)
-    row2.setField(0, 2)
-    row2.setField(1, null)
-
-    val testData: Array[Row] = Array(row1, row2)
-
-    val rowSerializer: TypeSerializer[Row] = rowInfo.createSerializer(new ExecutionConfig)
-
-    val testInstance = new RowSerializerTestInstance(rowSerializer,testData)
-
-    testInstance.testAll()
-  }
-
-}


[2/2] flink git commit: Revert "[FLINK-2210] Table API support for aggregation on columns with null values"

Posted by mx...@apache.org.
Revert "[FLINK-2210] Table API support for aggregation on columns with null values"

This reverts commit b59c81bc41f0fc4ade5359dfdf42549a76d412fa.

The commit had to be reverted because the RowSerializer is not in sync
with other comparators and serializers. See FLINK-2236.
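
[Editor's context: the reverted commit taught the aggregation path to
skip null values: Avg's intermediate fields became (value, isNotNull)
instead of (value, 1), and ExpressionAggregateFunction fed only non-null
elements to the aggregators (see the diffs below). The following is a
minimal plain-Scala sketch of that behaviour; the nullAwareAvg helper is
hypothetical and stands in for the code-generated pipeline.]

    object NullAwareAggSketch {
      // Average over a column that may contain nulls: nulls contribute
      // neither to the sum nor to the count, mirroring the reverted logic.
      def nullAwareAvg(values: Seq[Integer]): Option[Double] = {
        val nonNull = values.collect { case v if v != null => v.intValue }
        if (nonNull.isEmpty) None  // all-null column: no defined average
        else Some(nonNull.sum.toDouble / nonNull.size)
      }
    }

    // e.g. NullAwareAggSketch.nullAwareAvg(Seq(234, 345, null, null))
    //      == Some(289.5)

[After the revert, every row counts toward the denominator again,
consistent with the other comparators and serializers until FLINK-2236
is resolved.]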


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ff0a1a0b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ff0a1a0b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ff0a1a0b

Branch: refs/heads/master
Commit: ff0a1a0b57ea239ef05d240ca9d9d5bd20d458ec
Parents: b08669a
Author: Maximilian Michels <mx...@apache.org>
Authored: Tue Oct 6 12:34:19 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Tue Oct 6 17:16:55 2015 +0200

----------------------------------------------------------------------
 .../table/codegen/ExpressionCodeGenerator.scala | 19 -------
 .../api/table/expressions/aggregations.scala    |  2 +-
 .../api/table/expressions/comparison.scala      |  8 ---
 .../runtime/ExpressionAggregateFunction.scala   |  5 +-
 .../scala/table/test/AggregationsITCase.scala   | 58 +-------------------
 5 files changed, 4 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ff0a1a0b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala
index 43396d9..c9220e9 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala
@@ -514,25 +514,6 @@ abstract class ExpressionCodeGenerator[R](
             """.stripMargin
         }
 
-      case NumericIsNotNull(child) =>
-        val childCode = generateExpression(child)
-        if (nullCheck) {
-          childCode.code +
-            s"""
-               |boolean $nullTerm = ${childCode.nullTerm};
-               |if ($nullTerm) {
-               |  0;
-               |} else {
-               |  $resultTpe $resultTerm = ${childCode.resultTerm} != null ? 1 : 0;
-               |}
-            """.stripMargin
-        } else {
-          childCode.code +
-            s"""
-               |$resultTpe $resultTerm = ${childCode.resultTerm} != null ? 1 : 0;
-            """.stripMargin
-        }
-
       case _ => throw new ExpressionException("Could not generate code for expression " + expr)
     }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ff0a1a0b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala
index a762f66..08e319d 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala
@@ -89,7 +89,7 @@ case class Count(child: Expression) extends Aggregation {
 case class Avg(child: Expression) extends Aggregation {
   override def toString = s"($child).avg"
 
-  override def getIntermediateFields: Seq[Expression] = Seq(child, NumericIsNotNull(child))
+  override def getIntermediateFields: Seq[Expression] = Seq(child, Literal(1))
   // This is just sweet. Use our own AST representation and let the code generator do
   // our dirty work.
   override def getFinalField(inputs: Seq[Expression]): Expression =

http://git-wip-us.apache.org/repos/asf/flink/blob/ff0a1a0b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
index c60acf9..687ea7a 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
@@ -91,11 +91,3 @@ case class IsNotNull(child: Expression) extends UnaryExpression {
 
   override def toString = s"($child).isNotNull"
 }
-
-case class NumericIsNotNull(child: Expression) extends UnaryExpression {
-  def typeInfo = {
-    BasicTypeInfo.INT_TYPE_INFO
-  }
-
-  override def toString = s"($child).numericIsNotNull"
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ff0a1a0b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala
index 7d7dc1c..7e9bc0d 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala
@@ -53,10 +53,7 @@ class ExpressionAggregateFunction(
       var i = 0
       val len = functions.length
       while (i < len) {
-        val element: Any = current.productElement(fieldPositions(i))
-        if (element != null){
-          functions(i).aggregate(element)
-        }
+        functions(i).aggregate(current.productElement(fieldPositions(i)))
         i += 1
       }
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/ff0a1a0b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
index acbeab7..7ac8eef 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
@@ -18,16 +18,13 @@
 
 package org.apache.flink.api.scala.table.test
 
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.table.{Row, ExpressionException}
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.typeinfo.RowTypeInfo
-import org.apache.flink.api.table.{ExpressionException, Row}
 import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
-import org.junit.Assert._
 import org.junit._
 import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
@@ -127,56 +124,5 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
     expected = ""
   }
 
-  @Test
-  def testAggregationWithNullValues(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val dataSet = env.fromElements[(Integer, String)](
-      (123, "a"), (234, "b"), (345, "c"), (0, "d"))
-
-    implicit val rowInfo: TypeInformation[Row] = new RowTypeInfo(
-      Seq(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), Seq("id", "name"))
-
-    val rowDataSet = dataSet.map {
-      entry =>
-        val row = new Row(2)
-        val amount = if (entry._1 > 200) entry._1 else null
-        row.setField(0, amount)
-        row.setField(1, entry._2)
-        row
-    }
-
-    val entries = rowDataSet.toTable.select('id.avg, 'id.sum, 'id.count).collect().head
-    val mean = entries.productElement(0).toString.toInt
-    val sum = entries.productElement(1).toString.toInt
-    val count = entries.productElement(2).toString.toInt
-
-    assertEquals(4,count)
-
-    val computedMean = sum / 2
-    assertEquals(computedMean, mean)
-  }
-
-  @Test
-  def testAggregationWhenAllValuesAreNull(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val dataSet = env.fromElements[(Integer, String)](
-      (123, "a"), (234, "b"), (345, "c"), (0, "d"))
-
-    implicit val rowInfo: TypeInformation[Row] = new RowTypeInfo(
-      Seq(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), Seq("id", "name"))
-
-    val rowDataSet = dataSet.map {
-      entry =>
-        val row = new Row(2)
-        row.setField(0, null)
-        row.setField(1, entry._2)
-        row
-    }
-
-    val entries = rowDataSet.toTable.select('id.max).collect().head.productElement(0)
-    assertEquals(entries, null)
-  }
 
 }