You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/15 10:49:56 UTC
[6/7] flink git commit: [FLINK-5189] [table] Delete Row and its
related classes from flink-table.
[FLINK-5189] [table] Delete Row and its related classes from flink-table.
This closes #3004.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a9e6ec86
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a9e6ec86
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a9e6ec86
Branch: refs/heads/master
Commit: a9e6ec863a0879b0c8dea199535380a6d42a3121
Parents: 86f8a25
Author: tonycox <to...@gmail.com>
Authored: Wed Dec 14 12:41:51 2016 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Dec 15 11:36:40 2016 +0100
----------------------------------------------------------------------
.../scala/org/apache/flink/api/table/Row.scala | 38 -
.../table/runtime/io/RowCsvInputFormat.scala | 181 ----
.../table/typeutils/NullAwareComparator.scala | 218 -----
.../api/table/typeutils/NullMaskUtils.scala | 98 ---
.../api/table/typeutils/RowComparator.scala | 425 ---------
.../api/table/typeutils/RowSerializer.scala | 209 -----
.../flink/api/table/typeutils/RowTypeInfo.scala | 108 ---
.../runtime/io/RowCsvInputFormatTest.scala | 882 -------------------
.../api/table/typeutils/RowComparatorTest.scala | 136 ---
.../RowComparatorWithManyFieldsTest.scala | 82 --
.../api/table/typeutils/RowSerializerTest.scala | 194 ----
11 files changed, 2571 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a9e6ec86/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Row.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Row.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Row.scala
deleted file mode 100644
index e3baab3..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Row.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table
-
-/**
- * Mutable, fixed-arity record used for executing Table API operations.
- *
- * Field types are checked, and serializers and comparators created, from manually
- * generated TypeInfo.
- *
- * @param arity number of fields; every field is null until it is set
- */
-class Row(arity: Int) extends Product {
-
-  // backing storage, one slot per field
-  private val values: Array[Any] = new Array[Any](arity)
-
-  /** Number of fields of this row. */
-  def productArity: Int = values.length
-
-  /** Value stored at position `i`. */
-  def productElement(i: Int): Any = values(i)
-
-  /** Stores `value` at position `i`. */
-  def setField(i: Int, value: Any): Unit = {
-    values(i) = value
-  }
-
-  /** Always false. */
-  def canEqual(that: Any): Boolean = false
-
-  /** The field values joined by commas. */
-  override def toString: String = values.mkString(",")
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a9e6ec86/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormat.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormat.scala
deleted file mode 100644
index b0ab801..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormat.scala
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime.io
-
-import org.apache.flink.annotation.Internal
-import org.apache.flink.api.common.io.ParseException
-import org.apache.flink.api.java.io.CsvInputFormat
-import org.apache.flink.api.java.io.CsvInputFormat.{DEFAULT_FIELD_DELIMITER, DEFAULT_LINE_DELIMITER, createDefaultMask, toBooleanMask}
-import org.apache.flink.api.table.Row
-import org.apache.flink.api.table.runtime.io.RowCsvInputFormat.extractTypeClasses
-import org.apache.flink.api.table.typeutils.RowTypeInfo
-import org.apache.flink.core.fs.Path
-import org.apache.flink.types.parser.FieldParser
-import org.apache.flink.types.parser.FieldParser.ParseErrorState
-
-@Internal
-@SerialVersionUID(1L)
-class RowCsvInputFormat(
- filePath: Path,
- rowTypeInfo: RowTypeInfo,
- lineDelimiter: String = DEFAULT_LINE_DELIMITER,
- fieldDelimiter: String = DEFAULT_FIELD_DELIMITER,
- includedFieldsMask: Array[Boolean] = null,
- emptyColumnAsNull: Boolean = false)
- extends CsvInputFormat[Row](filePath) {
-
- if (rowTypeInfo.getArity == 0) {
- throw new IllegalArgumentException("Row arity must be greater than 0.")
- }
- private val arity = rowTypeInfo.getArity
- private lazy val defaultFieldMask = createDefaultMask(arity)
- private val fieldsMask = Option(includedFieldsMask).getOrElse(defaultFieldMask)
-
- // prepare CsvInputFormat
- setDelimiter(lineDelimiter)
- setFieldDelimiter(fieldDelimiter)
- setFieldsGeneric(fieldsMask, extractTypeClasses(rowTypeInfo))
-
- /**
- * Auxiliary constructor that takes the included fields as an Int array.
- *
- * A non-null mask is converted with toBooleanMask; a null mask is forwarded as null.
- */
- def this(
- filePath: Path,
- rowTypeInfo: RowTypeInfo,
- lineDelimiter: String,
- fieldDelimiter: String,
- includedFieldsMask: Array[Int]) {
- this(
- filePath,
- rowTypeInfo,
- lineDelimiter,
- fieldDelimiter,
- if (includedFieldsMask == null) {
- null
- } else {
- toBooleanMask(includedFieldsMask)
- })
- }
-
- /**
- * Auxiliary constructor that takes an Int array of included fields and uses
- * DEFAULT_LINE_DELIMITER and DEFAULT_FIELD_DELIMITER as delimiters.
- */
- def this(
- filePath: Path,
- rowTypeInfo: RowTypeInfo,
- includedFieldsMask: Array[Int]) {
- this(
- filePath,
- rowTypeInfo,
- DEFAULT_LINE_DELIMITER,
- DEFAULT_FIELD_DELIMITER,
- includedFieldsMask)
- }
-
-  /**
-   * Copies the parsed field values into a row and returns that row.
-   *
-   * @param reuse row to fill; when null, a new Row with this format's arity is created
-   * @param parsedValues parsed values, written to the row at their array positions
-   * @return the filled row: `reuse` when it was non-null, otherwise the new row
-   */
-  def fillRecord(reuse: Row, parsedValues: Array[AnyRef]): Row = {
-    val target = if (reuse == null) new Row(arity) else reuse
-    var i = 0
-    while (i < parsedValues.length) {
-      // write through `target`, never `reuse`: `reuse` may be null
-      target.setField(i, parsedValues(i))
-      i += 1
-    }
-    target
-  }
-
- /**
- * Parses the record stored in `bytes` between `offset` and `offset + numBytes`.
- *
- * Walks over all entries of the field mask. Included fields are parsed with their
- * field parser and the result is stored in `holders`; excluded fields are skipped.
- *
- * @param holders receives one parsed value per included field
- * @param bytes buffer containing the record
- * @param offset position of the first byte of the record
- * @param numBytes length of the record in bytes
- * @return false if the record ends before all mask entries were visited and the
- * format is lenient, true otherwise
- */
- @throws[ParseException]
- override protected def parseRecord(
- holders: Array[AnyRef],
- bytes: Array[Byte],
- offset: Int,
- numBytes: Int)
- : Boolean = {
- val fieldDelimiter = this.getFieldDelimiter
- val fieldIncluded: Array[Boolean] = this.fieldIncluded
-
- var startPos = offset
- val limit = offset + numBytes
-
- // `field` counts all mask entries, `output` only the included ones
- var field = 0
- var output = 0
- while (field < fieldIncluded.length) {
-
- // check valid start position
- if (startPos >= limit) {
- if (isLenient) {
- return false
- } else {
- throw new ParseException("Row too short: " + new String(bytes, offset, numBytes))
- }
- }
-
- if (fieldIncluded(field)) {
- // parse field
- val parser: FieldParser[AnyRef] = this.getFieldParsers()(output)
- .asInstanceOf[FieldParser[AnyRef]]
- // remember where this field started so it can be skipped if its value is nulled below
- val latestValidPos = startPos
- startPos = parser.resetErrorStateAndParse(
- bytes,
- startPos,
- limit,
- fieldDelimiter,
- holders(output))
-
- if (!isLenient && (parser.getErrorState ne ParseErrorState.NONE)) {
- // the error state EMPTY_COLUMN is ignored
- if (parser.getErrorState ne ParseErrorState.EMPTY_COLUMN) {
- throw new ParseException(s"Parsing error for column $field of row '"
- + new String(bytes, offset, numBytes)
- + s"' originated by ${parser.getClass.getSimpleName}: ${parser.getErrorState}.")
- }
- }
- holders(output) = parser.getLastResult
-
- // check parse result:
- // the result is null if it is invalid
- // or empty with emptyColumnAsNull enabled
- if (startPos < 0 ||
- (emptyColumnAsNull && (parser.getErrorState eq ParseErrorState.EMPTY_COLUMN))) {
- holders(output) = null
- startPos = skipFields(bytes, latestValidPos, limit, fieldDelimiter)
- }
- output += 1
- } else {
- // skip field
- startPos = skipFields(bytes, startPos, limit, fieldDelimiter)
- }
-
- // check if something went wrong
- if (startPos < 0) {
- throw new ParseException(s"Unexpected parser position for column $field of row '"
- + new String(bytes, offset, numBytes) + "'")
- }
-
- field += 1
- }
- true
- }
-}
-
-object RowCsvInputFormat {
-
-  /** Returns the type class of every field of `rowTypeInfo`, in field order. */
-  private def extractTypeClasses(rowTypeInfo: RowTypeInfo): Array[Class[_]] = {
-    val result = new Array[Class[_]](rowTypeInfo.getArity)
-    var pos = 0
-    while (pos < result.length) {
-      result(pos) = rowTypeInfo.getTypeAt(pos).getTypeClass
-      pos += 1
-    }
-    result
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a9e6ec86/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/NullAwareComparator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/NullAwareComparator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/NullAwareComparator.scala
deleted file mode 100644
index 86a768d..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/NullAwareComparator.scala
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.typeutils
-
-import org.apache.flink.api.common.typeutils.{CompositeTypeComparator, TypeComparator}
-import org.apache.flink.core.memory.{DataInputView, DataOutputView, MemorySegment}
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ArrayBuffer
-
-/**
- * Null-aware comparator that wraps a comparator which does not support null references.
- *
- * NOTE: This class assumes to be used within a composite type comparator (such
- * as [[RowComparator]]) that handles serialized comparison.
- */
-class NullAwareComparator[T](
- val wrappedComparator: TypeComparator[T],
- val order: Boolean)
- extends TypeComparator[T] {
-
- // number of flat fields
- private val flatFields = wrappedComparator.getFlatComparators.length
-
- // stores the null for reference comparison
- private var nullReference = false
-
-  /** Hash of `record` as computed by the wrapped comparator; a null record hashes to 0. */
-  override def hash(record: T): Int =
-    if (record == null) 0 else wrappedComparator.hash(record)
-
-  /**
-   * Normalized key length of the wrapped comparator plus one byte for the null flag.
-   * Integer.MAX_VALUE is returned unchanged.
-   */
-  override def getNormalizeKeyLen: Int = {
-    val wrappedLen = wrappedComparator.getNormalizeKeyLen
-    if (wrappedLen != Integer.MAX_VALUE) wrappedLen + 1 else Integer.MAX_VALUE
-  }
-
-  /**
-   * Writes a one-byte null flag followed by the key of the wrapped comparator.
-   *
-   * A null record writes `false` and pads the remaining `numBytes - 1` bytes with zeros.
-   * A non-null record writes `true` and lets the wrapped comparator fill the remaining
-   * `numBytes - 1` bytes. Nothing is written if `numBytes` is not positive.
-   */
-  override def putNormalizedKey(
-      record: T,
-      target: MemorySegment,
-      offset: Int,
-      numBytes: Int)
-    : Unit = {
-    if (numBytes > 0) {
-      val present = record != null
-      target.putBoolean(offset, present)
-      if (present) {
-        wrappedComparator.putNormalizedKey(record, target, offset + 1, numBytes - 1)
-      } else {
-        // zero padding behind the flag byte
-        for (pad <- 0 until numBytes - 1) {
-          target.put(offset + 1 + pad, 0.toByte)
-        }
-      }
-    }
-  }
-
- /** Delegates to the wrapped comparator. */
- override def invertNormalizedKey(): Boolean = wrappedComparator.invertNormalizedKey()
-
- /** Always false; the two methods below throw accordingly. */
- override def supportsSerializationWithKeyNormalization(): Boolean = false
-
- /** Not supported; always throws UnsupportedOperationException. */
- override def writeWithKeyNormalization(record: T, target: DataOutputView): Unit =
- throw new UnsupportedOperationException("Record serialization with leading normalized keys" +
- " not supported.")
-
- /** Not supported; always throws UnsupportedOperationException. */
- override def readWithKeyDenormalization(reuse: T, source: DataInputView): T =
- throw new UnsupportedOperationException("Record deserialization with leading normalized keys" +
- " not supported.")
-
- /** Asks the wrapped comparator with one byte less, since one key byte is the null flag. */
- override def isNormalizedKeyPrefixOnly(keyBytes: Int): Boolean =
- wrappedComparator.isNormalizedKeyPrefixOnly(keyBytes - 1)
-
-  /**
-   * Records whether the reference is null. Only a non-null reference is passed on to the
-   * wrapped comparator.
-   */
-  override def setReference(toCompare: T): Unit = {
-    nullReference = toCompare == null
-    if (!nullReference) {
-      wrappedComparator.setReference(toCompare)
-    }
-  }
-
-  /**
-   * Compares two values that may be null.
-   *
-   * Two nulls are equal. If only `first` is null the result is -1 when `order` is true and
-   * 1 otherwise; if only `second` is null the signs are swapped. Two non-null values are
-   * compared by the wrapped comparator.
-   */
-  override def compare(first: T, second: T): Int = {
-    val firstIsNull = first == null
-    val secondIsNull = second == null
-    if (!firstIsNull && !secondIsNull) {
-      wrappedComparator.compare(first, second)
-    } else if (firstIsNull && secondIsNull) {
-      0
-    } else {
-      // exactly one side is null; `order` decides the sign
-      val nullFirst = if (order) -1 else 1
-      if (firstIsNull) nullFirst else -nullFirst
-    }
-  }
-
-  /**
-   * Compares the reference of this comparator with the reference of `referencedComparator`,
-   * which must be a NullAwareComparator.
-   *
-   * Two null references are equal. If only this reference is null the result is 1 when
-   * `order` is true and -1 otherwise; if only the other reference is null the signs are
-   * swapped. Otherwise the wrapped comparators are compared.
-   */
-  override def compareToReference(referencedComparator: TypeComparator[T]): Int = {
-    val other = referencedComparator.asInstanceOf[NullAwareComparator[T]]
-    (nullReference, other.nullReference) match {
-      case (true, true) => 0
-      case (true, false) => if (order) 1 else -1
-      case (false, true) => if (order) -1 else 1
-      case (false, false) => wrappedComparator.compareToReference(other.wrappedComparator)
-    }
-  }
-
-  /** Delegates to the wrapped comparator. */
-  override def supportsNormalizedKey(): Boolean = wrappedComparator.supportsNormalizedKey()
-
-  /**
-   * Checks `candidate` against the reference. If either side is null they are equal only
-   * when both are null; otherwise the wrapped comparator decides.
-   */
-  override def equalToReference(candidate: T): Boolean = {
-    val candidateIsNull = candidate == null
-    if (candidateIsNull || nullReference) {
-      candidateIsNull && nullReference
-    } else {
-      wrappedComparator.equalToReference(candidate)
-    }
-  }
-
-  /** Creates a new comparator around a duplicate of the wrapped one, keeping `order`. */
-  override def duplicate(): TypeComparator[T] = {
-    val wrappedCopy = wrappedComparator.duplicate()
-    new NullAwareComparator[T](wrappedCopy, order)
-  }
-
-  /**
-   * Extracts the keys of `record` into `target`, starting at `index`.
-   *
-   * A null record sets one null per flat field and returns the number of flat fields;
-   * otherwise the wrapped comparator does the extraction.
-   */
-  override def extractKeys(record: Any, target: Array[AnyRef], index: Int): Int =
-    if (record != null) {
-      wrappedComparator.extractKeys(record, target, index)
-    } else {
-      for (slot <- 0 until flatFields) {
-        target(index + slot) = null
-      }
-      flatFields
-    }
-
-
-  /**
-   * Returns the flat comparators of the wrapped comparator, each wrapped in a
-   * NullAwareComparator with the same `order`.
-   *
-   * A CompositeTypeComparator contributes all of its flat comparators; any other
-   * comparator contributes itself.
-   */
-  override def getFlatComparators: Array[TypeComparator[_]] = {
-    // collect into a java.util.List directly so that no implicit Scala-to-Java
-    // collection conversion is needed for the getFlatComparator call
-    val flat = new java.util.ArrayList[TypeComparator[_]]()
-    wrappedComparator match {
-      case ctc: CompositeTypeComparator[_] => ctc.getFlatComparator(flat)
-      case c: TypeComparator[_] => flat.add(c)
-    }
-    val wrapped = new Array[TypeComparator[_]](flat.size)
-    var i = 0
-    while (i < wrapped.length) {
-      val inner = flat.get(i).asInstanceOf[TypeComparator[Any]]
-      wrapped(i) = new NullAwareComparator[Any](inner, order)
-      i += 1
-    }
-    wrapped
-  }
-
- /**
- * This method is not implemented here. It must be implemented by the comparator this class
- * is contained in (e.g. RowComparator).
- *
- * @param firstSource The input view containing the first record.
- * @param secondSource The input view containing the second record.
- * @return An integer defining the order among the objects in the same way as
- * [[java.util.Comparator#compare]]; this implementation never returns and always
- * throws UnsupportedOperationException.
- */
- override def compareSerialized(firstSource: DataInputView, secondSource: DataInputView): Int =
- throw new UnsupportedOperationException("Comparator does not support null-aware serialized " +
- "comparision.")
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a9e6ec86/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/NullMaskUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/NullMaskUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/NullMaskUtils.scala
deleted file mode 100644
index dcdc775..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/NullMaskUtils.scala
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.typeutils
-
-import org.apache.flink.api.table.Row
-import org.apache.flink.core.memory.{DataInputView, DataOutputView}
-
-/**
- * Helpers for writing and reading the null mask of a [[Row]].
- *
- * The mask holds one bit per field, eight fields per byte, with the first field of each
- * group in the most significant bit. A set bit marks a null field.
- */
-object NullMaskUtils {
-
-  /**
-   * Writes the null mask for the first `len` fields of `value` to `target`.
-   * Unused low bits of the last byte are zero.
-   */
-  def writeNullMask(len: Int, value: Row, target: DataOutputView): Unit = {
-    var start = 0
-    while (start < len) {
-      val count = Math.min(8, len - start)
-      var packed = 0x00
-      var k = 0
-      while (k < count) {
-        packed <<= 1
-        if (value.productElement(start + k) == null) {
-          packed |= 0x01
-        }
-        k += 1
-      }
-      // left-align the flags when fewer than 8 fields remain
-      packed <<= (8 - count)
-      target.writeByte(packed)
-      start += count
-    }
-  }
-
-  /** Reads a null mask for `len` fields from `source` into `nullMask`. */
-  def readIntoNullMask(len: Int, source: DataInputView, nullMask: Array[Boolean]): Unit = {
-    var start = 0
-    while (start < len) {
-      val packed = source.readUnsignedByte()
-      val count = Math.min(8, len - start)
-      unpackFlags(packed, start, count, nullMask)
-      start += count
-    }
-  }
-
-  /**
-   * Reads a null mask for `len` fields from `source` into `nullMask` and copies every
-   * mask byte unchanged to `target`.
-   */
-  def readIntoAndCopyNullMask(
-      len: Int,
-      source: DataInputView,
-      target: DataOutputView,
-      nullMask: Array[Boolean]): Unit = {
-    var start = 0
-    while (start < len) {
-      val packed = source.readUnsignedByte()
-      target.writeByte(packed)
-      val count = Math.min(8, len - start)
-      unpackFlags(packed, start, count, nullMask)
-      start += count
-    }
-  }
-
-  /** Stores the `count` highest bits of the byte `packed` in `nullMask` from `start` on. */
-  private def unpackFlags(
-      packed: Int,
-      start: Int,
-      count: Int,
-      nullMask: Array[Boolean]): Unit = {
-    var k = 0
-    while (k < count) {
-      nullMask(start + k) = (packed & (0x80 >> k)) != 0
-      k += 1
-    }
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a9e6ec86/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowComparator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowComparator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowComparator.scala
deleted file mode 100644
index 8bbe4d8..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowComparator.scala
+++ /dev/null
@@ -1,425 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.typeutils
-
-import java.util
-
-import org.apache.flink.api.common.typeutils.{CompositeTypeComparator, TypeComparator, TypeSerializer}
-import org.apache.flink.api.java.typeutils.runtime.TupleComparatorBase
-import org.apache.flink.api.table.Row
-import org.apache.flink.api.table.typeutils.NullMaskUtils.readIntoNullMask
-import org.apache.flink.api.table.typeutils.RowComparator.{createAuxiliaryFields, makeNullAware}
-import org.apache.flink.core.memory.{DataInputView, DataOutputView, MemorySegment}
-import org.apache.flink.types.KeyFieldOutOfBoundsException
-
-/**
- * Comparator for [[Row]].
- */
-class RowComparator private (
- /** the number of fields of the Row */
- val numberOfFields: Int,
- /** key positions describe which fields are keys in what order */
- val keyPositions: Array[Int],
- /** null-aware comparators for the key fields, in the same order as the key fields */
- val comparators: Array[NullAwareComparator[Any]],
- /** serializers to deserialize the first n fields for comparison */
- val serializers: Array[TypeSerializer[Any]],
- /** auxiliary fields for normalized key support */
- private val auxiliaryFields: (Array[Int], Int, Int, Boolean))
- extends CompositeTypeComparator[Row] with Serializable {
-
- // null masks for serialized comparison
- private val nullMask1 = new Array[Boolean](numberOfFields)
- private val nullMask2 = new Array[Boolean](numberOfFields)
-
- // cache for the deserialized key field objects
- @transient
- private lazy val deserializedKeyFields1: Array[Any] = instantiateDeserializationFields()
-
- @transient
- private lazy val deserializedKeyFields2: Array[Any] = instantiateDeserializationFields()
-
- // create auxiliary fields
- private val normalizedKeyLengths: Array[Int] = auxiliaryFields._1
- private val numLeadingNormalizableKeys: Int = auxiliaryFields._2
- private val normalizableKeyPrefixLen: Int = auxiliaryFields._3
- private val invertNormKey: Boolean = auxiliaryFields._4
-
- /**
- * Intermediate constructor for creating auxiliary fields.
- *
- * Computes the auxiliary normalized-key fields with createAuxiliaryFields from the key
- * positions and comparators and forwards everything to the primary constructor.
- *
- * @param numberOfFields the number of fields of the Row
- * @param keyPositions key positions describe which fields are keys in what order
- * @param comparators null-aware comparators for the key fields, in the same order as
- * the key fields
- * @param serializers serializers to deserialize the first n fields for comparison
- */
- def this(
- numberOfFields: Int,
- keyPositions: Array[Int],
- comparators: Array[NullAwareComparator[Any]],
- serializers: Array[TypeSerializer[Any]]) = {
- this(
- numberOfFields,
- keyPositions,
- comparators,
- serializers,
- createAuxiliaryFields(keyPositions, comparators))
- }
-
- /**
- * General constructor for RowComparator.
- *
- * The given comparators are wrapped into null-aware comparators with makeNullAware,
- * which pairs each comparator with the sorting order at the same position.
- *
- * @param numberOfFields the number of fields of the Row
- * @param keyPositions key positions describe which fields are keys in what order
- * @param comparators non-null-aware comparators for the key fields, in the same order as
- * the key fields
- * @param serializers serializers to deserialize the first n fields for comparison
- * @param orders sorting orders for the fields
- */
- def this(
- numberOfFields: Int,
- keyPositions: Array[Int],
- comparators: Array[TypeComparator[Any]],
- serializers: Array[TypeSerializer[Any]],
- orders: Array[Boolean]) = {
- this(
- numberOfFields,
- keyPositions,
- makeNullAware(comparators, orders),
- serializers)
- }
-
-  /** Creates one fresh instance per serializer, in serializer order. */
-  private def instantiateDeserializationFields(): Array[Any] =
-    serializers.map(serializer => serializer.createInstance())
-
- // --------------------------------------------------------------------------------------------
- // Comparator Methods
- // --------------------------------------------------------------------------------------------
-
-  /**
-   * Compares the references of this comparator with those of `referencedComparator`, which
-   * must be a RowComparator, key by key. The first non-zero result is returned, 0 if all
-   * keys compare equal.
-   *
-   * An IndexOutOfBoundsException is rethrown as a KeyFieldOutOfBoundsException for the
-   * key position being processed.
-   */
-  override def compareToReference(referencedComparator: TypeComparator[Row]): Int = {
-    val that = referencedComparator.asInstanceOf[RowComparator]
-    var k = 0
-    var result = 0
-    try {
-      while (result == 0 && k < keyPositions.length) {
-        result = comparators(k).compareToReference(that.comparators(k))
-        if (result == 0) {
-          k += 1
-        }
-      }
-    } catch {
-      case _: IndexOutOfBoundsException =>
-        throw new KeyFieldOutOfBoundsException(keyPositions(k))
-    }
-    result
-  }
-
- /**
- * Compares two serialized rows.
- *
- * Reads the null masks of both rows, deserializes the leading fields covered by
- * `serializers` from both sources (null fields are not read), and then compares the key
- * fields in key order with the null-aware comparators.
- *
- * @param firstSource input view positioned at the first serialized row
- * @param secondSource input view positioned at the second serialized row
- * @return the first non-zero comparison result of a key field, or 0 if all are equal
- */
- override def compareSerialized(firstSource: DataInputView, secondSource: DataInputView): Int = {
- val len = serializers.length
- val keyLen = keyPositions.length
-
- // the null masks cover all fields of the row, not only the deserialized ones
- readIntoNullMask(numberOfFields, firstSource, nullMask1)
- readIntoNullMask(numberOfFields, secondSource, nullMask2)
-
- // deserialize
- var i = 0
- while (i < len) {
- val serializer = serializers(i)
-
- // deserialize field 1
- if (!nullMask1(i)) {
- deserializedKeyFields1(i) = serializer.deserialize(deserializedKeyFields1(i), firstSource)
- }
-
- // deserialize field 2
- if (!nullMask2(i)) {
- deserializedKeyFields2(i) = serializer.deserialize(deserializedKeyFields2(i), secondSource)
- }
-
- i += 1
- }
-
- // compare
- i = 0
- while (i < keyLen) {
- val keyPos = keyPositions(i)
- val comparator = comparators(i)
-
- val isNull1 = nullMask1(keyPos)
- val isNull2 = nullMask2(keyPos)
-
- var cmp = 0
- // both values are null -> equality
- if (isNull1 && isNull2) {
- cmp = 0
- }
- // first value is null -> inequality
- else if (isNull1) {
- cmp = comparator.compare(null, deserializedKeyFields2(keyPos))
- }
- // second value is null -> inequality
- else if (isNull2) {
- cmp = comparator.compare(deserializedKeyFields1(keyPos), null)
- }
- // no null values
- else {
- cmp = comparator.compare(deserializedKeyFields1(keyPos), deserializedKeyFields2(keyPos))
- }
-
- if (cmp != 0) {
- return cmp
- }
-
- i += 1
- }
- 0
- }
-
- /** True if at least one leading key supports normalized keys. */
- override def supportsNormalizedKey(): Boolean = numLeadingNormalizableKeys > 0
-
- /** Length of the normalized key prefix as computed by createAuxiliaryFields. */
- override def getNormalizeKeyLen: Int = normalizableKeyPrefixLen
-
- /**
- * True if not all keys are part of the normalized key, the prefix length is
- * Integer.MAX_VALUE, or the prefix is longer than `keyBytes`.
- */
- override def isNormalizedKeyPrefixOnly(keyBytes: Int): Boolean =
- numLeadingNormalizableKeys < keyPositions.length ||
- normalizableKeyPrefixLen == Integer.MAX_VALUE ||
- normalizableKeyPrefixLen > keyBytes
-
- /** Inversion flag as computed by createAuxiliaryFields. */
- override def invertNormalizedKey(): Boolean = invertNormKey
-
- /** Always false; the two methods below throw accordingly. */
- override def supportsSerializationWithKeyNormalization(): Boolean = false
-
- /** Not supported; always throws UnsupportedOperationException. */
- override def writeWithKeyNormalization(record: Row, target: DataOutputView): Unit =
- throw new UnsupportedOperationException("Record serialization with leading normalized keys " +
- "not supported.")
-
- /** Not supported; always throws UnsupportedOperationException. */
- override def readWithKeyDenormalization(reuse: Row, source: DataInputView): Row =
- throw new UnsupportedOperationException("Record deserialization with leading normalized keys " +
- "not supported.")
-
-  /**
-   * Creates a copy with duplicated comparators and serializers. The key positions and the
-   * auxiliary fields are passed on as they are.
-   */
-  override def duplicate(): TypeComparator[Row] = {
-    val duplicatedComparators = comparators.map { comparator =>
-      comparator.duplicate().asInstanceOf[NullAwareComparator[Any]]
-    }
-    val duplicatedSerializers = serializers.map(serializer => serializer.duplicate())
-
-    new RowComparator(
-      numberOfFields,
-      keyPositions,
-      duplicatedComparators,
-      duplicatedSerializers,
-      auxiliaryFields)
-  }
-
-  /**
-   * Combines the hashes of all key fields of `value`. Before a key's hash is added, the
-   * running code is multiplied with a salt taken from TupleComparatorBase.HASH_SALT.
-   *
-   * An IndexOutOfBoundsException is rethrown as a KeyFieldOutOfBoundsException for the
-   * key position being processed.
-   */
-  override def hash(value: Row): Int = {
-    var result: Int = 0
-    var k = 0
-    try {
-      while (k < keyPositions.length) {
-        result *= TupleComparatorBase.HASH_SALT(k & 0x1F)
-        // the key field may be null; the null-aware comparator handles that
-        val keyField = value.productElement(keyPositions(k))
-        result += comparators(k).hash(keyField)
-        k += 1
-      }
-    } catch {
-      case _: IndexOutOfBoundsException =>
-        throw new KeyFieldOutOfBoundsException(keyPositions(k))
-    }
-    result
-  }
-
-  /**
-   * Sets every key field of `toCompare` as the reference of its comparator.
-   *
-   * An IndexOutOfBoundsException is rethrown as a KeyFieldOutOfBoundsException for the
-   * key position being processed.
-   */
-  override def setReference(toCompare: Row): Unit = {
-    var k = 0
-    try {
-      while (k < keyPositions.length) {
-        val keyComparator = comparators(k)
-        // the key field may be null
-        val keyField = toCompare.productElement(keyPositions(k))
-        keyComparator.setReference(keyField)
-        k += 1
-      }
-    } catch {
-      case _: IndexOutOfBoundsException =>
-        throw new KeyFieldOutOfBoundsException(keyPositions(k))
-    }
-  }
-
-  /**
-   * Checks whether every key field of `candidate` equals the reference of its comparator.
-   * Stops at the first key that differs.
-   *
-   * An IndexOutOfBoundsException is rethrown as a KeyFieldOutOfBoundsException for the
-   * key position being processed.
-   */
-  override def equalToReference(candidate: Row): Boolean = {
-    var k = 0
-    var allEqual = true
-    try {
-      while (allEqual && k < keyPositions.length) {
-        val keyComparator = comparators(k)
-        // the key field may be null
-        val keyField = candidate.productElement(keyPositions(k))
-        allEqual = keyComparator.equalToReference(keyField)
-        if (allEqual) {
-          k += 1
-        }
-      }
-    } catch {
-      case _: IndexOutOfBoundsException =>
-        throw new KeyFieldOutOfBoundsException(keyPositions(k))
-    }
-    allEqual
-  }
-
-  /**
-   * Compares two rows key by key and returns the first non-zero result, or 0 if all key
-   * fields compare equal.
-   *
-   * An IndexOutOfBoundsException is rethrown as a KeyFieldOutOfBoundsException for the
-   * key position being processed.
-   */
-  override def compare(first: Row, second: Row): Int = {
-    var k = 0
-    var result = 0
-    try {
-      while (result == 0 && k < keyPositions.length) {
-        val position: Int = keyPositions(k)
-        val keyComparator = comparators(k)
-        // both key fields may be null
-        val left = first.productElement(position)
-        val right = second.productElement(position)
-        result = keyComparator.compare(left, right)
-        if (result == 0) {
-          k += 1
-        }
-      }
-    } catch {
-      case _: IndexOutOfBoundsException =>
-        throw new KeyFieldOutOfBoundsException(keyPositions(k))
-    }
-    result
-  }
-
-  /**
-   * Writes the normalized keys of the leading normalizable key fields of `record` one
-   * after another into `target`, starting at `offset`.
-   *
-   * Each key gets its own normalized key length, cut down to the bytes that are left.
-   * Writing stops when `numBytes` bytes are used up or no normalizable key is left.
-   */
-  override def putNormalizedKey(
-      record: Row,
-      target: MemorySegment,
-      offset: Int,
-      numBytes: Int)
-    : Unit = {
-    var remaining = numBytes
-    var writePos = offset
-
-    var k = 0
-    while (k < numLeadingNormalizableKeys && remaining > 0) {
-      val keyLen = normalizedKeyLengths(k)
-      val len = if (remaining < keyLen) remaining else keyLen
-
-      // the key field may be null
-      val keyField = record.productElement(keyPositions(k))
-      comparators(k).putNormalizedKey(keyField, target, writePos, len)
-
-      writePos += len
-      remaining -= len
-      k += 1
-    }
-  }
-
-  /** Adds the flat comparators of all key comparators to `flatComparators`, in key order. */
-  override def getFlatComparator(flatComparators: util.List[TypeComparator[_]]): Unit =
-    for (keyComparator <- comparators; flat <- keyComparator.getFlatComparators) {
-      flatComparators.add(flat)
-    }
-
-  /**
-   * Extracts the key fields of `record`, which must be a Row, into `target` starting at
-   * `index`.
-   *
-   * @return the number of entries written to `target`
-   */
-  override def extractKeys(record: Any, target: Array[AnyRef], index: Int): Int = {
-    var written = 0
-    var k = 0
-    while (k < comparators.length) {
-      // the key field may be null
-      val keyField = record.asInstanceOf[Row].productElement(keyPositions(k))
-      written += comparators(k).extractKeys(keyField, target, index + written)
-      k += 1
-    }
-    written
-  }
-}
-
-object RowComparator {
-
-  /**
-   * Wraps every comparator into a NullAwareComparator, using the order at the same
-   * position. Surplus entries of the longer array are ignored.
-   */
-  private def makeNullAware(
-      comparators: Array[TypeComparator[Any]],
-      orders: Array[Boolean])
-    : Array[NullAwareComparator[Any]] = {
-    val count = Math.min(comparators.length, orders.length)
-    val wrapped = new Array[NullAwareComparator[Any]](count)
-    var k = 0
-    while (k < count) {
-      wrapped(k) = new NullAwareComparator[Any](comparators(k), orders(k))
-      k += 1
-    }
-    wrapped
-  }
-
-  /**
-   * Creates the auxiliary fields for normalized key support.
-   *
-   * Keys are taken into the normalized key from the front for as long as their comparator
-   * supports normalized keys and agrees with the first key on the inversion direction.
-   *
-   * @return a tuple of the normalized key length per key, the number of leading
-   *         normalizable keys, the length of the normalized key prefix
-   *         (Integer.MAX_VALUE if the sum overflows) and the inversion flag
-   */
-  private def createAuxiliaryFields(
-      keyPositions: Array[Int],
-      comparators: Array[NullAwareComparator[Any]])
-    : (Array[Int], Int, Int, Boolean) = {
-
-    val lengths = new Array[Int](keyPositions.length)
-    var leadingKeys = 0
-    var prefixLen = 0
-    var inverted = false
-
-    var finished = false
-    var k = 0
-    while (!finished && k < keyPositions.length) {
-      val comparator = comparators(k)
-      if (!comparator.supportsNormalizedKey()) {
-        // the normalized key ends at the first key that cannot be normalized
-        finished = true
-      } else {
-        if (k == 0) {
-          // the first comparator decides whether the key direction is inverted
-          inverted = comparator.invertNormalizedKey()
-        }
-        if (k > 0 && comparator.invertNormalizedKey() != inverted) {
-          // a successor with a different inversion direction cannot be part of the key
-          finished = true
-        } else {
-          leadingKeys += 1
-          val len = comparator.getNormalizeKeyLen
-          if (len < 0) {
-            throw new RuntimeException("Comparator " + comparator.getClass.getName +
-              " specifies an invalid length for the normalized key: " + len)
-          }
-          lengths(k) = len
-          prefixLen += len
-          if (prefixLen < 0) {
-            // overflow, which means the budget for normalized key space is used up
-            prefixLen = Integer.MAX_VALUE
-            finished = true
-          }
-        }
-      }
-      k += 1
-    }
-    (lengths, leadingKeys, prefixLen, inverted)
-  }
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/a9e6ec86/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowSerializer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowSerializer.scala
deleted file mode 100644
index 825a99c..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowSerializer.scala
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.typeutils
-
-import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.api.table.Row
-import org.apache.flink.api.table.typeutils.NullMaskUtils.{writeNullMask, readIntoNullMask, readIntoAndCopyNullMask}
-import org.apache.flink.core.memory.{DataInputView, DataOutputView}
-
-/**
- * Serializer for [[Row]].
- *
- * Serialized form, as written by `serialize`: a null mask (see `NullMaskUtils.writeNullMask`)
- * followed by every non-null field in positional order, each written with the field
- * serializer at the same index.
- *
- * The instance owns a scratch null-mask array that `deserialize` and the binary `copy`
- * overwrite on every call; `duplicate` returns an instance with its own scratch array.
- *
- * @param fieldSerializers serializer for each row field; index `i` handles field `i`
- */
-class RowSerializer(val fieldSerializers: Array[TypeSerializer[Any]])
-  extends TypeSerializer[Row] {
-
-  // scratch buffer for the read paths; `true` at index i marks field i as null
-  private val nullMask = new Array[Boolean](fieldSerializers.length)
-
-  override def isImmutableType: Boolean = false
-
-  override def getLength: Int = -1
-
-  /** Creates a serializer over duplicates of all field serializers. */
-  override def duplicate: RowSerializer = {
-    new RowSerializer(fieldSerializers.map(_.duplicate()))
-  }
-
-  /** Creates a row with one (initially unset) slot per field serializer. */
-  override def createInstance: Row = {
-    new Row(fieldSerializers.length)
-  }
-
-  /**
-   * Copies `from` into `reuse` field by field and returns `reuse`.
-   *
-   * A null `reuse` falls back to the non-reuse copy. Field objects already present in `reuse`
-   * are handed to the field serializer as reuse targets.
-   *
-   * @throws RuntimeException if the arity of `from` or `reuse` differs from the number of
-   *                          field serializers
-   */
-  override def copy(from: Row, reuse: Row): Row = {
-    // cannot reuse, do a non-reuse copy
-    if (reuse == null) {
-      return copy(from)
-    }
-
-    val len = fieldSerializers.length
-    if (from.productArity != len || reuse.productArity != len) {
-      throw new RuntimeException("Row arity of reuse or from is incompatible with this " +
-        "RowSerializer.")
-    }
-
-    var i = 0
-    while (i < len) {
-      val fromField = from.productElement(i)
-      if (fromField == null) {
-        reuse.setField(i, null)
-      }
-      else {
-        val reuseField = reuse.productElement(i)
-        val copied =
-          if (reuseField != null) {
-            fieldSerializers(i).copy(fromField, reuseField)
-          }
-          else {
-            fieldSerializers(i).copy(fromField)
-          }
-        reuse.setField(i, copied)
-      }
-      i += 1
-    }
-    reuse
-  }
-
-  /**
-   * Returns a new row holding a copy of every field of `from`; null fields stay null.
-   *
-   * @throws RuntimeException if the arity of `from` differs from the number of field
-   *                          serializers
-   */
-  override def copy(from: Row): Row = {
-    val len = fieldSerializers.length
-
-    if (from.productArity != len) {
-      throw new RuntimeException("Row arity of from does not match serializers.")
-    }
-
-    val result = new Row(len)
-    var i = 0
-    while (i < len) {
-      val fromField = from.productElement(i)
-      if (fromField != null) {
-        result.setField(i, fieldSerializers(i).copy(fromField))
-      }
-      else {
-        result.setField(i, null)
-      }
-      i += 1
-    }
-    result
-  }
-
-  /**
-   * Writes the null mask of `value` followed by its non-null fields.
-   *
-   * @throws RuntimeException if the arity of `value` differs from the number of field
-   *                          serializers
-   */
-  override def serialize(value: Row, target: DataOutputView): Unit = {
-    val len = fieldSerializers.length
-
-    if (value.productArity != len) {
-      throw new RuntimeException("Row arity of value does not match serializers.")
-    }
-
-    // write a null mask
-    writeNullMask(len, value, target)
-
-    // serialize non-null fields; each field is looked up only once
-    var i = 0
-    while (i < len) {
-      val field = value.productElement(i)
-      if (field != null) {
-        fieldSerializers(i).serialize(field, target)
-      }
-      i += 1
-    }
-  }
-
-  /**
-   * Reads a row into `reuse` and returns it.
-   *
-   * A null `reuse` falls back to the non-reuse `deserialize`, mirroring `copy(from, reuse)`.
-   *
-   * @throws RuntimeException if the arity of `reuse` differs from the number of field
-   *                          serializers
-   */
-  override def deserialize(reuse: Row, source: DataInputView): Row = {
-    // cannot reuse, do a non-reuse deserialization
-    if (reuse == null) {
-      return deserialize(source)
-    }
-
-    val len = fieldSerializers.length
-
-    if (reuse.productArity != len) {
-      throw new RuntimeException("Row arity of reuse does not match serializers.")
-    }
-
-    // read null mask
-    readIntoNullMask(len, source, nullMask)
-
-    // read non-null fields
-    var i = 0
-    while (i < len) {
-      if (nullMask(i)) {
-        reuse.setField(i, null)
-      }
-      else {
-        val reuseField = reuse.productElement(i)
-        if (reuseField != null) {
-          reuse.setField(i, fieldSerializers(i).deserialize(reuseField, source))
-        }
-        else {
-          reuse.setField(i, fieldSerializers(i).deserialize(source))
-        }
-      }
-      i += 1
-    }
-    reuse
-  }
-
-  /** Reads a row into a newly created [[Row]]. */
-  override def deserialize(source: DataInputView): Row = {
-    val len = fieldSerializers.length
-
-    val result = new Row(len)
-
-    // read null mask
-    readIntoNullMask(len, source, nullMask)
-
-    // read non-null fields
-    var i = 0
-    while (i < len) {
-      if (nullMask(i)) {
-        result.setField(i, null)
-      }
-      else {
-        result.setField(i, fieldSerializers(i).deserialize(source))
-      }
-      i += 1
-    }
-    result
-  }
-
-  /** Copies one serialized row from `source` to `target` without materializing a [[Row]]. */
-  override def copy(source: DataInputView, target: DataOutputView): Unit = {
-    val len = fieldSerializers.length
-
-    // copy null mask
-    readIntoAndCopyNullMask(len, source, target, nullMask)
-
-    // copy non-null fields
-    var i = 0
-    while (i < len) {
-      if (!nullMask(i)) {
-        fieldSerializers(i).copy(source, target)
-      }
-      i += 1
-    }
-  }
-
-  /** Two row serializers are equal if their field serializers are pairwise equal. */
-  override def equals(any: Any): Boolean = {
-    any match {
-      case otherRS: RowSerializer =>
-        otherRS.canEqual(this) &&
-          fieldSerializers.sameElements(otherRS.fieldSerializers)
-      case _ => false
-    }
-  }
-
-  override def canEqual(obj: AnyRef): Boolean = {
-    obj.isInstanceOf[RowSerializer]
-  }
-
-  override def hashCode(): Int = {
-    java.util.Arrays.hashCode(fieldSerializers.asInstanceOf[Array[AnyRef]])
-  }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a9e6ec86/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowTypeInfo.scala
deleted file mode 100644
index 711bb49..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowTypeInfo.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.typeutils
-
-import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.CompositeType.TypeComparatorBuilder
-import org.apache.flink.api.common.typeutils.TypeComparator
-import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-
-import scala.collection.mutable.ArrayBuffer
-import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.api.table.Row
-
-/**
- * TypeInformation for [[Row]].
- *
- * Fields are named "f0", "f1", ... in the order of `fieldTypes`.
- *
- * @param fieldTypes type information of the row fields, in field order
- */
-class RowTypeInfo(fieldTypes: Seq[TypeInformation[_]])
-  extends CaseClassTypeInfo[Row](
-    classOf[Row],
-    Array(),
-    fieldTypes,
-    for (i <- fieldTypes.indices) yield "f" + i)
-{
-
-  def this(fieldTypes: Array[TypeInformation[_]]) = {
-    this(fieldTypes.toSeq)
-  }
-
-  /**
-   * Temporary variable for directly passing orders to comparators.
-   *
-   * Only set for the duration of a `createComparator` call.
-   */
-  var comparatorOrders: Option[Array[Boolean]] = None
-
-  /** Creates a [[RowSerializer]] from the serializers of all field types. */
-  override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[Row] = {
-    val fieldSerializers: Array[TypeSerializer[Any]] = new Array[TypeSerializer[Any]](getArity)
-    for (i <- 0 until getArity) {
-      fieldSerializers(i) = this.types(i).createSerializer(executionConfig)
-        .asInstanceOf[TypeSerializer[Any]]
-    }
-
-    new RowSerializer(fieldSerializers)
-  }
-
-  /**
-   * Creates the row comparator via the super class while exposing `orders` to the comparator
-   * builder through `comparatorOrders`.
-   */
-  override def createComparator(
-      logicalKeyFields: Array[Int],
-      orders: Array[Boolean],
-      logicalFieldOffset: Int,
-      config: ExecutionConfig)
-    : TypeComparator[Row] = {
-    // store the order information for the builder
-    comparatorOrders = Some(orders)
-    try {
-      super.createComparator(logicalKeyFields, orders, logicalFieldOffset, config)
-    } finally {
-      // always clear the hand-over variable, even if comparator creation fails, so that no
-      // stale orders are visible to a later createTypeComparatorBuilder() call
-      comparatorOrders = None
-    }
-  }
-
-  /**
-   * Creates the builder that collects the key field comparators.
-   *
-   * @throws IllegalStateException if no orders have been stored in `comparatorOrders`
-   */
-  override def createTypeComparatorBuilder(): TypeComparatorBuilder[Row] = {
-    new RowTypeComparatorBuilder(comparatorOrders.getOrElse(
-      throw new IllegalStateException("Cannot create comparator builder without orders.")))
-  }
-
-  /** Collects key fields with their comparators and assembles a [[RowComparator]]. */
-  private class RowTypeComparatorBuilder(
-      comparatorOrders: Array[Boolean])
-    extends TypeComparatorBuilder[Row] {
-
-    val fieldComparators: ArrayBuffer[TypeComparator[_]] = new ArrayBuffer[TypeComparator[_]]()
-    val logicalKeyFields: ArrayBuffer[Int] = new ArrayBuffer[Int]()
-
-    override def initializeTypeComparatorBuilder(size: Int): Unit = {
-      fieldComparators.sizeHint(size)
-      logicalKeyFields.sizeHint(size)
-    }
-
-    override def addComparatorField(fieldId: Int, comparator: TypeComparator[_]): Unit = {
-      fieldComparators += comparator
-      logicalKeyFields += fieldId
-    }
-
-    override def createTypeComparator(config: ExecutionConfig): TypeComparator[Row] = {
-      // serializers are only created for the fields up to the highest key position
-      val maxIndex = logicalKeyFields.max
-
-      new RowComparator(
-        getArity,
-        logicalKeyFields.toArray,
-        fieldComparators.toArray.asInstanceOf[Array[TypeComparator[Any]]],
-        types.take(maxIndex + 1).map(_.createSerializer(config).asInstanceOf[TypeSerializer[Any]]),
-        comparatorOrders
-      )
-    }
-  }
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/a9e6ec86/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala
deleted file mode 100644
index d72e7a8..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala
+++ /dev/null
@@ -1,882 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime.io
-
-import java.io.{File, FileOutputStream, OutputStreamWriter}
-import java.nio.charset.StandardCharsets
-import java.sql.{Date, Time, Timestamp}
-
-import org.apache.flink.api.common.io.ParseException
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo}
-import org.apache.flink.api.table.Row
-import org.apache.flink.api.table.runtime.io.RowCsvInputFormatTest.{PATH, createTempFile, testRemovingTrailingCR}
-import org.apache.flink.api.table.typeutils.RowTypeInfo
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.core.fs.{FileInputSplit, Path}
-import org.apache.flink.types.parser.{FieldParser, StringParser}
-import org.junit.Assert._
-import org.junit.{Ignore, Test}
-
-class RowCsvInputFormatTest {
-
- /**
-  * Reads the same split twice. With `setLenient(false)` the two leading lines and the
-  * "//a comment" line must each raise a [[ParseException]]; with `setLenient(true)` the first
-  * row returned is the "header1" line (with null numeric fields) and no exception is expected.
-  */
- @Test
- def ignoreInvalidLines() {
-   val content =
-     "#description of the data\n" +
-       "header1|header2|header3|\n" +
-       "this is|1|2.0|\n" +
-       "//a comment\n" +
-       "a test|3|4.0|\n" +
-       "#next|5|6.0|\n"
-
-   val inputSplit = createTempFile(content)
-
-   val rowType = new RowTypeInfo(Seq(
-     BasicTypeInfo.STRING_TYPE_INFO,
-     BasicTypeInfo.INT_TYPE_INFO,
-     BasicTypeInfo.DOUBLE_TYPE_INFO))
-
-   val csvFormat = new RowCsvInputFormat(PATH, rowType, "\n", "|")
-   csvFormat.setLenient(false)
-   val config = new Configuration
-   csvFormat.configure(config)
-   csvFormat.open(inputSplit)
-
-   var row = new Row(3)
-
-   // reads the next record and requires the read to fail with a ParseException
-   def expectParseFailure(message: String): Unit = {
-     try {
-       row = csvFormat.nextRecord(row)
-       fail(message)
-     }
-     catch {
-       case _: ParseException => // ok
-     }
-   }
-
-   // reads the next record and compares its three fields with the expected values
-   def expectRow(first: Any, second: Any, third: Any): Unit = {
-     row = csvFormat.nextRecord(row)
-     assertNotNull(row)
-     assertEquals(first, row.productElement(0))
-     assertEquals(second, row.productElement(1))
-     assertEquals(third, row.productElement(2))
-   }
-
-   // reads the next record and requires that the input is exhausted
-   def expectEndOfInput(): Unit = {
-     row = csvFormat.nextRecord(row)
-     assertNull(row)
-   }
-
-   // strict pass
-   expectParseFailure("Parse Exception was not thrown! (Row too short)")
-   expectParseFailure("Parse Exception was not thrown! (Invalid int value)")
-   expectRow("this is", 1, 2.0)
-   expectParseFailure("Parse Exception was not thrown! (Row too short)")
-   expectRow("a test", 3, 4.0)
-   expectRow("#next", 5, 6.0)
-   expectEndOfInput()
-
-   // re-open with lenient = true
-   csvFormat.setLenient(true)
-   csvFormat.configure(config)
-   csvFormat.open(inputSplit)
-
-   row = new Row(3)
-
-   // the header line yields a row whose numeric fields are null
-   row = csvFormat.nextRecord(row)
-   assertNotNull(row)
-   assertEquals("header1", row.productElement(0))
-   assertNull(row.productElement(1))
-   assertNull(row.productElement(2))
-
-   expectRow("this is", 1, 2.0)
-   expectRow("a test", 3, 4.0)
-   expectRow("#next", 5, 6.0)
-   expectEndOfInput()
- }
-
- /**
-  * With the comment prefix set to "#", only the two data lines that do not start with "#"
-  * are returned as rows.
-  */
- @Test
- def ignoreSingleCharPrefixComments() {
-   val content =
-     "#description of the data\n" +
-       "#successive commented line\n" +
-       "this is|1|2.0|\n" +
-       "a test|3|4.0|\n" +
-       "#next|5|6.0|\n"
-
-   val inputSplit = createTempFile(content)
-
-   val rowType = new RowTypeInfo(Seq(
-     BasicTypeInfo.STRING_TYPE_INFO,
-     BasicTypeInfo.INT_TYPE_INFO,
-     BasicTypeInfo.DOUBLE_TYPE_INFO))
-
-   val csvFormat = new RowCsvInputFormat(PATH, rowType, "\n", "|")
-   csvFormat.setCommentPrefix("#")
-   csvFormat.configure(new Configuration)
-   csvFormat.open(inputSplit)
-
-   val expectedRows = Seq(
-     Seq[Any]("this is", 1, 2.0),
-     Seq[Any]("a test", 3, 4.0))
-
-   var row = new Row(3)
-   for (expected <- expectedRows) {
-     row = csvFormat.nextRecord(row)
-     assertNotNull(row)
-     for (pos <- expected.indices) {
-       assertEquals(expected(pos), row.productElement(pos))
-     }
-   }
-
-   // all remaining lines are comments
-   row = csvFormat.nextRecord(row)
-   assertNull(row)
- }
-
- /**
-  * With the comment prefix set to "//", only the two data lines that do not start with "//"
-  * are returned as rows.
-  */
- @Test
- def ignoreMultiCharPrefixComments() {
-   val content =
-     "//description of the data\n" +
-       "//successive commented line\n" +
-       "this is|1|2.0|\n" +
-       "a test|3|4.0|\n" +
-       "//next|5|6.0|\n"
-
-   val inputSplit = createTempFile(content)
-
-   val rowType = new RowTypeInfo(Seq(
-     BasicTypeInfo.STRING_TYPE_INFO,
-     BasicTypeInfo.INT_TYPE_INFO,
-     BasicTypeInfo.DOUBLE_TYPE_INFO))
-
-   val csvFormat = new RowCsvInputFormat(PATH, rowType, "\n", "|")
-   csvFormat.setCommentPrefix("//")
-   csvFormat.configure(new Configuration)
-   csvFormat.open(inputSplit)
-
-   val expectedRows = Seq(
-     Seq[Any]("this is", 1, 2.0),
-     Seq[Any]("a test", 3, 4.0))
-
-   var row = new Row(3)
-   expectedRows.foreach { expected =>
-     row = csvFormat.nextRecord(row)
-     assertNotNull(row)
-     expected.zipWithIndex.foreach { case (value, pos) =>
-       assertEquals(value, row.productElement(pos))
-     }
-   }
-
-   // all remaining lines are comments
-   row = csvFormat.nextRecord(row)
-   assertNull(row)
- }
-
- /**
-  * Reads three rows of three string fields each; empty columns are expected as empty
-  * strings.
-  */
- @Test
- def readStringFields() {
-   val inputSplit = createTempFile("abc|def|ghijk\nabc||hhg\n|||")
-
-   val rowType = new RowTypeInfo(Seq(
-     BasicTypeInfo.STRING_TYPE_INFO,
-     BasicTypeInfo.STRING_TYPE_INFO,
-     BasicTypeInfo.STRING_TYPE_INFO))
-
-   val csvFormat = new RowCsvInputFormat(PATH, rowType, "\n", "|")
-   csvFormat.configure(new Configuration)
-   csvFormat.open(inputSplit)
-
-   val expectedRows = Seq(
-     Seq("abc", "def", "ghijk"),
-     Seq("abc", "", "hhg"),
-     Seq("", "", ""))
-
-   var row = new Row(3)
-   for (expected <- expectedRows) {
-     row = csvFormat.nextRecord(row)
-     assertNotNull(row)
-     for (pos <- expected.indices) {
-       assertEquals(expected(pos), row.productElement(pos))
-     }
-   }
-
-   // no further rows
-   row = csvFormat.nextRecord(row)
-   assertNull(row)
-   assertTrue(csvFormat.reachedEnd)
- }
-
- /**
-  * Reads rows that mix quoted and unquoted string fields, using '@' as the quote character;
-  * field delimiters inside a quoted field are expected to be part of the value.
-  */
- @Test
- def readMixedQuotedStringFields() {
-   val inputSplit = createTempFile("@a|b|c@|def|@ghijk@\nabc||@|hhg@\n|||")
-
-   val rowType = new RowTypeInfo(Seq(
-     BasicTypeInfo.STRING_TYPE_INFO,
-     BasicTypeInfo.STRING_TYPE_INFO,
-     BasicTypeInfo.STRING_TYPE_INFO))
-
-   val csvFormat = new RowCsvInputFormat(PATH, rowType, "\n", "|")
-   csvFormat.configure(new Configuration)
-   csvFormat.enableQuotedStringParsing('@')
-   csvFormat.open(inputSplit)
-
-   val expectedRows = Seq(
-     Seq("a|b|c", "def", "ghijk"),
-     Seq("abc", "", "|hhg"),
-     Seq("", "", ""))
-
-   var row = new Row(3)
-   expectedRows.foreach { expected =>
-     row = csvFormat.nextRecord(row)
-     assertNotNull(row)
-     expected.zipWithIndex.foreach { case (value, pos) =>
-       assertEquals(value, row.productElement(pos))
-     }
-   }
-
-   // no further rows
-   row = csvFormat.nextRecord(row)
-   assertNull(row)
-   assertTrue(csvFormat.reachedEnd)
- }
-
- /**
-  * Reads string rows with the two-character field delimiter "|-", including a last line
-  * that ends with a trailing field delimiter.
-  */
- @Test
- def readStringFieldsWithTrailingDelimiters() {
-   val inputSplit = createTempFile("abc|-def|-ghijk\nabc|-|-hhg\n|-|-|-\n")
-
-   val rowType = new RowTypeInfo(Seq(
-     BasicTypeInfo.STRING_TYPE_INFO,
-     BasicTypeInfo.STRING_TYPE_INFO,
-     BasicTypeInfo.STRING_TYPE_INFO))
-
-   // the delimiter passed to the constructor is replaced before the format is configured
-   val csvFormat = new RowCsvInputFormat(PATH, rowType, "\n", "|")
-   csvFormat.setFieldDelimiter("|-")
-   csvFormat.configure(new Configuration)
-   csvFormat.open(inputSplit)
-
-   val expectedRows = Seq(
-     Seq("abc", "def", "ghijk"),
-     Seq("abc", "", "hhg"),
-     Seq("", "", ""))
-
-   var row = new Row(3)
-   for (expected <- expectedRows) {
-     row = csvFormat.nextRecord(row)
-     assertNotNull(row)
-     for (pos <- expected.indices) {
-       assertEquals(expected(pos), row.productElement(pos))
-     }
-   }
-
-   // no further rows
-   row = csvFormat.nextRecord(row)
-   assertNull(row)
-   assertTrue(csvFormat.reachedEnd)
- }
-
- /**
-  * Reads two rows of five integer fields; the second line carries a trailing field
-  * delimiter and the value "000", which is expected as 0.
-  */
- @Test
- def testIntegerFields() {
-   val inputSplit = createTempFile("111|222|333|444|555\n666|777|888|999|000|\n")
-
-   val rowType = new RowTypeInfo(Seq(
-     BasicTypeInfo.INT_TYPE_INFO,
-     BasicTypeInfo.INT_TYPE_INFO,
-     BasicTypeInfo.INT_TYPE_INFO,
-     BasicTypeInfo.INT_TYPE_INFO,
-     BasicTypeInfo.INT_TYPE_INFO))
-
-   val csvFormat = new RowCsvInputFormat(PATH, rowType, "\n", "|")
-
-   csvFormat.setFieldDelimiter("|")
-   csvFormat.configure(new Configuration)
-   csvFormat.open(inputSplit)
-
-   val expectedRows = Seq(
-     Seq(111, 222, 333, 444, 555),
-     Seq(666, 777, 888, 999, 0))
-
-   var row = new Row(5)
-   for (expected <- expectedRows) {
-     row = csvFormat.nextRecord(row)
-     assertNotNull(row)
-     for (pos <- expected.indices) {
-       assertEquals(expected(pos), row.productElement(pos))
-     }
-   }
-
-   // no further rows
-   row = csvFormat.nextRecord(row)
-   assertNull(row)
-   assertTrue(csvFormat.reachedEnd)
- }
-
- /**
-  * With `emptyColumnAsNull = true`, every field of every row read from lines that consist
-  * only of field delimiters must be null, for each of the eight configured field types.
-  */
- @Test
- def testEmptyFields() {
-   val fileContent =
-     ",,,,,,,,\n" +
-       ",,,,,,,,\n" +
-       ",,,,,,,,\n" +
-       ",,,,,,,,\n" +
-       ",,,,,,,,\n" +
-       ",,,,,,,,\n" +
-       ",,,,,,,,\n" +
-       ",,,,,,,,\n"
-
-   val split = createTempFile(fileContent)
-
-   val typeInfo = new RowTypeInfo(Seq(
-     BasicTypeInfo.BOOLEAN_TYPE_INFO,
-     BasicTypeInfo.BYTE_TYPE_INFO,
-     BasicTypeInfo.DOUBLE_TYPE_INFO,
-     BasicTypeInfo.FLOAT_TYPE_INFO,
-     BasicTypeInfo.INT_TYPE_INFO,
-     BasicTypeInfo.LONG_TYPE_INFO,
-     BasicTypeInfo.SHORT_TYPE_INFO,
-     BasicTypeInfo.STRING_TYPE_INFO))
-
-   val format = new RowCsvInputFormat(PATH, rowTypeInfo = typeInfo, emptyColumnAsNull = true)
-   format.setFieldDelimiter(",")
-   format.configure(new Configuration)
-   format.open(split)
-
-   var result = new Row(8)
-   val linesCnt = fileContent.split("\n").length
-
-   // check every field of every row, not only field i of row i
-   for (_ <- 0 until linesCnt) {
-     result = format.nextRecord(result)
-     assertNotNull(result)
-     for (field <- 0 until result.productArity) {
-       assertNull(result.productElement(field))
-     }
-   }
-
-   // ensure no more rows
-   assertNull(format.nextRecord(result))
-   assertTrue(format.reachedEnd)
- }
-
- /**
-  * Reads two rows of five double fields; the second line carries a trailing field
-  * delimiter and the value "00.0", which is expected as 0.0.
-  */
- @Test
- def testDoubleFields() {
-   val inputSplit = createTempFile("11.1|22.2|33.3|44.4|55.5\n66.6|77.7|88.8|99.9|00.0|\n")
-
-   val rowType = new RowTypeInfo(Seq(
-     BasicTypeInfo.DOUBLE_TYPE_INFO,
-     BasicTypeInfo.DOUBLE_TYPE_INFO,
-     BasicTypeInfo.DOUBLE_TYPE_INFO,
-     BasicTypeInfo.DOUBLE_TYPE_INFO,
-     BasicTypeInfo.DOUBLE_TYPE_INFO))
-
-   val csvFormat = new RowCsvInputFormat(PATH, rowTypeInfo = rowType)
-   csvFormat.setFieldDelimiter("|")
-   csvFormat.configure(new Configuration)
-   csvFormat.open(inputSplit)
-
-   val expectedRows = Seq(
-     Seq(11.1, 22.2, 33.3, 44.4, 55.5),
-     Seq(66.6, 77.7, 88.8, 99.9, 0.0))
-
-   var row = new Row(5)
-   expectedRows.foreach { expected =>
-     row = csvFormat.nextRecord(row)
-     assertNotNull(row)
-     expected.zipWithIndex.foreach { case (value, pos) =>
-       assertEquals(value, row.productElement(pos))
-     }
-   }
-
-   // no further rows
-   row = csvFormat.nextRecord(row)
-   assertNull(row)
-   assertTrue(csvFormat.reachedEnd)
- }
-
- /**
-  * Declares only two fields for lines that contain five columns; each row is expected to
-  * hold the first two columns of its line.
-  */
- @Test
- def testReadFirstN() {
-   val inputSplit = createTempFile("111|222|333|444|555|\n666|777|888|999|000|\n")
-
-   val rowType = new RowTypeInfo(Seq(
-     BasicTypeInfo.INT_TYPE_INFO,
-     BasicTypeInfo.INT_TYPE_INFO))
-
-   val csvFormat = new RowCsvInputFormat(PATH, rowTypeInfo = rowType)
-   csvFormat.setFieldDelimiter("|")
-   csvFormat.configure(new Configuration)
-   csvFormat.open(inputSplit)
-
-   val expectedRows = Seq(
-     Seq(111, 222),
-     Seq(666, 777))
-
-   var row = new Row(2)
-   for (expected <- expectedRows) {
-     row = csvFormat.nextRecord(row)
-     assertNotNull(row)
-     for (pos <- expected.indices) {
-       assertEquals(expected(pos), row.productElement(pos))
-     }
-   }
-
-   // no further rows
-   row = csvFormat.nextRecord(row)
-   assertNull(row)
-   assertTrue(csvFormat.reachedEnd)
- }
-
- /**
-  * Reads columns 0, 3 and 7 of ten-column lines, selected through a boolean inclusion mask,
-  * with the multi-character field delimiter "|x|".
-  */
- @Test
- def testReadSparseWithNullFieldsForTypes() {
-   val content = "111|x|222|x|333|x|444|x|555|x|666|x|777|x|888|x|999|x|000|x|\n" +
-     "000|x|999|x|888|x|777|x|666|x|555|x|444|x|333|x|222|x|111|x|"
-
-   val inputSplit = createTempFile(content)
-
-   val rowType: RowTypeInfo = new RowTypeInfo(Seq(
-     BasicTypeInfo.INT_TYPE_INFO,
-     BasicTypeInfo.INT_TYPE_INFO,
-     BasicTypeInfo.INT_TYPE_INFO))
-
-   val csvFormat = new RowCsvInputFormat(
-     PATH,
-     rowTypeInfo = rowType,
-     includedFieldsMask = Array(true, false, false, true, false, false, false, true))
-   csvFormat.setFieldDelimiter("|x|")
-   csvFormat.configure(new Configuration)
-   csvFormat.open(inputSplit)
-
-   val expectedRows = Seq(
-     Seq(111, 444, 888),
-     Seq(0, 777, 333))
-
-   var row = new Row(3)
-   for (expected <- expectedRows) {
-     row = csvFormat.nextRecord(row)
-     assertNotNull(row)
-     for (pos <- expected.indices) {
-       assertEquals(expected(pos), row.productElement(pos))
-     }
-   }
-
-   // no further rows
-   row = csvFormat.nextRecord(row)
-   assertNull(row)
-   assertTrue(csvFormat.reachedEnd)
- }
-
- /**
-  * Reads columns 0, 3 and 7 of ten-column lines, selected through an array of column
-  * positions.
-  */
- @Test
- def testReadSparseWithPositionSetter() {
-   val content = "111|222|333|444|555|666|777|888|999|000|\n" +
-     "000|999|888|777|666|555|444|333|222|111|"
-
-   val inputSplit = createTempFile(content)
-
-   val rowType = new RowTypeInfo(Seq(
-     BasicTypeInfo.INT_TYPE_INFO,
-     BasicTypeInfo.INT_TYPE_INFO,
-     BasicTypeInfo.INT_TYPE_INFO))
-
-   val csvFormat = new RowCsvInputFormat(
-     PATH,
-     rowType,
-     Array(0, 3, 7))
-   csvFormat.setFieldDelimiter("|")
-   csvFormat.configure(new Configuration)
-   csvFormat.open(inputSplit)
-
-   val expectedRows = Seq(
-     Seq(111, 444, 888),
-     Seq(0, 777, 333))
-
-   var row = new Row(3)
-   expectedRows.foreach { expected =>
-     row = csvFormat.nextRecord(row)
-     assertNotNull(row)
-     expected.zipWithIndex.foreach { case (value, pos) =>
-       assertEquals(value, row.productElement(pos))
-     }
-   }
-
-   // no further rows
-   row = csvFormat.nextRecord(row)
-   assertNull(row)
-   assertTrue(csvFormat.reachedEnd)
- }
-
- /**
-  * Reads columns 0, 3 and 7 of ten-column lines, selected through a boolean inclusion mask,
-  * with the field delimiter "&&".
-  */
- @Test
- def testReadSparseWithMask() {
-   val content = "111&&222&&333&&444&&555&&666&&777&&888&&999&&000&&\n" +
-     "000&&999&&888&&777&&666&&555&&444&&333&&222&&111&&"
-
-   val inputSplit = createTempFile(content)
-
-   val rowType = new RowTypeInfo(Seq(
-     BasicTypeInfo.INT_TYPE_INFO,
-     BasicTypeInfo.INT_TYPE_INFO,
-     BasicTypeInfo.INT_TYPE_INFO))
-
-   val csvFormat = new RowCsvInputFormat(
-     PATH,
-     rowTypeInfo = rowType,
-     includedFieldsMask = Array(true, false, false, true, false, false, false, true))
-   csvFormat.setFieldDelimiter("&&")
-   csvFormat.configure(new Configuration)
-   csvFormat.open(inputSplit)
-
-   val expectedRows = Seq(
-     Seq(111, 444, 888),
-     Seq(0, 777, 333))
-
-   var row = new Row(3)
-   for (expected <- expectedRows) {
-     row = csvFormat.nextRecord(row)
-     assertNotNull(row)
-     for (pos <- expected.indices) {
-       assertEquals(expected(pos), row.productElement(pos))
-     }
-   }
-
-   // no further rows
-   row = csvFormat.nextRecord(row)
-   assertNull(row)
-   assertTrue(csvFormat.reachedEnd)
- }
-
- /**
-  * Feeds malformed quoted strings to a [[StringParser]] and checks that parsing returns -1
-  * and reports the expected error state.
-  */
- @Test
- def testParseStringErrors() {
-   val stringParser = new StringParser
-   stringParser.enableQuotedStringParsing('"'.toByte)
-
-   val failures = Seq(
-     ("\"string\" trailing", FieldParser.ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING),
-     ("\"unterminated ", FieldParser.ParseErrorState.UNTERMINATED_QUOTED_STRING)
-   )
-
-   for ((input, expectedError) <- failures) {
-     // encode explicitly and pass the byte count (not the character count) as the limit
-     val bytes = input.getBytes(StandardCharsets.UTF_8)
-     val result = stringParser.parseField(
-       bytes,
-       0,
-       bytes.length,
-       Array[Byte]('|'),
-       null)
-
-     assertEquals(-1, result)
-     assertEquals(expectedError, stringParser.getErrorState)
-   }
- }
-
- /**
-  * Parses the RFC 4180 example content and compares every record with an expected [[Row]].
-  */
- // Test disabled because we do not support double-quote escaped quotes right now.
- @Test
- @Ignore
- def testParserCorrectness() {
-   // RFC 4180 Compliance Test content
-   // Taken from http://en.wikipedia.org/wiki/Comma-separated_values#Example
-   val content = "Year,Make,Model,Description,Price\n" +
-     "1997,Ford,E350,\"ac, abs, moon\",3000.00\n" +
-     "1999,Chevy,\"Venture \"\"Extended Edition\"\"\",\"\",4900.00\n" +
-     "1996,Jeep,Grand Cherokee,\"MUST SELL! air, moon roof, loaded\",4799.00\n" +
-     "1999,Chevy,\"Venture \"\"Extended Edition, Very Large\"\"\",,5000.00\n" +
-     ",,\"Venture \"\"Extended Edition\"\"\",\"\",4900.00"
-
-   val inputSplit = createTempFile(content)
-
-   val rowType = new RowTypeInfo(Seq(
-     BasicTypeInfo.INT_TYPE_INFO,
-     BasicTypeInfo.STRING_TYPE_INFO,
-     BasicTypeInfo.STRING_TYPE_INFO,
-     BasicTypeInfo.STRING_TYPE_INFO,
-     BasicTypeInfo.DOUBLE_TYPE_INFO))
-
-   val csvFormat = new RowCsvInputFormat(PATH, rowType)
-   csvFormat.setSkipFirstLineAsHeader(true)
-   csvFormat.setFieldDelimiter(",")
-   csvFormat.configure(new Configuration)
-   csvFormat.open(inputSplit)
-
-   // builds a row whose fields are the given values, in order
-   def rowOf(values: Any*): Row = {
-     val built = new Row(values.length)
-     for ((value, pos) <- values.zipWithIndex) {
-       built.setField(pos, value)
-     }
-     built
-   }
-
-   val expectedRows = Array(
-     rowOf(1997, "Ford", "E350", "ac, abs, moon", 3000.0),
-     rowOf(1999, "Chevy", "Venture \"Extended Edition\"", "", 4900.0),
-     rowOf(1996, "Jeep", "Grand Cherokee", "MUST SELL! air, moon roof, loaded", 4799.0),
-     rowOf(1999, "Chevy", "Venture \"Extended Edition, Very Large\"", "", 5000.0),
-     rowOf(0, "", "Venture \"Extended Edition\"", "", 4900.0))
-
-   var row = new Row(5)
-   expectedRows.foreach { expected =>
-     row = csvFormat.nextRecord(row)
-     assertEquals(expected, row)
-   }
-
-   assertNull(csvFormat.nextRecord(row))
-   assertTrue(csvFormat.reachedEnd)
- }
-
- /**
-  * Runs `testRemovingTrailingCR` for three combinations of line endings. Judging by the
-  * case descriptions below, the first argument is the line ending used in the file and the
-  * second one the delimiter the format is set up with.
-  */
- @Test
- def testWindowsLineEndRemoval() {
-   val lineEndingCases = Seq(
-     // check typical use case -- linux file is correct and it is set up to linux(\n)
-     ("\n", "\n"),
-     // check typical windows case -- windows file endings and file has windows file endings
-     // set up
-     ("\r\n", "\r\n"),
-     // check problematic case windows file -- windows file endings(\r\n)
-     // but linux line endings (\n) set up
-     ("\r\n", "\n"))
-
-   lineEndingCases.foreach { case (first, second) =>
-     testRemovingTrailingCR(first, second)
-   }
-
-   // check problematic case linux file -- linux file endings (\n)
-   // but windows file endings set up (\r\n)
-   // specific setup for windows line endings will expect \r\n because
-   // it has to be set up and is not standard.
-   // (this combination is described only; no call is made for it)
- }
-
- /**
-  * Reads the first and third column (selected by an inclusion mask) of a line whose fields
-  * are quoted with '"' and checks both values without their quotes.
-  */
- @Test
- def testQuotedStringParsingWithIncludeFields() {
-   val fileContent = "\"20:41:52-1-3-2015\"|\"Re: Taskmanager memory error in Eclipse\"|" +
-     "\"Blahblah <bl...@blahblah.org>\"|\"blaaa|\"blubb\""
-   val tempFile = File.createTempFile("CsvReaderQuotedString", "tmp")
-   tempFile.deleteOnExit()
-   tempFile.setWritable(true)
-
-   // write with an explicit charset and release the file handle even if writing fails
-   val writer = new OutputStreamWriter(new FileOutputStream(tempFile), StandardCharsets.UTF_8)
-   try {
-     writer.write(fileContent)
-   } finally {
-     writer.close()
-   }
-
-   val typeInfo = new RowTypeInfo(Seq(
-     BasicTypeInfo.STRING_TYPE_INFO,
-     BasicTypeInfo.STRING_TYPE_INFO))
-
-   val inputFormat = new RowCsvInputFormat(
-     new Path(tempFile.toURI.toString),
-     rowTypeInfo = typeInfo,
-     includedFieldsMask = Array(true, false, true))
-   inputFormat.enableQuotedStringParsing('"')
-   inputFormat.setFieldDelimiter("|")
-   inputFormat.setDelimiter('\n')
-   inputFormat.configure(new Configuration)
-
-   val splits = inputFormat.createInputSplits(1)
-   inputFormat.open(splits(0))
-
-   val record = inputFormat.nextRecord(new Row(2))
-   assertNotNull(record)
-   assertEquals("20:41:52-1-3-2015", record.productElement(0))
-   assertEquals("Blahblah <bl...@blahblah.org>", record.productElement(1))
- }
-
- @Test
- def testQuotedStringParsingWithEscapedQuotes() {
- val fileContent = "\"\\\"Hello\\\" World\"|\"We are\\\" young\""
- val tempFile = File.createTempFile("CsvReaderQuotedString", "tmp")
- tempFile.deleteOnExit()
- tempFile.setWritable(true)
-
- val writer = new OutputStreamWriter(new FileOutputStream(tempFile))
- writer.write(fileContent)
- writer.close()
-
- val typeInfo = new RowTypeInfo(Seq(
- BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO))
-
- val inputFormat = new RowCsvInputFormat(
- new Path(tempFile.toURI.toString),
- rowTypeInfo = typeInfo)
- inputFormat.enableQuotedStringParsing('"')
- inputFormat.setFieldDelimiter("|")
- inputFormat.setDelimiter('\n')
- inputFormat.configure(new Configuration)
-
- val splits = inputFormat.createInputSplits(1)
- inputFormat.open(splits(0))
-
- val record = inputFormat.nextRecord(new Row(2))
- assertEquals("\\\"Hello\\\" World", record.productElement(0))
- assertEquals("We are\\\" young", record.productElement(1))
- }
-
- @Test
- def testSqlTimeFields() {
- val fileContent = "1990-10-14|02:42:25|1990-10-14 02:42:25.123|1990-1-4 2:2:5\n" +
- "1990-10-14|02:42:25|1990-10-14 02:42:25.123|1990-1-4 2:2:5.3\n"
-
- val split = createTempFile(fileContent)
-
- val typeInfo = new RowTypeInfo(Seq(
- SqlTimeTypeInfo.DATE,
- SqlTimeTypeInfo.TIME,
- SqlTimeTypeInfo.TIMESTAMP,
- SqlTimeTypeInfo.TIMESTAMP))
-
- val format = new RowCsvInputFormat(PATH, rowTypeInfo = typeInfo)
- format.setFieldDelimiter("|")
- format.configure(new Configuration)
- format.open(split)
-
- var result = new Row(4)
-
- result = format.nextRecord(result)
- assertNotNull(result)
- assertEquals(Date.valueOf("1990-10-14"), result.productElement(0))
- assertEquals(Time.valueOf("02:42:25"), result.productElement(1))
- assertEquals(Timestamp.valueOf("1990-10-14 02:42:25.123"), result.productElement(2))
- assertEquals(Timestamp.valueOf("1990-01-04 02:02:05"), result.productElement(3))
-
- result = format.nextRecord(result)
- assertNotNull(result)
- assertEquals(Date.valueOf("1990-10-14"), result.productElement(0))
- assertEquals(Time.valueOf("02:42:25"), result.productElement(1))
- assertEquals(Timestamp.valueOf("1990-10-14 02:42:25.123"), result.productElement(2))
- assertEquals(Timestamp.valueOf("1990-01-04 02:02:05.3"), result.productElement(3))
-
- result = format.nextRecord(result)
- assertNull(result)
- assertTrue(format.reachedEnd)
- }
-}
-
-object RowCsvInputFormatTest {
-
- private val PATH = new Path("an/ignored/file/")
-
- // static variables for testing the removal of \r\n to \n
- private val FIRST_PART = "That is the first part"
- private val SECOND_PART = "That is the second part"
-
- private def createTempFile(content: String): FileInputSplit = {
- val tempFile = File.createTempFile("test_contents", "tmp")
- tempFile.deleteOnExit()
- val wrt = new OutputStreamWriter(new FileOutputStream(tempFile), StandardCharsets.UTF_8)
- wrt.write(content)
- wrt.close()
- new FileInputSplit(
- 0,
- new Path(tempFile.toURI.toString),
- 0,
- tempFile.length,
- Array("localhost"))
- }
-
- private def testRemovingTrailingCR(lineBreakerInFile: String, lineBreakerSetup: String) {
- val fileContent = FIRST_PART + lineBreakerInFile + SECOND_PART + lineBreakerInFile
-
- // create input file
- val tempFile = File.createTempFile("CsvInputFormatTest", "tmp")
- tempFile.deleteOnExit()
- tempFile.setWritable(true)
-
- val wrt = new OutputStreamWriter(new FileOutputStream(tempFile))
- wrt.write(fileContent)
- wrt.close()
-
- val typeInfo = new RowTypeInfo(Seq(BasicTypeInfo.STRING_TYPE_INFO))
-
- val inputFormat = new RowCsvInputFormat(new Path(tempFile.toURI.toString), typeInfo)
- inputFormat.configure(new Configuration)
- inputFormat.setDelimiter(lineBreakerSetup)
-
- val splits = inputFormat.createInputSplits(1)
- inputFormat.open(splits(0))
-
- var result = inputFormat.nextRecord(new Row(1))
- assertNotNull("Expecting to not return null", result)
- assertEquals(FIRST_PART, result.productElement(0))
-
- result = inputFormat.nextRecord(result)
- assertNotNull("Expecting to not return null", result)
- assertEquals(SECOND_PART, result.productElement(0))
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a9e6ec86/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorTest.scala
deleted file mode 100644
index 557db3a..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorTest.scala
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.typeutils
-
-import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.api.common.typeutils.{ComparatorTestBase, TypeComparator, TypeSerializer}
-import org.apache.flink.api.java.tuple
-import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor}
-import org.apache.flink.api.table.Row
-import org.apache.flink.api.table.typeutils.RowComparatorTest.MyPojo
-import org.junit.Assert._
-
-class RowComparatorTest extends ComparatorTestBase[Row] {
-
- val typeInfo = new RowTypeInfo(
- Array(
- BasicTypeInfo.INT_TYPE_INFO,
- BasicTypeInfo.DOUBLE_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO,
- new TupleTypeInfo[tuple.Tuple2[Int, Boolean]](
- BasicTypeInfo.INT_TYPE_INFO,
- BasicTypeInfo.BOOLEAN_TYPE_INFO,
- BasicTypeInfo.SHORT_TYPE_INFO),
- TypeExtractor.createTypeInfo(classOf[MyPojo])))
-
- val testPojo1 = new MyPojo()
- // TODO we cannot test null here as PojoComparator has no support for null keys
- testPojo1.name = ""
- val testPojo2 = new MyPojo()
- testPojo2.name = "Test1"
- val testPojo3 = new MyPojo()
- testPojo3.name = "Test2"
-
- val data: Array[Row] = Array(
- createRow(null, null, null, null, null),
- createRow(0, null, null, null, null),
- createRow(0, 0.0, null, null, null),
- createRow(0, 0.0, "a", null, null),
- createRow(1, 0.0, "a", null, null),
- createRow(1, 1.0, "a", null, null),
- createRow(1, 1.0, "b", null, null),
- createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](1, false, 2), null),
- createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, false, 2), null),
- createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 2), null),
- createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), null),
- createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo1),
- createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo2),
- createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo3)
- )
-
- override protected def deepEquals(message: String, should: Row, is: Row): Unit = {
- val arity = should.productArity
- assertEquals(message, arity, is.productArity)
- var index = 0
- while (index < arity) {
- val copiedValue: Any = should.productElement(index)
- val element: Any = is.productElement(index)
- assertEquals(message, element, copiedValue)
- index += 1
- }
- }
-
- override protected def createComparator(ascending: Boolean): TypeComparator[Row] = {
- typeInfo.createComparator(
- Array(0, 1, 2, 3, 4, 5, 6),
- Array(ascending, ascending, ascending, ascending, ascending, ascending, ascending),
- 0,
- new ExecutionConfig())
- }
-
- override protected def createSerializer(): TypeSerializer[Row] = {
- typeInfo.createSerializer(new ExecutionConfig())
- }
-
- override protected def getSortedTestData: Array[Row] = {
- data
- }
-
- override protected def supportsNullKeys: Boolean = true
-
- def createRow(f0: Any, f1: Any, f2: Any, f3: Any, f4: Any): Row = {
- val r: Row = new Row(5)
- r.setField(0, f0)
- r.setField(1, f1)
- r.setField(2, f2)
- r.setField(3, f3)
- r.setField(4, f4)
- r
- }
-}
-
-object RowComparatorTest {
-
- class MyPojo() extends Serializable with Comparable[MyPojo] {
- // we cannot use null because the PojoComparator does not support null properly
- var name: String = ""
-
- override def compareTo(o: MyPojo): Int = {
- if (name == null && o.name == null) {
- 0
- }
- else if (name == null) {
- -1
- }
- else if (o.name == null) {
- 1
- }
- else {
- name.compareTo(o.name)
- }
- }
-
- override def equals(other: Any): Boolean = other match {
- case that: MyPojo => compareTo(that) == 0
- case _ => false
- }
- }
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/a9e6ec86/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorWithManyFieldsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorWithManyFieldsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorWithManyFieldsTest.scala
deleted file mode 100644
index 33715c1..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorWithManyFieldsTest.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.typeutils
-
-import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.common.typeutils.{ComparatorTestBase, TypeComparator, TypeSerializer}
-import org.apache.flink.api.table.Row
-import org.apache.flink.util.Preconditions
-import org.junit.Assert._
-
-/**
- * Tests [[RowComparator]] for wide rows.
- */
-class RowComparatorWithManyFieldsTest extends ComparatorTestBase[Row] {
- val numberOfFields = 10
- val fieldTypes = new Array[TypeInformation[_]](numberOfFields)
- for (i <- 0 until numberOfFields) {
- fieldTypes(i) = BasicTypeInfo.STRING_TYPE_INFO
- }
- val typeInfo = new RowTypeInfo(fieldTypes)
-
- val data: Array[Row] = Array(
- createRow(Array(null, "b0", "c0", "d0", "e0", "f0", "g0", "h0", "i0", "j0")),
- createRow(Array("a1", "b1", "c1", "d1", "e1", "f1", "g1", "h1", "i1", "j1")),
- createRow(Array("a2", "b2", "c2", "d2", "e2", "f2", "g2", "h2", "i2", "j2")),
- createRow(Array("a3", "b3", "c3", "d3", "e3", "f3", "g3", "h3", "i3", "j3"))
- )
-
- override protected def deepEquals(message: String, should: Row, is: Row): Unit = {
- val arity = should.productArity
- assertEquals(message, arity, is.productArity)
- var index = 0
- while (index < arity) {
- val copiedValue: Any = should.productElement(index)
- val element: Any = is.productElement(index)
- assertEquals(message, element, copiedValue)
- index += 1
- }
- }
-
- override protected def createComparator(ascending: Boolean): TypeComparator[Row] = {
- typeInfo.createComparator(
- Array(0),
- Array(ascending),
- 0,
- new ExecutionConfig())
- }
-
- override protected def createSerializer(): TypeSerializer[Row] = {
- typeInfo.createSerializer(new ExecutionConfig())
- }
-
- override protected def getSortedTestData: Array[Row] = {
- data
- }
-
- override protected def supportsNullKeys: Boolean = true
-
- private def createRow(values: Array[_]): Row = {
- Preconditions.checkArgument(values.length == numberOfFields)
- val r: Row = new Row(numberOfFields)
- values.zipWithIndex.foreach { case (e, i) => r.setField(i, e) }
- r
- }
-}