Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/15 10:49:56 UTC

[6/7] flink git commit: [FLINK-5189] [table] Delete Row and its related classes from flink-table.

[FLINK-5189] [table] Delete Row and its related classes from flink-table.

This closes #3004.
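
This deletion presupposes a replacement outside flink-table; assuming it is the
Row that was moved to flink-core as org.apache.flink.types.Row, a minimal
migration sketch for downstream code looks like this:

    // before (removed by this commit):
    //   import org.apache.flink.api.table.Row
    // after (assumed flink-core replacement):
    import org.apache.flink.types.Row

    val row = new Row(2)
    row.setField(0, "name")    // same mutation API as the deleted class
    row.setField(1, 42)
    val age = row.getField(1)  // replaces productElement(1)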


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a9e6ec86
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a9e6ec86
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a9e6ec86

Branch: refs/heads/master
Commit: a9e6ec863a0879b0c8dea199535380a6d42a3121
Parents: 86f8a25
Author: tonycox <to...@gmail.com>
Authored: Wed Dec 14 12:41:51 2016 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Dec 15 11:36:40 2016 +0100

----------------------------------------------------------------------
 .../scala/org/apache/flink/api/table/Row.scala  |  38 -
 .../table/runtime/io/RowCsvInputFormat.scala    | 181 ----
 .../table/typeutils/NullAwareComparator.scala   | 218 -----
 .../api/table/typeutils/NullMaskUtils.scala     |  98 ---
 .../api/table/typeutils/RowComparator.scala     | 425 ---------
 .../api/table/typeutils/RowSerializer.scala     | 209 -----
 .../flink/api/table/typeutils/RowTypeInfo.scala | 108 ---
 .../runtime/io/RowCsvInputFormatTest.scala      | 882 -------------------
 .../api/table/typeutils/RowComparatorTest.scala | 136 ---
 .../RowComparatorWithManyFieldsTest.scala       |  82 --
 .../api/table/typeutils/RowSerializerTest.scala | 194 ----
 11 files changed, 2571 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a9e6ec86/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Row.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Row.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Row.scala
deleted file mode 100644
index e3baab3..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Row.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table
-
-/**
- * This is used for executing Table API operations. We use manually generated
- * TypeInfo to check the field types and create serializers and comparators.
- */
-class Row(arity: Int) extends Product {
-
-  private val fields = new Array[Any](arity)
-
-  def productArity = fields.length
-
-  def productElement(i: Int): Any = fields(i)
-
-  def setField(i: Int, value: Any): Unit = fields(i) = value
-
-  def canEqual(that: Any) = false
-
-  override def toString = fields.mkString(",")
-
-}
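
A minimal usage sketch of the removed class, using only the methods shown
above (values are hypothetical):

    import org.apache.flink.api.table.Row

    val row = new Row(3)
    row.setField(0, "a")
    row.setField(1, 1)
    row.setField(2, null)    // fields may hold null
    row.productArity         // -> 3
    row.productElement(1)    // -> 1
    row.toString             // -> "a,1,null"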

http://git-wip-us.apache.org/repos/asf/flink/blob/a9e6ec86/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormat.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormat.scala
deleted file mode 100644
index b0ab801..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormat.scala
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime.io
-
-import org.apache.flink.annotation.Internal
-import org.apache.flink.api.common.io.ParseException
-import org.apache.flink.api.java.io.CsvInputFormat
-import org.apache.flink.api.java.io.CsvInputFormat.{DEFAULT_FIELD_DELIMITER, DEFAULT_LINE_DELIMITER, createDefaultMask, toBooleanMask}
-import org.apache.flink.api.table.Row
-import org.apache.flink.api.table.runtime.io.RowCsvInputFormat.extractTypeClasses
-import org.apache.flink.api.table.typeutils.RowTypeInfo
-import org.apache.flink.core.fs.Path
-import org.apache.flink.types.parser.FieldParser
-import org.apache.flink.types.parser.FieldParser.ParseErrorState
-
-@Internal
-@SerialVersionUID(1L)
-class RowCsvInputFormat(
-    filePath: Path,
-    rowTypeInfo: RowTypeInfo,
-    lineDelimiter: String = DEFAULT_LINE_DELIMITER,
-    fieldDelimiter: String = DEFAULT_FIELD_DELIMITER,
-    includedFieldsMask: Array[Boolean] = null,
-    emptyColumnAsNull: Boolean = false)
-  extends CsvInputFormat[Row](filePath) {
-
-  if (rowTypeInfo.getArity == 0) {
-    throw new IllegalArgumentException("Row arity must be greater than 0.")
-  }
-  private val arity = rowTypeInfo.getArity
-  private lazy val defaultFieldMask = createDefaultMask(arity)
-  private val fieldsMask = Option(includedFieldsMask).getOrElse(defaultFieldMask)
-
-  // prepare CsvInputFormat
-  setDelimiter(lineDelimiter)
-  setFieldDelimiter(fieldDelimiter)
-  setFieldsGeneric(fieldsMask, extractTypeClasses(rowTypeInfo))
-
-  def this(
-      filePath: Path,
-      rowTypeInfo: RowTypeInfo,
-      lineDelimiter: String,
-      fieldDelimiter: String,
-      includedFieldsMask: Array[Int]) {
-    this(
-      filePath,
-      rowTypeInfo,
-      lineDelimiter,
-      fieldDelimiter,
-      if (includedFieldsMask == null) {
-        null
-      } else {
-        toBooleanMask(includedFieldsMask)
-      })
-  }
-
-  def this(
-      filePath: Path,
-      rowTypeInfo: RowTypeInfo,
-      includedFieldsMask: Array[Int]) {
-    this(
-      filePath,
-      rowTypeInfo,
-      DEFAULT_LINE_DELIMITER,
-      DEFAULT_FIELD_DELIMITER,
-      includedFieldsMask)
-  }
-
-  def fillRecord(reuse: Row, parsedValues: Array[AnyRef]): Row = {
-    val reuseRow = if (reuse == null) {
-      new Row(arity)
-    } else {
-      reuse
-    }
-    var i: Int = 0
-    while (i < parsedValues.length) {
-      reuseRow.setField(i, parsedValues(i))
-      i += 1
-    }
-    reuseRow
-  }
-
-  @throws[ParseException]
-  override protected def parseRecord(
-      holders: Array[AnyRef],
-      bytes: Array[Byte],
-      offset: Int,
-      numBytes: Int)
-    : Boolean = {
-    val fieldDelimiter = this.getFieldDelimiter
-    val fieldIncluded: Array[Boolean] = this.fieldIncluded
-
-    var startPos = offset
-    val limit = offset + numBytes
-
-    var field = 0
-    var output = 0
-    while (field < fieldIncluded.length) {
-
-      // check valid start position
-      if (startPos >= limit) {
-        if (isLenient) {
-          return false
-        } else {
-          throw new ParseException("Row too short: " + new String(bytes, offset, numBytes))
-        }
-      }
-
-      if (fieldIncluded(field)) {
-        // parse field
-        val parser: FieldParser[AnyRef] = this.getFieldParsers()(output)
-          .asInstanceOf[FieldParser[AnyRef]]
-        val latestValidPos = startPos
-        startPos = parser.resetErrorStateAndParse(
-          bytes,
-          startPos,
-          limit,
-          fieldDelimiter,
-          holders(output))
-
-        if (!isLenient && (parser.getErrorState ne ParseErrorState.NONE)) {
-          // the error state EMPTY_COLUMN is ignored
-          if (parser.getErrorState ne ParseErrorState.EMPTY_COLUMN) {
-            throw new ParseException(s"Parsing error for column $field of row '"
-              + new String(bytes, offset, numBytes)
-              + s"' originated by ${parser.getClass.getSimpleName}: ${parser.getErrorState}.")
-          }
-        }
-        holders(output) = parser.getLastResult
-
-        // check parse result:
-        // the result is null if it is invalid
-        // or empty with emptyColumnAsNull enabled
-        if (startPos < 0 ||
-            (emptyColumnAsNull && (parser.getErrorState eq ParseErrorState.EMPTY_COLUMN))) {
-          holders(output) = null
-          startPos = skipFields(bytes, latestValidPos, limit, fieldDelimiter)
-        }
-        output += 1
-      } else {
-        // skip field
-        startPos = skipFields(bytes, startPos, limit, fieldDelimiter)
-      }
-
-      // check if something went wrong
-      if (startPos < 0) {
-        throw new ParseException(s"Unexpected parser position for column $field of row '"
-          + new String(bytes, offset, numBytes) + "'")
-      }
-
-      field += 1
-    }
-    true
-  }
-}
-
-object RowCsvInputFormat {
-
-  private def extractTypeClasses(rowTypeInfo: RowTypeInfo): Array[Class[_]] = {
-    val classes = for (i <- 0 until rowTypeInfo.getArity)
-      yield rowTypeInfo.getTypeAt(i).getTypeClass
-    classes.toArray
-  }
-
-}
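
A usage sketch for the removed format, mirroring the RowCsvInputFormatTest
deleted below (the file path is hypothetical):

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    import org.apache.flink.api.table.Row
    import org.apache.flink.api.table.runtime.io.RowCsvInputFormat
    import org.apache.flink.api.table.typeutils.RowTypeInfo
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.core.fs.Path

    val typeInfo = new RowTypeInfo(Seq(
      BasicTypeInfo.STRING_TYPE_INFO,
      BasicTypeInfo.INT_TYPE_INFO))

    val format = new RowCsvInputFormat(new Path("/tmp/data.csv"), typeInfo, "\n", "|")
    format.configure(new Configuration)
    // then open a FileInputSplit and call format.nextRecord(new Row(2)),
    // as the tests below do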

http://git-wip-us.apache.org/repos/asf/flink/blob/a9e6ec86/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/NullAwareComparator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/NullAwareComparator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/NullAwareComparator.scala
deleted file mode 100644
index 86a768d..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/NullAwareComparator.scala
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.typeutils
-
-import org.apache.flink.api.common.typeutils.{CompositeTypeComparator, TypeComparator}
-import org.apache.flink.core.memory.{DataInputView, DataOutputView, MemorySegment}
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ArrayBuffer
-
-/**
- * Null-aware comparator that wraps a comparator which does not support null references.
- *
- * NOTE: This class assumes it is used within a composite type comparator (such
- * as [[RowComparator]]) that handles serialized comparison.
- */
-class NullAwareComparator[T](
-    val wrappedComparator: TypeComparator[T],
-    val order: Boolean)
-  extends TypeComparator[T] {
-
-  // number of flat fields
-  private val flatFields = wrappedComparator.getFlatComparators.length
-
-  // stores whether the reference is null (for reference comparison)
-  private var nullReference = false
-
-  override def hash(record: T): Int = {
-    if (record != null) {
-      wrappedComparator.hash(record)
-    }
-    else {
-      0
-    }
-  }
-
-  override def getNormalizeKeyLen: Int = {
-    val len = wrappedComparator.getNormalizeKeyLen
-    if (len == Integer.MAX_VALUE) {
-      Integer.MAX_VALUE
-    }
-    else {
-      len + 1 // add one for a null byte
-    }
-  }
-
-  override def putNormalizedKey(
-      record: T,
-      target: MemorySegment,
-      offset: Int,
-      numBytes: Int)
-    : Unit = {
-    if (numBytes > 0) {
-      // write a null byte with padding
-      if (record == null) {
-        target.putBoolean(offset, false)
-        // write padding
-        var j = 0
-        while (j < numBytes - 1) {
-          target.put(offset + 1 + j, 0.toByte)
-          j += 1
-        }
-      }
-      // write a non-null byte with key
-      else {
-        target.putBoolean(offset, true)
-        // write key
-        wrappedComparator.putNormalizedKey(record, target, offset + 1, numBytes - 1)
-      }
-    }
-  }
-
-  override def invertNormalizedKey(): Boolean = wrappedComparator.invertNormalizedKey()
-
-  override def supportsSerializationWithKeyNormalization(): Boolean = false
-
-  override def writeWithKeyNormalization(record: T, target: DataOutputView): Unit =
-    throw new UnsupportedOperationException("Record serialization with leading normalized keys" +
-      " not supported.")
-
-  override def readWithKeyDenormalization(reuse: T, source: DataInputView): T =
-    throw new UnsupportedOperationException("Record deserialization with leading normalized keys" +
-      " not supported.")
-
-  override def isNormalizedKeyPrefixOnly(keyBytes: Int): Boolean =
-    wrappedComparator.isNormalizedKeyPrefixOnly(keyBytes - 1)
-
-  override def setReference(toCompare: T): Unit = {
-    if (toCompare == null) {
-      nullReference = true
-    }
-    else {
-      nullReference = false
-      wrappedComparator.setReference(toCompare)
-    }
-  }
-
-  override def compare(first: T, second: T): Int = {
-    // both values are null -> equality
-    if (first == null && second == null) {
-      0
-    }
-    // first value is null -> inequality
-    // but order is considered
-    else if (first == null) {
-      if (order) -1 else 1
-    }
-    // second value is null -> inequality
-    // but order is considered
-    else if (second == null) {
-      if (order) 1 else -1
-    }
-    // no null values
-    else {
-      wrappedComparator.compare(first, second)
-    }
-  }
-
-  override def compareToReference(referencedComparator: TypeComparator[T]): Int = {
-    val otherComparator = referencedComparator.asInstanceOf[NullAwareComparator[T]]
-    val otherNullReference = otherComparator.nullReference
-    // both values are null -> equality
-    if (nullReference && otherNullReference) {
-      0
-    }
-    // first value is null -> inequality
-    // but order is considered
-    else if (nullReference) {
-      if (order) 1 else -1
-    }
-    // second value is null -> inequality
-    // but order is considered
-    else if (otherNullReference) {
-      if (order) -1 else 1
-    }
-    // no null values
-    else {
-      wrappedComparator.compareToReference(otherComparator.wrappedComparator)
-    }
-  }
-
-  override def supportsNormalizedKey(): Boolean = wrappedComparator.supportsNormalizedKey()
-
-  override def equalToReference(candidate: T): Boolean = {
-    // both values are null
-    if (candidate == null && nullReference) {
-      true
-    }
-    // one value is null
-    else if (candidate == null || nullReference) {
-      false
-    }
-    // no null value
-    else {
-      wrappedComparator.equalToReference(candidate)
-    }
-  }
-
-  override def duplicate(): TypeComparator[T] = {
-    new NullAwareComparator[T](wrappedComparator.duplicate(), order)
-  }
-
-  override def extractKeys(record: Any, target: Array[AnyRef], index: Int): Int = {
-    if (record == null) {
-      var i = 0
-      while (i < flatFields) {
-        target(index + i) = null
-        i += 1
-      }
-      flatFields
-    }
-    else {
-      wrappedComparator.extractKeys(record, target, index)
-    }
-  }
-
-
-  override def getFlatComparators: Array[TypeComparator[_]] = {
-    // determine the flat comparators and wrap them again in null-aware comparators
-    val flatComparators = new ArrayBuffer[TypeComparator[_]]()
-    wrappedComparator match {
-      case ctc: CompositeTypeComparator[_] => ctc.getFlatComparator(flatComparators)
-      case c: TypeComparator[_] => flatComparators += c
-    }
-    val wrappedComparators = flatComparators.map { c =>
-      new NullAwareComparator[Any](c.asInstanceOf[TypeComparator[Any]], order)
-    }
-    wrappedComparators.toArray[TypeComparator[_]]
-  }
-
-  /**
-   * This method is not implemented here. It must be implemented by the comparator this class
-   * is contained in (e.g. RowComparator).
-   *
-   * @param firstSource The input view containing the first record.
-   * @param secondSource The input view containing the second record.
- * @return An integer defining the order among the objects in the same way as
-   *         { @link java.util.Comparator#compare(Object, Object)}.
-   */
-  override def compareSerialized(firstSource: DataInputView, secondSource: DataInputView): Int =
-    throw new UnsupportedOperationException("Comparator does not support null-aware serialized " +
-      "comparision.")
-}
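
The null ordering that compare() implements above, restated as a standalone
sketch (the helper name is hypothetical): in ascending order (order = true)
nulls sort before all non-null values, in descending order after them.

    // Restatement of NullAwareComparator.compare's null handling.
    def nullAwareCompare[T](first: T, second: T, order: Boolean)
                           (cmp: (T, T) => Int): Int =
      if (first == null && second == null) 0
      else if (first == null) { if (order) -1 else 1 }
      else if (second == null) { if (order) 1 else -1 }
      else cmp(first, second)

    nullAwareCompare[Integer](null, 5, order = true)(_ compareTo _)  // -> -1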

http://git-wip-us.apache.org/repos/asf/flink/blob/a9e6ec86/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/NullMaskUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/NullMaskUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/NullMaskUtils.scala
deleted file mode 100644
index dcdc775..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/NullMaskUtils.scala
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.typeutils
-
-import org.apache.flink.api.table.Row
-import org.apache.flink.core.memory.{DataInputView, DataOutputView}
-
-object NullMaskUtils {
-
-  def writeNullMask(len: Int, value: Row, target: DataOutputView): Unit = {
-    var b = 0x00
-    var bytePos = 0
-
-    var fieldPos = 0
-    var numPos = 0
-    while (fieldPos < len) {
-      b = 0x00
-      // set bits in byte
-      bytePos = 0
-      numPos = Math.min(8, len - fieldPos)
-      while (bytePos < numPos) {
-        b = b << 1
-        // set bit if field is null
-        if(value.productElement(fieldPos + bytePos) == null) {
-          b |= 0x01
-        }
-        bytePos += 1
-      }
-      fieldPos += numPos
-      // shift bits if last byte is not completely filled
-      b <<= (8 - bytePos)
-      // write byte
-      target.writeByte(b)
-    }
-  }
-
-  def readIntoNullMask(len: Int, source: DataInputView, nullMask: Array[Boolean]): Unit = {
-    var b = 0x00
-    var bytePos = 0
-
-    var fieldPos = 0
-    var numPos = 0
-    while (fieldPos < len) {
-      // read byte
-      b = source.readUnsignedByte()
-      bytePos = 0
-      numPos = Math.min(8, len - fieldPos)
-      while (bytePos < numPos) {
-        nullMask(fieldPos + bytePos) = (b & 0x80) > 0
-        b = b << 1
-        bytePos += 1
-      }
-      fieldPos += numPos
-    }
-  }
-
-  def readIntoAndCopyNullMask(
-      len: Int,
-      source: DataInputView,
-      target: DataOutputView,
-      nullMask: Array[Boolean]): Unit = {
-    var b = 0x00
-    var bytePos = 0
-
-    var fieldPos = 0
-    var numPos = 0
-    while (fieldPos < len) {
-      // read byte
-      b = source.readUnsignedByte()
-      // copy byte
-      target.writeByte(b)
-      bytePos = 0
-      numPos = Math.min(8, len - fieldPos)
-      while (bytePos < numPos) {
-        nullMask(fieldPos + bytePos) = (b & 0x80) > 0
-        b = b << 1
-        bytePos += 1
-      }
-      fieldPos += numPos
-    }
-  }
-
-}
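
The mask packs up to eight fields per byte, most significant bit first, with a
set bit marking a null field; a byte that is not completely filled is
left-shifted so the used bits stay at the top. A standalone sketch of that
packing for a single byte (hypothetical helper, arity <= 8 assumed):

    // Mirrors writeNullMask's bit layout for one byte.
    def packNullMask(isNull: Seq[Boolean]): Byte = {
      require(isNull.nonEmpty && isNull.length <= 8)
      var b = 0
      isNull.foreach(n => b = (b << 1) | (if (n) 1 else 0))
      (b << (8 - isNull.length)).toByte
    }

    packNullMask(Seq(false, true, false))  // -> 0x40: only field 1 is null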

http://git-wip-us.apache.org/repos/asf/flink/blob/a9e6ec86/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowComparator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowComparator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowComparator.scala
deleted file mode 100644
index 8bbe4d8..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowComparator.scala
+++ /dev/null
@@ -1,425 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.typeutils
-
-import java.util
-
-import org.apache.flink.api.common.typeutils.{CompositeTypeComparator, TypeComparator, TypeSerializer}
-import org.apache.flink.api.java.typeutils.runtime.TupleComparatorBase
-import org.apache.flink.api.table.Row
-import org.apache.flink.api.table.typeutils.NullMaskUtils.readIntoNullMask
-import org.apache.flink.api.table.typeutils.RowComparator.{createAuxiliaryFields, makeNullAware}
-import org.apache.flink.core.memory.{DataInputView, DataOutputView, MemorySegment}
-import org.apache.flink.types.KeyFieldOutOfBoundsException
-
-/**
- * Comparator for [[Row]].
- */
-class RowComparator private (
-    /** the number of fields of the Row */
-    val numberOfFields: Int,
-    /** key positions describe which fields are keys in what order */
-    val keyPositions: Array[Int],
-    /** null-aware comparators for the key fields, in the same order as the key fields */
-    val comparators: Array[NullAwareComparator[Any]],
-    /** serializers to deserialize the first n fields for comparison */
-    val serializers: Array[TypeSerializer[Any]],
-    /** auxiliary fields for normalized key support */
-    private val auxiliaryFields: (Array[Int], Int, Int, Boolean))
-  extends CompositeTypeComparator[Row] with Serializable {
-
-  // null masks for serialized comparison
-  private val nullMask1 = new Array[Boolean](numberOfFields)
-  private val nullMask2 = new Array[Boolean](numberOfFields)
-
-  // cache for the deserialized key field objects
-  @transient
-  private lazy val deserializedKeyFields1: Array[Any] = instantiateDeserializationFields()
-
-  @transient
-  private lazy val deserializedKeyFields2: Array[Any] = instantiateDeserializationFields()
-
-  // create auxiliary fields
-  private val normalizedKeyLengths: Array[Int] = auxiliaryFields._1
-  private val numLeadingNormalizableKeys: Int = auxiliaryFields._2
-  private val normalizableKeyPrefixLen: Int = auxiliaryFields._3
-  private val invertNormKey: Boolean = auxiliaryFields._4
-
-  /**
-   * Intermediate constructor for creating auxiliary fields.
-   */
-  def this(
-      numberOfFields: Int,
-      keyPositions: Array[Int],
-      comparators: Array[NullAwareComparator[Any]],
-      serializers: Array[TypeSerializer[Any]]) = {
-    this(
-      numberOfFields,
-      keyPositions,
-      comparators,
-      serializers,
-      createAuxiliaryFields(keyPositions, comparators))
-  }
-
-  /**
-   * General constructor for RowComparator.
-   *
-   * @param numberOfFields the number of fields of the Row
-   * @param keyPositions key positions describe which fields are keys in what order
-   * @param comparators non-null-aware comparators for the key fields, in the same order as
-   *   the key fields
-   * @param serializers serializers to deserialize the first n fields for comparison
-   * @param orders sorting orders for the fields
-   */
-  def this(
-      numberOfFields: Int,
-      keyPositions: Array[Int],
-      comparators: Array[TypeComparator[Any]],
-      serializers: Array[TypeSerializer[Any]],
-      orders: Array[Boolean]) = {
-    this(
-      numberOfFields,
-      keyPositions,
-      makeNullAware(comparators, orders),
-      serializers)
-  }
-
-  private def instantiateDeserializationFields(): Array[Any] = {
-    val newFields = new Array[Any](serializers.length)
-    var i = 0
-    while (i < serializers.length) {
-      newFields(i) = serializers(i).createInstance()
-      i += 1
-    }
-    newFields
-  }
-
-  // --------------------------------------------------------------------------------------------
-  //  Comparator Methods
-  // --------------------------------------------------------------------------------------------
-
-  override def compareToReference(referencedComparator: TypeComparator[Row]): Int = {
-    val other: RowComparator = referencedComparator.asInstanceOf[RowComparator]
-    var i = 0
-    try {
-      while (i < keyPositions.length) {
-        val comparator = comparators(i)
-        val otherComparator = other.comparators(i)
-
-        val cmp = comparator.compareToReference(otherComparator)
-        if (cmp != 0) {
-          return cmp
-        }
-        i = i + 1
-      }
-      0
-    }
-    catch {
-      case iobex: IndexOutOfBoundsException =>
-        throw new KeyFieldOutOfBoundsException(keyPositions(i))
-    }
-  }
-
-  override def compareSerialized(firstSource: DataInputView, secondSource: DataInputView): Int = {
-    val len = serializers.length
-    val keyLen = keyPositions.length
-
-    readIntoNullMask(numberOfFields, firstSource, nullMask1)
-    readIntoNullMask(numberOfFields, secondSource, nullMask2)
-
-    // deserialize
-    var i = 0
-    while (i < len) {
-      val serializer = serializers(i)
-
-      // deserialize field 1
-      if (!nullMask1(i)) {
-        deserializedKeyFields1(i) = serializer.deserialize(deserializedKeyFields1(i), firstSource)
-      }
-
-      // deserialize field 2
-      if (!nullMask2(i)) {
-        deserializedKeyFields2(i) = serializer.deserialize(deserializedKeyFields2(i), secondSource)
-      }
-
-      i += 1
-    }
-
-    // compare
-    i = 0
-    while (i < keyLen) {
-      val keyPos = keyPositions(i)
-      val comparator = comparators(i)
-
-      val isNull1 = nullMask1(keyPos)
-      val isNull2 = nullMask2(keyPos)
-
-      var cmp = 0
-      // both values are null -> equality
-      if (isNull1 && isNull2) {
-        cmp = 0
-      }
-      // first value is null -> inequality
-      else if (isNull1) {
-        cmp = comparator.compare(null, deserializedKeyFields2(keyPos))
-      }
-      // second value is null -> inequality
-      else if (isNull2) {
-        cmp = comparator.compare(deserializedKeyFields1(keyPos), null)
-      }
-      // no null values
-      else {
-        cmp = comparator.compare(deserializedKeyFields1(keyPos), deserializedKeyFields2(keyPos))
-      }
-
-      if (cmp != 0) {
-        return cmp
-      }
-
-      i += 1
-    }
-    0
-  }
-
-  override def supportsNormalizedKey(): Boolean = numLeadingNormalizableKeys > 0
-
-  override def getNormalizeKeyLen: Int = normalizableKeyPrefixLen
-
-  override def isNormalizedKeyPrefixOnly(keyBytes: Int): Boolean =
-    numLeadingNormalizableKeys < keyPositions.length ||
-      normalizableKeyPrefixLen == Integer.MAX_VALUE ||
-      normalizableKeyPrefixLen > keyBytes
-
-  override def invertNormalizedKey(): Boolean = invertNormKey
-
-  override def supportsSerializationWithKeyNormalization(): Boolean = false
-
-  override def writeWithKeyNormalization(record: Row, target: DataOutputView): Unit =
-    throw new UnsupportedOperationException("Record serialization with leading normalized keys " +
-      "not supported.")
-
-  override def readWithKeyDenormalization(reuse: Row, source: DataInputView): Row =
-    throw new UnsupportedOperationException("Record deserialization with leading normalized keys " +
-      "not supported.")
-
-  override def duplicate(): TypeComparator[Row] = {
-    // copy comparator and serializer factories
-    val comparatorsCopy = comparators.map(_.duplicate().asInstanceOf[NullAwareComparator[Any]])
-    val serializersCopy = serializers.map(_.duplicate())
-
-    new RowComparator(
-      numberOfFields,
-      keyPositions,
-      comparatorsCopy,
-      serializersCopy,
-      auxiliaryFields)
-  }
-
-  override def hash(value: Row): Int = {
-    var code: Int = 0
-    var i = 0
-    try {
-      while(i < keyPositions.length) {
-        code *= TupleComparatorBase.HASH_SALT(i & 0x1F)
-        val element = value.productElement(keyPositions(i)) // element can be null
-        code += comparators(i).hash(element)
-        i += 1
-      }
-    } catch {
-      case iobex: IndexOutOfBoundsException =>
-        throw new KeyFieldOutOfBoundsException(keyPositions(i))
-    }
-    code
-  }
-
-  override def setReference(toCompare: Row) {
-    var i = 0
-    try {
-      while(i < keyPositions.length) {
-        val comparator = comparators(i)
-        val element = toCompare.productElement(keyPositions(i))
-        comparator.setReference(element) // element can be null
-        i += 1
-      }
-    } catch {
-      case iobex: IndexOutOfBoundsException =>
-        throw new KeyFieldOutOfBoundsException(keyPositions(i))
-    }
-  }
-
-  override def equalToReference(candidate: Row): Boolean = {
-    var i = 0
-    try {
-      while(i < keyPositions.length) {
-        val comparator = comparators(i)
-        val element = candidate.productElement(keyPositions(i)) // element can be null
-        // check if reference is not equal
-        if (!comparator.equalToReference(element)) {
-          return false
-        }
-        i += 1
-      }
-    } catch {
-      case iobex: IndexOutOfBoundsException =>
-        throw new KeyFieldOutOfBoundsException(keyPositions(i))
-    }
-    true
-  }
-
-  override def compare(first: Row, second: Row): Int = {
-    var i = 0
-    try {
-      while(i < keyPositions.length) {
-        val keyPos: Int = keyPositions(i)
-        val comparator = comparators(i)
-        val firstElement = first.productElement(keyPos) // element can be null
-        val secondElement = second.productElement(keyPos) // element can be null
-
-        val cmp = comparator.compare(firstElement, secondElement)
-        if (cmp != 0) {
-          return cmp
-        }
-        i += 1
-      }
-    } catch {
-      case iobex: IndexOutOfBoundsException =>
-        throw new KeyFieldOutOfBoundsException(keyPositions(i))
-    }
-    0
-  }
-
-  override def putNormalizedKey(
-      record: Row,
-      target: MemorySegment,
-      offset: Int,
-      numBytes: Int)
-    : Unit = {
-    var bytesLeft = numBytes
-    var currentOffset = offset
-
-    var i = 0
-    while (i < numLeadingNormalizableKeys && bytesLeft > 0) {
-      var len = normalizedKeyLengths(i)
-      len = if (bytesLeft >= len) len else bytesLeft
-
-      val comparator = comparators(i)
-      val element = record.productElement(keyPositions(i)) // element can be null
-      // write key
-      comparator.putNormalizedKey(element, target, currentOffset, len)
-
-      bytesLeft -= len
-      currentOffset += len
-      i += 1
-    }
-  }
-
-  override def getFlatComparator(flatComparators: util.List[TypeComparator[_]]): Unit =
-    comparators.foreach { c =>
-      c.getFlatComparators.foreach { fc =>
-        flatComparators.add(fc)
-      }
-    }
-
-  override def extractKeys(record: Any, target: Array[AnyRef], index: Int): Int = {
-    val len = comparators.length
-    var localIndex = index
-    var i = 0
-    while (i < len) {
-      val element = record.asInstanceOf[Row].productElement(keyPositions(i)) // element can be null
-      localIndex += comparators(i).extractKeys(element, target, localIndex)
-      i += 1
-    }
-    localIndex - index
-  }
-}
-
-object RowComparator {
-  private def makeNullAware(
-      comparators: Array[TypeComparator[Any]],
-      orders: Array[Boolean])
-    : Array[NullAwareComparator[Any]] =
-    comparators
-      .zip(orders)
-      .map { case (comp, order) =>
-        new NullAwareComparator[Any](
-          comp,
-          order)
-      }
-
-  /**
- * @return auxiliary fields for normalized key support
-   */
-  private def createAuxiliaryFields(
-      keyPositions: Array[Int],
-      comparators: Array[NullAwareComparator[Any]])
-    : (Array[Int], Int, Int, Boolean) = {
-
-    val normalizedKeyLengths = new Array[Int](keyPositions.length)
-    var numLeadingNormalizableKeys = 0
-    var normalizableKeyPrefixLen = 0
-    var inverted = false
-
-    var i = 0
-    while (i < keyPositions.length) {
-      val k = comparators(i)
-      // as long as the leading keys support normalized keys, we can build up the composite key
-      if (k.supportsNormalizedKey()) {
-        if (i == 0) {
-          // the first comparator decides whether we need to invert the key direction
-          inverted = k.invertNormalizedKey()
-        }
-        else if (k.invertNormalizedKey() != inverted) {
-          // if a successor does not agree on the inversion direction, it cannot be part of the
-          // normalized key
-          return (normalizedKeyLengths,
-            numLeadingNormalizableKeys,
-            normalizableKeyPrefixLen,
-            inverted)
-        }
-        numLeadingNormalizableKeys += 1
-        val len = k.getNormalizeKeyLen
-        if (len < 0) {
-          throw new RuntimeException("Comparator " + k.getClass.getName +
-            " specifies an invalid length for the normalized key: " + len)
-        }
-        normalizedKeyLengths(i) = len
-        normalizableKeyPrefixLen += len
-        if (normalizableKeyPrefixLen < 0) {
-          // overflow, which means we are out of budget for normalized key space anyway
-          return (normalizedKeyLengths,
-            numLeadingNormalizableKeys,
-            Integer.MAX_VALUE,
-            inverted)
-        }
-      }
-      else {
-        return (normalizedKeyLengths,
-          numLeadingNormalizableKeys,
-          normalizableKeyPrefixLen,
-          inverted)
-      }
-      i += 1
-    }
-    (normalizedKeyLengths,
-      numLeadingNormalizableKeys,
-      normalizableKeyPrefixLen,
-      inverted)
-  }
-}
-
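
A worked example of createAuxiliaryFields above: for two ascending int keys,
and assuming Flink's int comparator reports a 4-byte normalized key, each
wrapping NullAwareComparator adds one null byte, so both keys are leading
normalizable keys and the composite prefix is (4 + 1) + (4 + 1) = 10 bytes.
A key whose comparator disagrees on inversion, or does not support normalized
keys at all, ends the prefix at that position.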

http://git-wip-us.apache.org/repos/asf/flink/blob/a9e6ec86/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowSerializer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowSerializer.scala
deleted file mode 100644
index 825a99c..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowSerializer.scala
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.typeutils
-
-import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.api.table.Row
-import org.apache.flink.api.table.typeutils.NullMaskUtils.{writeNullMask, readIntoNullMask, readIntoAndCopyNullMask}
-import org.apache.flink.core.memory.{DataInputView, DataOutputView}
-
-/**
- * Serializer for [[Row]].
- */
-class RowSerializer(val fieldSerializers: Array[TypeSerializer[Any]])
-  extends TypeSerializer[Row] {
-
-  private val nullMask = new Array[Boolean](fieldSerializers.length)
-
-  override def isImmutableType: Boolean = false
-
-  override def getLength: Int = -1
-
-  override def duplicate: RowSerializer = {
-    val duplicateFieldSerializers = fieldSerializers.map(_.duplicate())
-    new RowSerializer(duplicateFieldSerializers)
-  }
-
-  override def createInstance: Row = {
-    new Row(fieldSerializers.length)
-  }
-
-  override def copy(from: Row, reuse: Row): Row = {
-    val len = fieldSerializers.length
-
-    // cannot reuse, do a non-reuse copy
-    if (reuse == null) {
-      return copy(from)
-    }
-
-    if (from.productArity != len || reuse.productArity != len) {
-      throw new RuntimeException("Row arity of reuse or from is incompatible with this " +
-        "RowSerializer.")
-    }
-
-    var i = 0
-    while (i < len) {
-      val fromField = from.productElement(i)
-      if (fromField != null) {
-        val reuseField = reuse.productElement(i)
-        if (reuseField != null) {
-          val copy = fieldSerializers(i).copy(fromField, reuseField)
-          reuse.setField(i, copy)
-        }
-        else {
-          val copy = fieldSerializers(i).copy(fromField)
-          reuse.setField(i, copy)
-        }
-      }
-      else {
-        reuse.setField(i, null)
-      }
-      i += 1
-    }
-    reuse
-  }
-
-  override def copy(from: Row): Row = {
-    val len = fieldSerializers.length
-
-    if (from.productArity != len) {
-      throw new RuntimeException("Row arity of from does not match serializers.")
-    }
-    val result = new Row(len)
-    var i = 0
-    while (i < len) {
-      val fromField = from.productElement(i).asInstanceOf[AnyRef]
-      if (fromField != null) {
-        val copy = fieldSerializers(i).copy(fromField)
-        result.setField(i, copy)
-      }
-      else {
-        result.setField(i, null)
-      }
-      i += 1
-    }
-    result
-  }
-
-  override def serialize(value: Row, target: DataOutputView) {
-    val len = fieldSerializers.length
-
-    if (value.productArity != len) {
-      throw new RuntimeException("Row arity of value does not match serializers.")
-    }
-
-    // write a null mask
-    writeNullMask(len, value, target)
-
-    // serialize non-null fields
-    var i = 0
-    while (i < len) {
-      val o = value.productElement(i).asInstanceOf[AnyRef]
-      if (o != null) {
-        val serializer = fieldSerializers(i)
-        serializer.serialize(value.productElement(i), target)
-      }
-      i += 1
-    }
-  }
-
-  override def deserialize(reuse: Row, source: DataInputView): Row = {
-    val len = fieldSerializers.length
-
-    if (reuse.productArity != len) {
-      throw new RuntimeException("Row arity of reuse does not match serializers.")
-    }
-
-    // read null mask
-    readIntoNullMask(len, source, nullMask)
-
-    // read non-null fields
-    var i = 0
-    while (i < len) {
-      if (nullMask(i)) {
-        reuse.setField(i, null)
-      }
-      else {
-        val reuseField = reuse.productElement(i).asInstanceOf[AnyRef]
-        if (reuseField != null) {
-          reuse.setField(i, fieldSerializers(i).deserialize(reuseField, source))
-        }
-        else {
-          reuse.setField(i, fieldSerializers(i).deserialize(source))
-        }
-      }
-      i += 1
-    }
-    reuse
-  }
-
-  override def deserialize(source: DataInputView): Row = {
-    val len = fieldSerializers.length
-
-    val result = new Row(len)
-
-    // read null mask
-    readIntoNullMask(len, source, nullMask)
-
-    // read non-null fields
-    var i = 0
-    while (i < len) {
-      if (nullMask(i)) {
-        result.setField(i, null)
-      }
-      else {
-        result.setField(i, fieldSerializers(i).deserialize(source))
-      }
-      i += 1
-    }
-    result
-  }
-
-  override def copy(source: DataInputView, target: DataOutputView): Unit = {
-    val len = fieldSerializers.length
-
-    // copy null mask
-    readIntoAndCopyNullMask(len, source, target, nullMask)
-
-    // copy non-null fields
-    var i = 0
-    while (i < len) {
-      if (!nullMask(i)) {
-        fieldSerializers(i).copy(source, target)
-      }
-      i += 1
-    }
-  }
-
-  override def equals(any: Any): Boolean = {
-    any match {
-      case otherRS: RowSerializer =>
-        otherRS.canEqual(this) &&
-          fieldSerializers.sameElements(otherRS.fieldSerializers)
-      case _ => false
-    }
-  }
-
-  override def canEqual(obj: AnyRef): Boolean = {
-    obj.isInstanceOf[RowSerializer]
-  }
-
-  override def hashCode(): Int = {
-    java.util.Arrays.hashCode(fieldSerializers.asInstanceOf[Array[AnyRef]])
-  }
-}
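
The wire format implied by serialize() above: ceil(arity / 8) null-mask bytes
(see NullMaskUtils), followed by the serialized values of the non-null fields
only, in field order. For a hypothetical three-field row ("a", null, 7) this
is the single mask byte 0x40 followed by the String "a" and the Int 7; the
null field costs nothing beyond its mask bit.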

http://git-wip-us.apache.org/repos/asf/flink/blob/a9e6ec86/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowTypeInfo.scala
deleted file mode 100644
index 711bb49..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowTypeInfo.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.typeutils
-
-import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.CompositeType.TypeComparatorBuilder
-import org.apache.flink.api.common.typeutils.TypeComparator
-import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-
-import scala.collection.mutable.ArrayBuffer
-import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.api.table.Row
-
-/**
- * TypeInformation for [[Row]].
- */
-class RowTypeInfo(fieldTypes: Seq[TypeInformation[_]])
-  extends CaseClassTypeInfo[Row](
-    classOf[Row],
-    Array(),
-    fieldTypes,
-    for (i <- fieldTypes.indices) yield "f" + i)
-{
-
-  def this(fieldTypes: Array[TypeInformation[_]]) = {
-    this(fieldTypes.toSeq)
-  }
-
-  /**
-   * Temporary variable for directly passing orders to comparators.
-   */
-  var comparatorOrders: Option[Array[Boolean]] = None
-
-  override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[Row] = {
-    val fieldSerializers: Array[TypeSerializer[Any]] = new Array[TypeSerializer[Any]](getArity)
-    for (i <- 0 until getArity) {
-      fieldSerializers(i) = this.types(i).createSerializer(executionConfig)
-        .asInstanceOf[TypeSerializer[Any]]
-    }
-
-    new RowSerializer(fieldSerializers)
-  }
-
-  override def createComparator(
-      logicalKeyFields: Array[Int],
-      orders: Array[Boolean],
-      logicalFieldOffset: Int,
-      config: ExecutionConfig)
-    : TypeComparator[Row] = {
-    // store the order information for the builder
-    comparatorOrders = Some(orders)
-    val comparator = super.createComparator(logicalKeyFields, orders, logicalFieldOffset, config)
-    comparatorOrders = None
-    comparator
-  }
-
-  override def createTypeComparatorBuilder(): TypeComparatorBuilder[Row] = {
-    new RowTypeComparatorBuilder(comparatorOrders.getOrElse(
-      throw new IllegalStateException("Cannot create comparator builder without orders.")))
-  }
-
-  private class RowTypeComparatorBuilder(
-      comparatorOrders: Array[Boolean])
-    extends TypeComparatorBuilder[Row] {
-
-    val fieldComparators: ArrayBuffer[TypeComparator[_]] = new ArrayBuffer[TypeComparator[_]]()
-    val logicalKeyFields: ArrayBuffer[Int] = new ArrayBuffer[Int]()
-
-    override def initializeTypeComparatorBuilder(size: Int): Unit = {
-      fieldComparators.sizeHint(size)
-      logicalKeyFields.sizeHint(size)
-    }
-
-    override def addComparatorField(fieldId: Int, comparator: TypeComparator[_]): Unit = {
-      fieldComparators += comparator
-      logicalKeyFields += fieldId
-    }
-
-    override def createTypeComparator(config: ExecutionConfig): TypeComparator[Row] = {
-      val maxIndex = logicalKeyFields.max
-
-      new RowComparator(
-        getArity,
-        logicalKeyFields.toArray,
-        fieldComparators.toArray.asInstanceOf[Array[TypeComparator[Any]]],
-        types.take(maxIndex + 1).map(_.createSerializer(config).asInstanceOf[TypeSerializer[Any]]),
-        comparatorOrders
-      )
-    }
-  }
-}
-
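
The removed type info derives its field names f0, f1, ... from the field
positions (see the constructor above). A minimal construction sketch:

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    import org.apache.flink.api.table.typeutils.RowTypeInfo

    val rowType = new RowTypeInfo(Seq(
      BasicTypeInfo.STRING_TYPE_INFO,
      BasicTypeInfo.INT_TYPE_INFO))

    rowType.getArity        // -> 2
    rowType.getFieldNames   // -> Array("f0", "f1")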

http://git-wip-us.apache.org/repos/asf/flink/blob/a9e6ec86/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala
deleted file mode 100644
index d72e7a8..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala
+++ /dev/null
@@ -1,882 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime.io
-
-import java.io.{File, FileOutputStream, OutputStreamWriter}
-import java.nio.charset.StandardCharsets
-import java.sql.{Date, Time, Timestamp}
-
-import org.apache.flink.api.common.io.ParseException
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo}
-import org.apache.flink.api.table.Row
-import org.apache.flink.api.table.runtime.io.RowCsvInputFormatTest.{PATH, createTempFile, testRemovingTrailingCR}
-import org.apache.flink.api.table.typeutils.RowTypeInfo
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.core.fs.{FileInputSplit, Path}
-import org.apache.flink.types.parser.{FieldParser, StringParser}
-import org.junit.Assert._
-import org.junit.{Ignore, Test}
-
-class RowCsvInputFormatTest {
-
-  @Test
-  def ignoreInvalidLines() {
-    val fileContent =
-      "#description of the data\n" +
-        "header1|header2|header3|\n" +
-        "this is|1|2.0|\n" +
-        "//a comment\n" +
-        "a test|3|4.0|\n" +
-        "#next|5|6.0|\n"
-
-    val split = createTempFile(fileContent)
-
-    val typeInfo = new RowTypeInfo(Seq(
-      BasicTypeInfo.STRING_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.DOUBLE_TYPE_INFO))
-
-    val format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|")
-    format.setLenient(false)
-    val parameters = new Configuration
-    format.configure(parameters)
-    format.open(split)
-
-    var result = new Row(3)
-    try {
-      result = format.nextRecord(result)
-      fail("Parse Exception was not thrown! (Row too short)")
-    }
-    catch {
-      case ex: ParseException =>  // ok
-    }
-
-    try {
-      result = format.nextRecord(result)
-      fail("Parse Exception was not thrown! (Invalid int value)")
-    }
-    catch {
-      case ex: ParseException => // ok
-    }
-
-    result = format.nextRecord(result)
-    assertNotNull(result)
-    assertEquals("this is", result.productElement(0))
-    assertEquals(1, result.productElement(1))
-    assertEquals(2.0, result.productElement(2))
-
-    try {
-      result = format.nextRecord(result)
-      fail("Parse Exception was not thrown! (Row too short)")
-    }
-    catch {
-      case ex: ParseException => // ok
-    }
-
-    result = format.nextRecord(result)
-    assertNotNull(result)
-    assertEquals("a test", result.productElement(0))
-    assertEquals(3, result.productElement(1))
-    assertEquals(4.0, result.productElement(2))
-
-    result = format.nextRecord(result)
-    assertNotNull(result)
-    assertEquals("#next", result.productElement(0))
-    assertEquals(5, result.productElement(1))
-    assertEquals(6.0, result.productElement(2))
-
-    result = format.nextRecord(result)
-    assertNull(result)
-
-    // re-open with lenient = true
-    format.setLenient(true)
-    format.configure(parameters)
-    format.open(split)
-
-    result = new Row(3)
-
-    result = format.nextRecord(result)
-    assertNotNull(result)
-    assertEquals("header1", result.productElement(0))
-    assertNull(result.productElement(1))
-    assertNull(result.productElement(2))
-
-    result = format.nextRecord(result)
-    assertNotNull(result)
-    assertEquals("this is", result.productElement(0))
-    assertEquals(1, result.productElement(1))
-    assertEquals(2.0, result.productElement(2))
-
-    result = format.nextRecord(result)
-    assertNotNull(result)
-    assertEquals("a test", result.productElement(0))
-    assertEquals(3, result.productElement(1))
-    assertEquals(4.0, result.productElement(2))
-
-    result = format.nextRecord(result)
-    assertNotNull(result)
-    assertEquals("#next", result.productElement(0))
-    assertEquals(5, result.productElement(1))
-    assertEquals(6.0, result.productElement(2))
-    result = format.nextRecord(result)
-    assertNull(result)
-  }
-
-  @Test
-  def ignoreSingleCharPrefixComments() {
-    val fileContent =
-      "#description of the data\n" +
-        "#successive commented line\n" +
-        "this is|1|2.0|\n" +
-        "a test|3|4.0|\n" +
-        "#next|5|6.0|\n"
-
-    val split = createTempFile(fileContent)
-
-    val typeInfo = new RowTypeInfo(Seq(
-      BasicTypeInfo.STRING_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.DOUBLE_TYPE_INFO))
-
-    val format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|")
-    format.setCommentPrefix("#")
-    format.configure(new Configuration)
-    format.open(split)
-
-    var result = new Row(3)
-
-    result = format.nextRecord(result)
-    assertNotNull(result)
-    assertEquals("this is", result.productElement(0))
-    assertEquals(1, result.productElement(1))
-    assertEquals(2.0, result.productElement(2))
-
-    result = format.nextRecord(result)
-    assertNotNull(result)
-    assertEquals("a test", result.productElement(0))
-    assertEquals(3, result.productElement(1))
-    assertEquals(4.0, result.productElement(2))
-
-    result = format.nextRecord(result)
-    assertNull(result)
-  }
-
-  @Test
-  def ignoreMultiCharPrefixComments() {
-    val fileContent =
-      "//description of the data\n" +
-        "//successive commented line\n" +
-        "this is|1|2.0|\n" +
-        "a test|3|4.0|\n" +
-        "//next|5|6.0|\n"
-
-    val split = createTempFile(fileContent)
-
-    val typeInfo = new RowTypeInfo(Seq(
-      BasicTypeInfo.STRING_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.DOUBLE_TYPE_INFO))
-
-    val format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|")
-    format.setCommentPrefix("//")
-    format.configure(new Configuration)
-    format.open(split)
-
-    var result = new Row(3)
-
-    result = format.nextRecord(result)
-    assertNotNull(result)
-    assertEquals("this is", result.productElement(0))
-    assertEquals(1, result.productElement(1))
-    assertEquals(2.0, result.productElement(2))
-
-    result = format.nextRecord(result)
-    assertNotNull(result)
-    assertEquals("a test", result.productElement(0))
-    assertEquals(3, result.productElement(1))
-    assertEquals(4.0, result.productElement(2))
-    result = format.nextRecord(result)
-    assertNull(result)
-  }
-
-  @Test
-  def readStringFields() {
-    val fileContent = "abc|def|ghijk\nabc||hhg\n|||"
-    
-    val split = createTempFile(fileContent)
-    
-    val typeInfo = new RowTypeInfo(Seq(
-      BasicTypeInfo.STRING_TYPE_INFO,
-      BasicTypeInfo.STRING_TYPE_INFO,
-      BasicTypeInfo.STRING_TYPE_INFO))
-    
-    val format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|")
-    format.configure(new Configuration)
-    format.open(split)
-    
-    var result = new Row(3)
-    
-    result = format.nextRecord(result)
-    assertNotNull(result)
-    assertEquals("abc", result.productElement(0))
-    assertEquals("def", result.productElement(1))
-    assertEquals("ghijk", result.productElement(2))
-    
-    result = format.nextRecord(result)
-    assertNotNull(result)
-    assertEquals("abc", result.productElement(0))
-    assertEquals("", result.productElement(1))
-    assertEquals("hhg", result.productElement(2))
-    
-    result = format.nextRecord(result)
-    assertNotNull(result)
-    assertEquals("", result.productElement(0))
-    assertEquals("", result.productElement(1))
-    assertEquals("", result.productElement(2))
-    
-    result = format.nextRecord(result)
-    assertNull(result)
-    assertTrue(format.reachedEnd)
-  }
-
-  @Test
-  def readMixedQuotedStringFields() {
-    val fileContent = "@a|b|c@|def|@ghijk@\nabc||@|hhg@\n|||"
-
-    val split = createTempFile(fileContent)
-
-    val typeInfo = new RowTypeInfo(Seq(
-      BasicTypeInfo.STRING_TYPE_INFO,
-      BasicTypeInfo.STRING_TYPE_INFO,
-      BasicTypeInfo.STRING_TYPE_INFO))
-
-    val format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|")
-    format.configure(new Configuration)
-    format.enableQuotedStringParsing('@')
-    format.open(split)
-
-    var result = new Row(3)
-
-    result = format.nextRecord(result)
-    assertNotNull(result)
-    assertEquals("a|b|c", result.productElement(0))
-    assertEquals("def", result.productElement(1))
-    assertEquals("ghijk", result.productElement(2))
-
-    result = format.nextRecord(result)
-    assertNotNull(result)
-    assertEquals("abc", result.productElement(0))
-    assertEquals("", result.productElement(1))
-    assertEquals("|hhg", result.productElement(2))
-
-    result = format.nextRecord(result)
-    assertNotNull(result)
-    assertEquals("", result.productElement(0))
-    assertEquals("", result.productElement(1))
-    assertEquals("", result.productElement(2))
-
-    result = format.nextRecord(result)
-    assertNull(result)
-    assertTrue(format.reachedEnd)
-  }
-
-  @Test
-  def readStringFieldsWithTrailingDelimiters() {
-    val fileContent = "abc|-def|-ghijk\nabc|-|-hhg\n|-|-|-\n"
-
-    val split = createTempFile(fileContent)
-
-    val typeInfo = new RowTypeInfo(Seq(
-      BasicTypeInfo.STRING_TYPE_INFO,
-      BasicTypeInfo.STRING_TYPE_INFO,
-      BasicTypeInfo.STRING_TYPE_INFO))
-
-    val format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|")
-    format.setFieldDelimiter("|-")
-    format.configure(new Configuration)
-    format.open(split)
-
-    var result = new Row(3)
-
-    result = format.nextRecord(result)
-    assertNotNull(result)
-    assertEquals("abc", result.productElement(0))
-    assertEquals("def", result.productElement(1))
-    assertEquals("ghijk", result.productElement(2))
-
-    result = format.nextRecord(result)
-    assertNotNull(result)
-    assertEquals("abc", result.productElement(0))
-    assertEquals("", result.productElement(1))
-    assertEquals("hhg", result.productElement(2))
-
-    result = format.nextRecord(result)
-    assertNotNull(result)
-    assertEquals("", result.productElement(0))
-    assertEquals("", result.productElement(1))
-    assertEquals("", result.productElement(2))
-
-    result = format.nextRecord(result)
-    assertNull(result)
-    assertTrue(format.reachedEnd)
-  }
-
-  @Test
-  def testIntegerFields() {
-    val fileContent = "111|222|333|444|555\n666|777|888|999|000|\n"
-
-    val split = createTempFile(fileContent)
-
-    val typeInfo = new RowTypeInfo(Seq(
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO))
-
-    val format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|")
-    format.setFieldDelimiter("|")
-    format.configure(new Configuration)
-    format.open(split)
-
-    var result = new Row(5)
-
-    result = format.nextRecord(result)
-    assertNotNull(result)
-    assertEquals(111, result.productElement(0))
-    assertEquals(222, result.productElement(1))
-    assertEquals(333, result.productElement(2))
-    assertEquals(444, result.productElement(3))
-    assertEquals(555, result.productElement(4))
-
-    result = format.nextRecord(result)
-    assertNotNull(result)
-    assertEquals(666, result.productElement(0))
-    assertEquals(777, result.productElement(1))
-    assertEquals(888, result.productElement(2))
-    assertEquals(999, result.productElement(3))
-    assertEquals(0, result.productElement(4))
-
-    result = format.nextRecord(result)
-    assertNull(result)
-    assertTrue(format.reachedEnd)
-  }
-
-  @Test
-  def testEmptyFields() {
-    val fileContent =
-      ",,,,,,,,\n" +
-      ",,,,,,,,\n" +
-      ",,,,,,,,\n" +
-      ",,,,,,,,\n" +
-      ",,,,,,,,\n" +
-      ",,,,,,,,\n" +
-      ",,,,,,,,\n" +
-      ",,,,,,,,\n"
-
-    val split = createTempFile(fileContent)
-
-    val typeInfo = new RowTypeInfo(Seq(
-      BasicTypeInfo.BOOLEAN_TYPE_INFO,
-      BasicTypeInfo.BYTE_TYPE_INFO,
-      BasicTypeInfo.DOUBLE_TYPE_INFO,
-      BasicTypeInfo.FLOAT_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.LONG_TYPE_INFO,
-      BasicTypeInfo.SHORT_TYPE_INFO,
-      BasicTypeInfo.STRING_TYPE_INFO))
-
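-    // emptyColumnAsNull = true turns empty columns into null fields instead of parse errors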
-    val format = new RowCsvInputFormat(PATH, rowTypeInfo = typeInfo, emptyColumnAsNull = true)
-    format.setFieldDelimiter(",")
-    format.configure(new Configuration)
-    format.open(split)
-
-    var result = new Row(8)
-    val linesCnt = fileContent.split("\n").length
-
-    for (i <- 0 until linesCnt) {
-      result = format.nextRecord(result)
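-      // every column in the file is empty, so spot-check that field i of row i parsed to null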
-      assertNull(result.productElement(i))
-    }
-
-    // ensure no more rows
-    assertNull(format.nextRecord(result))
-    assertTrue(format.reachedEnd)
-  }
-
-  @Test
-  def testDoubleFields() {
-    val fileContent = "11.1|22.2|33.3|44.4|55.5\n66.6|77.7|88.8|99.9|00.0|\n"
-
-    val split = createTempFile(fileContent)
-
-    val typeInfo = new RowTypeInfo(Seq(
-      BasicTypeInfo.DOUBLE_TYPE_INFO,
-      BasicTypeInfo.DOUBLE_TYPE_INFO,
-      BasicTypeInfo.DOUBLE_TYPE_INFO,
-      BasicTypeInfo.DOUBLE_TYPE_INFO,
-      BasicTypeInfo.DOUBLE_TYPE_INFO))
-
-    val format = new RowCsvInputFormat(PATH, rowTypeInfo = typeInfo)
-    format.setFieldDelimiter("|")
-    format.configure(new Configuration)
-    format.open(split)
-
-    var result = new Row(5)
-
-    result = format.nextRecord(result)
-    assertNotNull(result)
-    assertEquals(11.1, result.productElement(0))
-    assertEquals(22.2, result.productElement(1))
-    assertEquals(33.3, result.productElement(2))
-    assertEquals(44.4, result.productElement(3))
-    assertEquals(55.5, result.productElement(4))
-
-    result = format.nextRecord(result)
-    assertNotNull(result)
-    assertEquals(66.6, result.productElement(0))
-    assertEquals(77.7, result.productElement(1))
-    assertEquals(88.8, result.productElement(2))
-    assertEquals(99.9, result.productElement(3))
-    assertEquals(0.0, result.productElement(4))
-
-    result = format.nextRecord(result)
-    assertNull(result)
-    assertTrue(format.reachedEnd)
-  }
-
-  @Test
-  def testReadFirstN() {
-    val fileContent = "111|222|333|444|555|\n666|777|888|999|000|\n"
-
-    val split = createTempFile(fileContent)
-
-    val typeInfo = new RowTypeInfo(Seq(
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO))
-
-    val format = new RowCsvInputFormat(PATH, rowTypeInfo = typeInfo)
-    format.setFieldDelimiter("|")
-    format.configure(new Configuration)
-    format.open(split)
-
-    var result = new Row(2)
-
-    result = format.nextRecord(result)
-    assertNotNull(result)
-    assertEquals(111, result.productElement(0))
-    assertEquals(222, result.productElement(1))
-
-    result = format.nextRecord(result)
-    assertNotNull(result)
-    assertEquals(666, result.productElement(0))
-    assertEquals(777, result.productElement(1))
-
-    result = format.nextRecord(result)
-    assertNull(result)
-    assertTrue(format.reachedEnd)
-  }
-
-  @Test
-  def testReadSparseWithNullFieldsForTypes() {
-    val fileContent = "111|x|222|x|333|x|444|x|555|x|666|x|777|x|888|x|999|x|000|x|\n" +
-      "000|x|999|x|888|x|777|x|666|x|555|x|444|x|333|x|222|x|111|x|"
-
-    val split = createTempFile(fileContent)
-
-    val typeInfo: RowTypeInfo = new RowTypeInfo(Seq(
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO))
-
-    val format = new RowCsvInputFormat(
-      PATH,
-      rowTypeInfo = typeInfo,
-      includedFieldsMask = Array(true, false, false, true, false, false, false, true))
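-    // the mask keeps source columns 0, 3 and 7; columns beyond the mask are dropped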
-    format.setFieldDelimiter("|x|")
-    format.configure(new Configuration)
-    format.open(split)
-
-    var result = new Row(3)
-
-    result = format.nextRecord(result)
-    assertNotNull(result)
-    assertEquals(111, result.productElement(0))
-    assertEquals(444, result.productElement(1))
-    assertEquals(888, result.productElement(2))
-
-    result = format.nextRecord(result)
-    assertNotNull(result)
-    assertEquals(0, result.productElement(0))
-    assertEquals(777, result.productElement(1))
-    assertEquals(333, result.productElement(2))
-
-    result = format.nextRecord(result)
-    assertNull(result)
-    assertTrue(format.reachedEnd)
-  }
-
-  @Test
-  def testReadSparseWithPositionSetter() {
-    val fileContent = "111|222|333|444|555|666|777|888|999|000|\n" +
-      "000|999|888|777|666|555|444|333|222|111|"
-
-    val split = createTempFile(fileContent)
-
-    val typeInfo = new RowTypeInfo(Seq(
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO))
-
-    val format = new RowCsvInputFormat(
-      PATH,
-      typeInfo,
-      Array(0, 3, 7))
-    // read only source columns 0, 3 and 7 into the three row fields
-    format.setFieldDelimiter("|")
-    format.configure(new Configuration)
-    format.open(split)
-
-    var result = new Row(3)
-
-    result = format.nextRecord(result)
-    assertNotNull(result)
-    assertEquals(111, result.productElement(0))
-    assertEquals(444, result.productElement(1))
-    assertEquals(888, result.productElement(2))
-
-    result = format.nextRecord(result)
-    assertNotNull(result)
-    assertEquals(0, result.productElement(0))
-    assertEquals(777, result.productElement(1))
-    assertEquals(333, result.productElement(2))
-
-    result = format.nextRecord(result)
-    assertNull(result)
-    assertTrue(format.reachedEnd)
-  }
-
-  @Test
-  def testReadSparseWithMask() {
-    val fileContent = "111&&222&&333&&444&&555&&666&&777&&888&&999&&000&&\n" +
-      "000&&999&&888&&777&&666&&555&&444&&333&&222&&111&&"
-
-    val split = createTempFile(fileContent)
-
-    val typeInfo = new RowTypeInfo(Seq(
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO))
-
-    val format = new RowCsvInputFormat(
-      PATH,
-      rowTypeInfo = typeInfo,
-      includedFieldsMask = Array(true, false, false, true, false, false, false, true))
-    format.setFieldDelimiter("&&")
-    format.configure(new Configuration)
-    format.open(split)
-
-    var result = new Row(3)
-
-    result = format.nextRecord(result)
-    assertNotNull(result)
-    assertEquals(111, result.productElement(0))
-    assertEquals(444, result.productElement(1))
-    assertEquals(888, result.productElement(2))
-
-    result = format.nextRecord(result)
-    assertNotNull(result)
-    assertEquals(0, result.productElement(0))
-    assertEquals(777, result.productElement(1))
-    assertEquals(333, result.productElement(2))
-
-    result = format.nextRecord(result)
-    assertNull(result)
-    assertTrue(format.reachedEnd)
-  }
-
-  @Test
-  def testParseStringErrors() {
-    val stringParser = new StringParser
-    stringParser.enableQuotedStringParsing('"'.toByte)
-
-    val failures = Seq(
-      ("\"string\" trailing", FieldParser.ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING),
-      ("\"unterminated ", FieldParser.ParseErrorState.UNTERMINATED_QUOTED_STRING)
-    )
-
-    for (failure <- failures) {
-      val result = stringParser.parseField(
-        failure._1.getBytes,
-        0,
-        failure._1.length,
-        Array[Byte]('|'),
-        null)
-
-      assertEquals(-1, result)
-      assertEquals(failure._2, stringParser.getErrorState)
-    }
-  }
-
-  // Test disabled because we do not support double-quote escaped quotes right now.
-  @Test
-  @Ignore
-  def testParserCorrectness() {
-    // RFC 4180 Compliance Test content
-    // Taken from http://en.wikipedia.org/wiki/Comma-separated_values#Example
-    val fileContent = "Year,Make,Model,Description,Price\n" +
-      "1997,Ford,E350,\"ac, abs, moon\",3000.00\n" +
-      "1999,Chevy,\"Venture \"\"Extended Edition\"\"\",\"\",4900.00\n" +
-      "1996,Jeep,Grand Cherokee,\"MUST SELL! air, moon roof, loaded\",4799.00\n" +
-      "1999,Chevy,\"Venture \"\"Extended Edition, Very Large\"\"\",,5000.00\n" +
-      ",,\"Venture \"\"Extended Edition\"\"\",\"\",4900.00"
-
-    val split = createTempFile(fileContent)
-
-    val typeInfo = new RowTypeInfo(Seq(
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.STRING_TYPE_INFO,
-      BasicTypeInfo.STRING_TYPE_INFO,
-      BasicTypeInfo.STRING_TYPE_INFO,
-      BasicTypeInfo.DOUBLE_TYPE_INFO))
-
-    val format = new RowCsvInputFormat(PATH, typeInfo)
-    format.setSkipFirstLineAsHeader(true)
-    format.setFieldDelimiter(",")
-    format.configure(new Configuration)
-    format.open(split)
-
-    var result = new Row(5)
-
-    val r1: Row = new Row(5)
-    r1.setField(0, 1997)
-    r1.setField(1, "Ford")
-    r1.setField(2, "E350")
-    r1.setField(3, "ac, abs, moon")
-    r1.setField(4, 3000.0)
-
-    val r2: Row = new Row(5)
-    r2.setField(0, 1999)
-    r2.setField(1, "Chevy")
-    r2.setField(2, "Venture \"Extended Edition\"")
-    r2.setField(3, "")
-    r2.setField(4, 4900.0)
-
-    val r3: Row = new Row(5)
-    r3.setField(0, 1996)
-    r3.setField(1, "Jeep")
-    r3.setField(2, "Grand Cherokee")
-    r3.setField(3, "MUST SELL! air, moon roof, loaded")
-    r3.setField(4, 4799.0)
-
-    val r4: Row = new Row(5)
-    r4.setField(0, 1999)
-    r4.setField(1, "Chevy")
-    r4.setField(2, "Venture \"Extended Edition, Very Large\"")
-    r4.setField(3, "")
-    r4.setField(4, 5000.0)
-
-    val r5: Row = new Row(5)
-    r5.setField(0, 0)
-    r5.setField(1, "")
-    r5.setField(2, "Venture \"Extended Edition\"")
-    r5.setField(3, "")
-    r5.setField(4, 4900.0)
-
-    val expectedLines = Array(r1, r2, r3, r4, r5)
-    for (expected <- expectedLines) {
-      result = format.nextRecord(result)
-      assertEquals(expected, result)
-    }
-    assertNull(format.nextRecord(result))
-    assertTrue(format.reachedEnd)
-  }
-
-  @Test
-  def testWindowsLineEndRemoval() {
-
-    // typical Linux case: the file uses \n and the format is configured for \n
-    testRemovingTrailingCR("\n", "\n")
-
-    // typical Windows case: the file uses \r\n and the format is configured for \r\n
-    testRemovingTrailingCR("\r\n", "\r\n")
-
-    // problematic case: a Windows file (\r\n) read with a Linux setup (\n);
-    // the trailing \r must be stripped from each record
-    testRemovingTrailingCR("\r\n", "\n")
-
-    // the reverse (a Linux file read with a Windows \r\n setup) is not tested:
-    // a \r\n delimiter must be configured explicitly and would never match in a \n file
-  }
-
-  @Test
-  def testQuotedStringParsingWithIncludeFields() {
-    val fileContent = "\"20:41:52-1-3-2015\"|\"Re: Taskmanager memory error in Eclipse\"|" +
-      "\"Blahblah <bl...@blahblah.org>\"|\"blaaa|\"blubb\""
-    val tempFile = File.createTempFile("CsvReaderQuotedString", "tmp")
-    tempFile.deleteOnExit()
-    tempFile.setWritable(true)
-
-    val writer = new OutputStreamWriter(new FileOutputStream(tempFile))
-    writer.write(fileContent)
-    writer.close()
-
-    val typeInfo = new RowTypeInfo(Seq(
-      BasicTypeInfo.STRING_TYPE_INFO,
-      BasicTypeInfo.STRING_TYPE_INFO))
-
-    val inputFormat = new RowCsvInputFormat(
-      new Path(tempFile.toURI.toString),
-      rowTypeInfo = typeInfo,
-      includedFieldsMask = Array(true, false, true))
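-    // keep quoted columns 0 and 2; the middle column is projected away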
-    inputFormat.enableQuotedStringParsing('"')
-    inputFormat.setFieldDelimiter("|")
-    inputFormat.setDelimiter('\n')
-    inputFormat.configure(new Configuration)
-
-    val splits = inputFormat.createInputSplits(1)
-    inputFormat.open(splits(0))
-
-    val record = inputFormat.nextRecord(new Row(2))
-    assertEquals("20:41:52-1-3-2015", record.productElement(0))
-    assertEquals("Blahblah <bl...@blahblah.org>", record.productElement(1))
-  }
-
-  @Test
-  def testQuotedStringParsingWithEscapedQuotes() {
-    val fileContent = "\"\\\"Hello\\\" World\"|\"We are\\\" young\""
-    val tempFile = File.createTempFile("CsvReaderQuotedString", "tmp")
-    tempFile.deleteOnExit()
-    tempFile.setWritable(true)
-
-    val writer = new OutputStreamWriter(new FileOutputStream(tempFile))
-    writer.write(fileContent)
-    writer.close()
-
-    val typeInfo = new RowTypeInfo(Seq(
-      BasicTypeInfo.STRING_TYPE_INFO,
-      BasicTypeInfo.STRING_TYPE_INFO))
-
-    val inputFormat = new RowCsvInputFormat(
-      new Path(tempFile.toURI.toString),
-      rowTypeInfo = typeInfo)
-    inputFormat.enableQuotedStringParsing('"')
-    inputFormat.setFieldDelimiter("|")
-    inputFormat.setDelimiter('\n')
-    inputFormat.configure(new Configuration)
-
-    val splits = inputFormat.createInputSplits(1)
-    inputFormat.open(splits(0))
-
-    val record = inputFormat.nextRecord(new Row(2))
-    assertEquals("\\\"Hello\\\" World", record.productElement(0))
-    assertEquals("We are\\\" young", record.productElement(1))
-  }
-
-  @Test
-  def testSqlTimeFields() {
-    val fileContent = "1990-10-14|02:42:25|1990-10-14 02:42:25.123|1990-1-4 2:2:5\n" +
-      "1990-10-14|02:42:25|1990-10-14 02:42:25.123|1990-1-4 2:2:5.3\n"
-
-    val split = createTempFile(fileContent)
-
-    val typeInfo = new RowTypeInfo(Seq(
-      SqlTimeTypeInfo.DATE,
-      SqlTimeTypeInfo.TIME,
-      SqlTimeTypeInfo.TIMESTAMP,
-      SqlTimeTypeInfo.TIMESTAMP))
-
-    val format = new RowCsvInputFormat(PATH, rowTypeInfo = typeInfo)
-    format.setFieldDelimiter("|")
-    format.configure(new Configuration)
-    format.open(split)
-
-    var result = new Row(4)
-
-    result = format.nextRecord(result)
-    assertNotNull(result)
-    assertEquals(Date.valueOf("1990-10-14"), result.productElement(0))
-    assertEquals(Time.valueOf("02:42:25"), result.productElement(1))
-    assertEquals(Timestamp.valueOf("1990-10-14 02:42:25.123"), result.productElement(2))
-    assertEquals(Timestamp.valueOf("1990-01-04 02:02:05"), result.productElement(3))
-
-    result = format.nextRecord(result)
-    assertNotNull(result)
-    assertEquals(Date.valueOf("1990-10-14"), result.productElement(0))
-    assertEquals(Time.valueOf("02:42:25"), result.productElement(1))
-    assertEquals(Timestamp.valueOf("1990-10-14 02:42:25.123"), result.productElement(2))
-    assertEquals(Timestamp.valueOf("1990-01-04 02:02:05.3"), result.productElement(3))
-
-    result = format.nextRecord(result)
-    assertNull(result)
-    assertTrue(format.reachedEnd)
-  }
-}
-
-object RowCsvInputFormatTest {
-
-  private val PATH = new Path("an/ignored/file/")
-
-  // static variables for testing the removal of \r\n to \n
-  private val FIRST_PART = "That is the first part"
-  private val SECOND_PART = "That is the second part"
-
-  private def createTempFile(content: String): FileInputSplit = {
-    val tempFile = File.createTempFile("test_contents", "tmp")
-    tempFile.deleteOnExit()
-    val wrt = new OutputStreamWriter(new FileOutputStream(tempFile), StandardCharsets.UTF_8)
-    wrt.write(content)
-    wrt.close()
-    new FileInputSplit(
-      0,
-      new Path(tempFile.toURI.toString),
-      0,
-      tempFile.length,
-      Array("localhost"))
-  }
-
-  private def testRemovingTrailingCR(lineBreakerInFile: String, lineBreakerSetup: String) {
-    val fileContent = FIRST_PART + lineBreakerInFile + SECOND_PART + lineBreakerInFile
-
-    // create input file
-    val tempFile = File.createTempFile("CsvInputFormatTest", "tmp")
-    tempFile.deleteOnExit()
-    tempFile.setWritable(true)
-
-    val wrt = new OutputStreamWriter(new FileOutputStream(tempFile))
-    wrt.write(fileContent)
-    wrt.close()
-
-    val typeInfo = new RowTypeInfo(Seq(BasicTypeInfo.STRING_TYPE_INFO))
-
-    val inputFormat = new RowCsvInputFormat(new Path(tempFile.toURI.toString), typeInfo)
-    inputFormat.configure(new Configuration)
-    inputFormat.setDelimiter(lineBreakerSetup)
-
-    val splits = inputFormat.createInputSplits(1)
-    inputFormat.open(splits(0))
-
-    var result = inputFormat.nextRecord(new Row(1))
-    assertNotNull("Expecting to not return null", result)
-    assertEquals(FIRST_PART, result.productElement(0))
-
-    result = inputFormat.nextRecord(result)
-    assertNotNull("Expecting to not return null", result)
-    assertEquals(SECOND_PART, result.productElement(0))
-  }
-}
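
For reference, every test above drives the removed input format the same way: build it
with a RowTypeInfo, configure and open a split, then call nextRecord until it returns
null. A minimal sketch of that pattern (it assumes the Row, RowTypeInfo and
RowCsvInputFormat classes deleted by this commit; `path` and `split` stand in for a
concrete file and its input split):

    val typeInfo = new RowTypeInfo(Seq(
      BasicTypeInfo.STRING_TYPE_INFO,
      BasicTypeInfo.INT_TYPE_INFO))

    val format = new RowCsvInputFormat(path, typeInfo, "\n", "|")
    format.configure(new Configuration)
    format.open(split)

    var row = format.nextRecord(new Row(2))
    while (row != null) {
      // row.productElement(0) is the String column, row.productElement(1) the Int column
      row = format.nextRecord(row)
    }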

http://git-wip-us.apache.org/repos/asf/flink/blob/a9e6ec86/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorTest.scala
deleted file mode 100644
index 557db3a..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorTest.scala
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.typeutils
-
-import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.api.common.typeutils.{ComparatorTestBase, TypeComparator, TypeSerializer}
-import org.apache.flink.api.java.tuple
-import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor}
-import org.apache.flink.api.table.Row
-import org.apache.flink.api.table.typeutils.RowComparatorTest.MyPojo
-import org.junit.Assert._
-
-class RowComparatorTest extends ComparatorTestBase[Row] {
-
-  val typeInfo = new RowTypeInfo(
-    Array(
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.DOUBLE_TYPE_INFO,
-      BasicTypeInfo.STRING_TYPE_INFO,
-      new TupleTypeInfo[tuple.Tuple3[Int, Boolean, Short]](
-        BasicTypeInfo.INT_TYPE_INFO,
-        BasicTypeInfo.BOOLEAN_TYPE_INFO,
-        BasicTypeInfo.SHORT_TYPE_INFO),
-      TypeExtractor.createTypeInfo(classOf[MyPojo])))
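-  // flattened, this schema exposes 7 key fields: int, double, string, the tuple's 3 fields, and the pojo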
-
-  val testPojo1 = new MyPojo()
-  // TODO we cannot test null here as PojoComparator has no support for null keys
-  testPojo1.name = ""
-  val testPojo2 = new MyPojo()
-  testPojo2.name = "Test1"
-  val testPojo3 = new MyPojo()
-  testPojo3.name = "Test2"
-
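-  // ComparatorTestBase requires getSortedTestData in ascending order; null fields order before non-null values here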
-  val data: Array[Row] = Array(
-    createRow(null, null, null, null, null),
-    createRow(0, null, null, null, null),
-    createRow(0, 0.0, null, null, null),
-    createRow(0, 0.0, "a", null, null),
-    createRow(1, 0.0, "a", null, null),
-    createRow(1, 1.0, "a", null, null),
-    createRow(1, 1.0, "b", null, null),
-    createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](1, false, 2), null),
-    createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, false, 2), null),
-    createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 2), null),
-    createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), null),
-    createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo1),
-    createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo2),
-    createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo3)
-    )
-
-  override protected def deepEquals(message: String, should: Row, is: Row): Unit = {
-    val arity = should.productArity
-    assertEquals(message, arity, is.productArity)
-    var index = 0
-    while (index < arity) {
-      val copiedValue: Any = should.productElement(index)
-      val element: Any = is.productElement(index)
-      assertEquals(message, element, copiedValue)
-      index += 1
-    }
-  }
-
-  override protected def createComparator(ascending: Boolean): TypeComparator[Row] = {
-    typeInfo.createComparator(
-      Array(0, 1, 2, 3, 4, 5, 6),
-      Array(ascending, ascending, ascending, ascending, ascending, ascending, ascending),
-      0,
-      new ExecutionConfig())
-  }
-
-  override protected def createSerializer(): TypeSerializer[Row] = {
-    typeInfo.createSerializer(new ExecutionConfig())
-  }
-
-  override protected def getSortedTestData: Array[Row] = {
-    data
-  }
-
-  override protected def supportsNullKeys: Boolean = true
-
-  def createRow(f0: Any, f1: Any, f2: Any, f3: Any, f4: Any): Row = {
-    val r: Row = new Row(5)
-    r.setField(0, f0)
-    r.setField(1, f1)
-    r.setField(2, f2)
-    r.setField(3, f3)
-    r.setField(4, f4)
-    r
-  }
-}
-
-object RowComparatorTest {
-
-  class MyPojo() extends Serializable with Comparable[MyPojo] {
-    // we cannot use null because the PojoComparator does not support null properly
-    var name: String = ""
-
-    override def compareTo(o: MyPojo): Int = {
-      if (name == null && o.name == null) {
-        0
-      }
-      else if (name == null) {
-        -1
-      }
-      else if (o.name == null) {
-        1
-      }
-      else {
-        name.compareTo(o.name)
-      }
-    }
-
-    override def equals(other: Any): Boolean = other match {
-      case that: MyPojo => compareTo(that) == 0
-      case _ => false
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/a9e6ec86/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorWithManyFieldsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorWithManyFieldsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorWithManyFieldsTest.scala
deleted file mode 100644
index 33715c1..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorWithManyFieldsTest.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.typeutils
-
-import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.common.typeutils.{ComparatorTestBase, TypeComparator, TypeSerializer}
-import org.apache.flink.api.table.Row
-import org.apache.flink.util.Preconditions
-import org.junit.Assert._
-
-/**
-  * Tests [[RowComparator]] for wide rows.
-  */
-class RowComparatorWithManyFieldsTest extends ComparatorTestBase[Row] {
-  val numberOfFields = 10
-  val fieldTypes = new Array[TypeInformation[_]](numberOfFields)
-  for (i <- 0 until numberOfFields) {
-    fieldTypes(i) = BasicTypeInfo.STRING_TYPE_INFO
-  }
-  val typeInfo = new RowTypeInfo(fieldTypes)
-
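-  // the comparator below keys only on field 0, so the rows are ordered by it (null first)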
-  val data: Array[Row] = Array(
-    createRow(Array(null, "b0", "c0", "d0", "e0", "f0", "g0", "h0", "i0", "j0")),
-    createRow(Array("a1", "b1", "c1", "d1", "e1", "f1", "g1", "h1", "i1", "j1")),
-    createRow(Array("a2", "b2", "c2", "d2", "e2", "f2", "g2", "h2", "i2", "j2")),
-    createRow(Array("a3", "b3", "c3", "d3", "e3", "f3", "g3", "h3", "i3", "j3"))
-  )
-
-  override protected def deepEquals(message: String, should: Row, is: Row): Unit = {
-    val arity = should.productArity
-    assertEquals(message, arity, is.productArity)
-    var index = 0
-    while (index < arity) {
-      val copiedValue: Any = should.productElement(index)
-      val element: Any = is.productElement(index)
-      assertEquals(message, element, copiedValue)
-      index += 1
-    }
-  }
-
-  override protected def createComparator(ascending: Boolean): TypeComparator[Row] = {
-    typeInfo.createComparator(
-      Array(0),
-      Array(ascending),
-      0,
-      new ExecutionConfig())
-  }
-
-  override protected def createSerializer(): TypeSerializer[Row] = {
-    typeInfo.createSerializer(new ExecutionConfig())
-  }
-
-  override protected def getSortedTestData: Array[Row] = {
-    data
-  }
-
-  override protected def supportsNullKeys: Boolean = true
-
-  private def createRow(values: Array[_]): Row = {
-    Preconditions.checkArgument(values.length == numberOfFields)
-    val r: Row = new Row(numberOfFields)
-    values.zipWithIndex.foreach { case (e, i) => r.setField(i, e) }
-    r
-  }
-}