You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/06/18 11:13:52 UTC

[2/2] flink git commit: Revert "[FLINK-2203] handling null values for RowSerializer"

Revert "[FLINK-2203] handling null values for RowSerializer"

This reverts commit adc3e0e6b1d2013c522e55ee027927488bde09f2.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bbd08f3c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bbd08f3c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bbd08f3c

Branch: refs/heads/release-0.9
Commit: bbd08f3ceeb277f8e3d27f9ee8572ec712c029c6
Parents: d6ad294
Author: Maximilian Michels <mx...@apache.org>
Authored: Thu Jun 18 10:20:18 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Thu Jun 18 10:20:18 2015 +0200

----------------------------------------------------------------------
 flink-staging/flink-table/pom.xml               |  8 ---
 .../api/table/typeinfo/RowSerializer.scala      | 73 +++++---------------
 .../api/table/typeinfo/RowSerializerTest.scala  | 70 -------------------
 3 files changed, 18 insertions(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bbd08f3c/flink-staging/flink-table/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/pom.xml b/flink-staging/flink-table/pom.xml
index 20600ff..cbd1c47 100644
--- a/flink-staging/flink-table/pom.xml
+++ b/flink-staging/flink-table/pom.xml
@@ -94,14 +94,6 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-core</artifactId>
-			<version>${project.version}</version>
-			<type>test-jar</type>
-			<scope>test</scope>
-		</dependency>
-
 	</dependencies>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/flink/blob/bbd08f3c/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala
index 527e2b4..8a8dc3d 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala
@@ -17,13 +17,10 @@
  */
 package org.apache.flink.api.table.typeinfo
 
-import java.util
-
-import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.api.common.typeutils.base.BooleanSerializer
 import org.apache.flink.api.table.Row
-import org.apache.flink.core.memory.{DataInputView, DataOutputView}
-
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.core.memory.{DataOutputView, DataInputView}
+;
 
 /**
  * Serializer for [[Row]].
@@ -31,8 +28,6 @@ import org.apache.flink.core.memory.{DataInputView, DataOutputView}
 class RowSerializer(fieldSerializers: Array[TypeSerializer[Any]])
   extends TypeSerializer[Row] {
 
-  private def getFieldSerializers = fieldSerializers
-
   override def isImmutableType: Boolean = false
 
   override def getLength: Int = -1
@@ -79,17 +74,11 @@ class RowSerializer(fieldSerializers: Array[TypeSerializer[Any]])
 
   override def serialize(value: Row, target: DataOutputView) {
     val len = fieldSerializers.length
-    var index = 0
-    while (index < len) {
-      val o: AnyRef = value.productElement(index).asInstanceOf[AnyRef]
-      if (o == null) {
-        target.writeBoolean(true)
-      } else {
-        target.writeBoolean(false)
-        val serializer = fieldSerializers(index)
-        serializer.serialize(value.productElement(index), target)
-      }
-      index += 1
+    var i = 0
+    while (i < len) {
+      val serializer = fieldSerializers(i)
+      serializer.serialize(value.productElement(i), target)
+      i += 1
     }
   }
 
@@ -100,17 +89,11 @@ class RowSerializer(fieldSerializers: Array[TypeSerializer[Any]])
       throw new RuntimeException("Row arity of reuse and fields do not match.")
     }
 
-    var index = 0
-    while (index < len) {
-      val isNull: Boolean = source.readBoolean
-      if (isNull) {
-        reuse.setField(index, null)
-      } else {
-        val field = reuse.productElement(index).asInstanceOf[AnyRef]
-        val serializer: TypeSerializer[Any] = fieldSerializers(index)
-        reuse.setField(index, serializer.deserialize(field, source))
-      }
-      index += 1
+    var i = 0
+    while (i < len) {
+      val field = reuse.productElement(i).asInstanceOf[AnyRef]
+      reuse.setField(i, fieldSerializers(i).deserialize(field, source))
+      i += 1
     }
     reuse
   }
@@ -119,17 +102,10 @@ class RowSerializer(fieldSerializers: Array[TypeSerializer[Any]])
     val len = fieldSerializers.length
 
     val result = new Row(len)
-
-    var index = 0
-    while (index < len) {
-      val isNull: Boolean = source.readBoolean()
-      if (isNull) {
-        result.setField(index, null)
-      } else {
-        val serializer: TypeSerializer[Any] = fieldSerializers(index)
-        result.setField(index, serializer.deserialize(source))
-      }
-      index += 1
+    var i = 0
+    while (i < len) {
+      result.setField(i, fieldSerializers(i).deserialize(source))
+      i += 1
     }
     result
   }
@@ -138,21 +114,8 @@ class RowSerializer(fieldSerializers: Array[TypeSerializer[Any]])
     val len = fieldSerializers.length
     var i = 0
     while (i < len) {
-      val isNull = source.readBoolean()
-      target.writeBoolean(isNull)
-      if (!isNull) {
-        fieldSerializers(i).copy(source, target)
-      }
+      fieldSerializers(i).copy(source, target)
       i += 1
     }
   }
-
-  override def equals(any: scala.Any): Boolean = {
-    any match {
-      case otherRS: RowSerializer =>
-        val otherFieldSerializers = otherRS.getFieldSerializers.asInstanceOf[Array[AnyRef]]
-        util.Arrays.deepEquals(fieldSerializers.asInstanceOf[Array[AnyRef]], otherFieldSerializers)
-      case _ => false
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bbd08f3c/flink-staging/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RowSerializerTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RowSerializerTest.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RowSerializerTest.scala
deleted file mode 100644
index cff276a..0000000
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RowSerializerTest.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.typeinfo
-
-import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.common.typeutils.{SerializerTestInstance, TypeSerializer}
-import org.apache.flink.api.table.Row
-import org.junit.Assert._
-import org.junit.Test
-
-class RowSerializerTest {
-
-  class RowSerializerTestInstance(serializer: TypeSerializer[Row],
-                                  testData: Array[Row])
-    extends SerializerTestInstance(serializer, classOf[Row], -1, testData: _*) {
-
-    override protected def deepEquals(message: String, should: Row, is: Row): Unit = {
-      val arity = should.productArity
-      assertEquals(message, arity, is.productArity)
-      var index = 0
-      while (index < arity) {
-        val copiedValue: Any = should.productElement(index)
-        val element: Any = is.productElement(index)
-        assertEquals(message, element, copiedValue)
-        index += 1
-      }
-    }
-  }
-
-  @Test
-  def testRowSerializer(): Unit ={
-
-    val rowInfo: TypeInformation[Row] = new RowTypeInfo(
-      Seq(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), Seq("id", "name"))
-
-    val row1 = new Row(2)
-    row1.setField(0, 1)
-    row1.setField(1, "a")
-
-    val row2 = new Row(2)
-    row2.setField(0, 2)
-    row2.setField(1, null)
-
-    val testData: Array[Row] = Array(row1, row2)
-
-    val rowSerializer: TypeSerializer[Row] = rowInfo.createSerializer(new ExecutionConfig)
-
-    val testInstance = new RowSerializerTestInstance(rowSerializer,testData)
-
-    testInstance.testAll()
-  }
-
-}