You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/06/18 11:13:52 UTC
[2/2] flink git commit: Revert "[FLINK-2203] handling null values for
RowSerializer"
Revert "[FLINK-2203] handling null values for RowSerializer"
This reverts commit adc3e0e6b1d2013c522e55ee027927488bde09f2.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bbd08f3c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bbd08f3c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bbd08f3c
Branch: refs/heads/release-0.9
Commit: bbd08f3ceeb277f8e3d27f9ee8572ec712c029c6
Parents: d6ad294
Author: Maximilian Michels <mx...@apache.org>
Authored: Thu Jun 18 10:20:18 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Thu Jun 18 10:20:18 2015 +0200
----------------------------------------------------------------------
flink-staging/flink-table/pom.xml | 8 ---
.../api/table/typeinfo/RowSerializer.scala | 73 +++++---------------
.../api/table/typeinfo/RowSerializerTest.scala | 70 -------------------
3 files changed, 18 insertions(+), 133 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bbd08f3c/flink-staging/flink-table/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/pom.xml b/flink-staging/flink-table/pom.xml
index 20600ff..cbd1c47 100644
--- a/flink-staging/flink-table/pom.xml
+++ b/flink-staging/flink-table/pom.xml
@@ -94,14 +94,6 @@ under the License.
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-core</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
-
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/flink/blob/bbd08f3c/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala
index 527e2b4..8a8dc3d 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala
@@ -17,13 +17,10 @@
*/
package org.apache.flink.api.table.typeinfo
-import java.util
-
-import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.api.common.typeutils.base.BooleanSerializer
import org.apache.flink.api.table.Row
-import org.apache.flink.core.memory.{DataInputView, DataOutputView}
-
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.core.memory.{DataOutputView, DataInputView}
+;
/**
* Serializer for [[Row]].
@@ -31,8 +28,6 @@ import org.apache.flink.core.memory.{DataInputView, DataOutputView}
class RowSerializer(fieldSerializers: Array[TypeSerializer[Any]])
extends TypeSerializer[Row] {
- private def getFieldSerializers = fieldSerializers
-
override def isImmutableType: Boolean = false
override def getLength: Int = -1
@@ -79,17 +74,11 @@ class RowSerializer(fieldSerializers: Array[TypeSerializer[Any]])
override def serialize(value: Row, target: DataOutputView) {
val len = fieldSerializers.length
- var index = 0
- while (index < len) {
- val o: AnyRef = value.productElement(index).asInstanceOf[AnyRef]
- if (o == null) {
- target.writeBoolean(true)
- } else {
- target.writeBoolean(false)
- val serializer = fieldSerializers(index)
- serializer.serialize(value.productElement(index), target)
- }
- index += 1
+ var i = 0
+ while (i < len) {
+ val serializer = fieldSerializers(i)
+ serializer.serialize(value.productElement(i), target)
+ i += 1
}
}
@@ -100,17 +89,11 @@ class RowSerializer(fieldSerializers: Array[TypeSerializer[Any]])
throw new RuntimeException("Row arity of reuse and fields do not match.")
}
- var index = 0
- while (index < len) {
- val isNull: Boolean = source.readBoolean
- if (isNull) {
- reuse.setField(index, null)
- } else {
- val field = reuse.productElement(index).asInstanceOf[AnyRef]
- val serializer: TypeSerializer[Any] = fieldSerializers(index)
- reuse.setField(index, serializer.deserialize(field, source))
- }
- index += 1
+ var i = 0
+ while (i < len) {
+ val field = reuse.productElement(i).asInstanceOf[AnyRef]
+ reuse.setField(i, fieldSerializers(i).deserialize(field, source))
+ i += 1
}
reuse
}
@@ -119,17 +102,10 @@ class RowSerializer(fieldSerializers: Array[TypeSerializer[Any]])
val len = fieldSerializers.length
val result = new Row(len)
-
- var index = 0
- while (index < len) {
- val isNull: Boolean = source.readBoolean()
- if (isNull) {
- result.setField(index, null)
- } else {
- val serializer: TypeSerializer[Any] = fieldSerializers(index)
- result.setField(index, serializer.deserialize(source))
- }
- index += 1
+ var i = 0
+ while (i < len) {
+ result.setField(i, fieldSerializers(i).deserialize(source))
+ i += 1
}
result
}
@@ -138,21 +114,8 @@ class RowSerializer(fieldSerializers: Array[TypeSerializer[Any]])
val len = fieldSerializers.length
var i = 0
while (i < len) {
- val isNull = source.readBoolean()
- target.writeBoolean(isNull)
- if (!isNull) {
- fieldSerializers(i).copy(source, target)
- }
+ fieldSerializers(i).copy(source, target)
i += 1
}
}
-
- override def equals(any: scala.Any): Boolean = {
- any match {
- case otherRS: RowSerializer =>
- val otherFieldSerializers = otherRS.getFieldSerializers.asInstanceOf[Array[AnyRef]]
- util.Arrays.deepEquals(fieldSerializers.asInstanceOf[Array[AnyRef]], otherFieldSerializers)
- case _ => false
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bbd08f3c/flink-staging/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RowSerializerTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RowSerializerTest.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RowSerializerTest.scala
deleted file mode 100644
index cff276a..0000000
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RowSerializerTest.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.typeinfo
-
-import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.common.typeutils.{SerializerTestInstance, TypeSerializer}
-import org.apache.flink.api.table.Row
-import org.junit.Assert._
-import org.junit.Test
-
-class RowSerializerTest {
-
- class RowSerializerTestInstance(serializer: TypeSerializer[Row],
- testData: Array[Row])
- extends SerializerTestInstance(serializer, classOf[Row], -1, testData: _*) {
-
- override protected def deepEquals(message: String, should: Row, is: Row): Unit = {
- val arity = should.productArity
- assertEquals(message, arity, is.productArity)
- var index = 0
- while (index < arity) {
- val copiedValue: Any = should.productElement(index)
- val element: Any = is.productElement(index)
- assertEquals(message, element, copiedValue)
- index += 1
- }
- }
- }
-
- @Test
- def testRowSerializer(): Unit ={
-
- val rowInfo: TypeInformation[Row] = new RowTypeInfo(
- Seq(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), Seq("id", "name"))
-
- val row1 = new Row(2)
- row1.setField(0, 1)
- row1.setField(1, "a")
-
- val row2 = new Row(2)
- row2.setField(0, 2)
- row2.setField(1, null)
-
- val testData: Array[Row] = Array(row1, row2)
-
- val rowSerializer: TypeSerializer[Row] = rowInfo.createSerializer(new ExecutionConfig)
-
- val testInstance = new RowSerializerTestInstance(rowSerializer,testData)
-
- testInstance.testAll()
- }
-
-}