Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:46:59 UTC
[30/47] flink git commit: [FLINK-4704] [table] Refactor package structure of flink-table.
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala
deleted file mode 100644
index c1c79ec..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.runtime.aggregate
-
-import java.math.BigDecimal
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.types.Row
-
-abstract class MinAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T] {
-
- protected var minIndex: Int = _
-
- /**
- * Initializes the intermediate aggregate value in the Row.
- *
- * @param intermediate The intermediate aggregate row to initialize.
- */
- override def initiate(intermediate: Row): Unit = {
- intermediate.setField(minIndex, null)
- }
-
- /**
- * Called in a MapFunction to prepare the input for partial aggregation.
- *
- * @param value The input value to aggregate, may be null.
- * @param partial The intermediate aggregate row to set the value on.
- */
- override def prepare(value: Any, partial: Row): Unit = {
- if (value == null) {
- initiate(partial)
- } else {
- partial.setField(minIndex, value)
- }
- }
-
- /**
- * Called in a CombineFunction or GroupReduceFunction to merge a partial
- * aggregate result into the aggregate buffer.
- *
- * @param partial The partial aggregate row to merge.
- * @param buffer The aggregate buffer that accumulates the merged result.
- */
- override def merge(partial: Row, buffer: Row): Unit = {
- val partialValue = partial.getField(minIndex).asInstanceOf[T]
- if (partialValue != null) {
- val bufferValue = buffer.getField(minIndex).asInstanceOf[T]
- if (bufferValue != null) {
- val min: T = if (ord.compare(partialValue, bufferValue) < 0) partialValue else bufferValue
- buffer.setField(minIndex, min)
- } else {
- buffer.setField(minIndex, partialValue)
- }
- }
- }
-
- /**
- * Returns the final aggregated result based on the aggregate buffer.
- *
- * @param buffer The aggregate buffer that holds the intermediate result.
- * @return The final aggregated value.
- */
- override def evaluate(buffer: Row): T = {
- buffer.getField(minIndex).asInstanceOf[T]
- }
-
- override def supportPartial: Boolean = true
-
- override def setAggOffsetInRow(aggOffset: Int): Unit = {
- minIndex = aggOffset
- }
-}
-
-class ByteMinAggregate extends MinAggregate[Byte] {
-
- override def intermediateDataType = Array(BasicTypeInfo.BYTE_TYPE_INFO)
-
-}
-
-class ShortMinAggregate extends MinAggregate[Short] {
-
- override def intermediateDataType = Array(BasicTypeInfo.SHORT_TYPE_INFO)
-
-}
-
-class IntMinAggregate extends MinAggregate[Int] {
-
- override def intermediateDataType = Array(BasicTypeInfo.INT_TYPE_INFO)
-
-}
-
-class LongMinAggregate extends MinAggregate[Long] {
-
- override def intermediateDataType = Array(BasicTypeInfo.LONG_TYPE_INFO)
-
-}
-
-class FloatMinAggregate extends MinAggregate[Float] {
-
- override def intermediateDataType = Array(BasicTypeInfo.FLOAT_TYPE_INFO)
-
-}
-
-class DoubleMinAggregate extends MinAggregate[Double] {
-
- override def intermediateDataType = Array(BasicTypeInfo.DOUBLE_TYPE_INFO)
-
-}
-
-class BooleanMinAggregate extends MinAggregate[Boolean] {
-
- override def intermediateDataType = Array(BasicTypeInfo.BOOLEAN_TYPE_INFO)
-
-}
-
-class DecimalMinAggregate extends Aggregate[BigDecimal] {
-
- protected var minIndex: Int = _
-
- override def intermediateDataType = Array(BasicTypeInfo.BIG_DEC_TYPE_INFO)
-
- override def initiate(intermediate: Row): Unit = {
- intermediate.setField(minIndex, null)
- }
-
- override def prepare(value: Any, partial: Row): Unit = {
- if (value == null) {
- initiate(partial)
- } else {
- partial.setField(minIndex, value)
- }
- }
-
- override def merge(partial: Row, buffer: Row): Unit = {
- val partialValue = partial.getField(minIndex).asInstanceOf[BigDecimal]
- if (partialValue != null) {
- val bufferValue = buffer.getField(minIndex).asInstanceOf[BigDecimal]
- if (bufferValue != null) {
- val min = if (partialValue.compareTo(bufferValue) < 0) partialValue else bufferValue
- buffer.setField(minIndex, min)
- } else {
- buffer.setField(minIndex, partialValue)
- }
- }
- }
-
- override def evaluate(buffer: Row): BigDecimal = {
- buffer.getField(minIndex).asInstanceOf[BigDecimal]
- }
-
- override def supportPartial: Boolean = true
-
- override def setAggOffsetInRow(aggOffset: Int): Unit = {
- minIndex = aggOffset
- }
-}
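
The deleted aggregate classes above follow a three-phase partial-aggregation protocol: prepare() wraps a single input value into an intermediate Row (the MapFunction phase), merge() folds a partial Row into a buffer Row while treating null as "no value seen yet" (the CombineFunction/GroupReduceFunction phases), and evaluate() extracts the final result. A minimal sketch of that lifecycle, driven by hand with the deleted IntMinAggregate and a single aggregate field at offset 0:

    import org.apache.flink.types.Row

    val agg = new IntMinAggregate
    agg.setAggOffsetInRow(0)          // the aggregate reads/writes field 0

    val buffer = new Row(1)
    agg.initiate(buffer)              // buffer holds null until a value arrives

    for (v <- Seq(7, 3, 5)) {
      val partial = new Row(1)
      agg.prepare(v, partial)         // map phase: wrap the input value
      agg.merge(partial, buffer)      // combine/reduce phase: keep the minimum
    }

    println(agg.evaluate(buffer))     // prints 3
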
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregate.scala
deleted file mode 100644
index 16f1608..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/SumAggregate.scala
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.runtime.aggregate
-
-import java.math.BigDecimal
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.types.Row
-
-abstract class SumAggregate[T: Numeric]
- extends Aggregate[T] {
-
- private val numeric = implicitly[Numeric[T]]
- protected var sumIndex: Int = _
-
- override def initiate(partial: Row): Unit = {
- partial.setField(sumIndex, null)
- }
-
- override def merge(partial1: Row, buffer: Row): Unit = {
- val partialValue = partial1.getField(sumIndex).asInstanceOf[T]
- if (partialValue != null) {
- val bufferValue = buffer.getField(sumIndex).asInstanceOf[T]
- if (bufferValue != null) {
- buffer.setField(sumIndex, numeric.plus(partialValue, bufferValue))
- } else {
- buffer.setField(sumIndex, partialValue)
- }
- }
- }
-
- override def evaluate(buffer: Row): T = {
- buffer.getField(sumIndex).asInstanceOf[T]
- }
-
- override def prepare(value: Any, partial: Row): Unit = {
- if (value == null) {
- initiate(partial)
- } else {
- val input = value.asInstanceOf[T]
- partial.setField(sumIndex, input)
- }
- }
-
- override def supportPartial: Boolean = true
-
- override def setAggOffsetInRow(aggOffset: Int): Unit = {
- sumIndex = aggOffset
- }
-}
-
-class ByteSumAggregate extends SumAggregate[Byte] {
- override def intermediateDataType = Array(BasicTypeInfo.BYTE_TYPE_INFO)
-}
-
-class ShortSumAggregate extends SumAggregate[Short] {
- override def intermediateDataType = Array(BasicTypeInfo.SHORT_TYPE_INFO)
-}
-
-class IntSumAggregate extends SumAggregate[Int] {
- override def intermediateDataType = Array(BasicTypeInfo.INT_TYPE_INFO)
-}
-
-class LongSumAggregate extends SumAggregate[Long] {
- override def intermediateDataType = Array(BasicTypeInfo.LONG_TYPE_INFO)
-}
-
-class FloatSumAggregate extends SumAggregate[Float] {
- override def intermediateDataType = Array(BasicTypeInfo.FLOAT_TYPE_INFO)
-}
-
-class DoubleSumAggregate extends SumAggregate[Double] {
- override def intermediateDataType = Array(BasicTypeInfo.DOUBLE_TYPE_INFO)
-}
-
-class DecimalSumAggregate extends Aggregate[BigDecimal] {
-
- protected var sumIndex: Int = _
-
- override def intermediateDataType = Array(BasicTypeInfo.BIG_DEC_TYPE_INFO)
-
- override def initiate(partial: Row): Unit = {
- partial.setField(sumIndex, null)
- }
-
- override def merge(partial1: Row, buffer: Row): Unit = {
- val partialValue = partial1.getField(sumIndex).asInstanceOf[BigDecimal]
- if (partialValue != null) {
- val bufferValue = buffer.getField(sumIndex).asInstanceOf[BigDecimal]
- if (bufferValue != null) {
- buffer.setField(sumIndex, partialValue.add(bufferValue))
- } else {
- buffer.setField(sumIndex, partialValue)
- }
- }
- }
-
- override def evaluate(buffer: Row): BigDecimal = {
- buffer.getField(sumIndex).asInstanceOf[BigDecimal]
- }
-
- override def prepare(value: Any, partial: Row): Unit = {
- if (value == null) {
- initiate(partial)
- } else {
- val input = value.asInstanceOf[BigDecimal]
- partial.setField(sumIndex, input)
- }
- }
-
- override def supportPartial: Boolean = true
-
- override def setAggOffsetInRow(aggOffset: Int): Unit = {
- sumIndex = aggOffset
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/TimeWindowPropertyCollector.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/TimeWindowPropertyCollector.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/TimeWindowPropertyCollector.scala
deleted file mode 100644
index 417c1f1..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/TimeWindowPropertyCollector.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime.aggregate
-
-import org.apache.calcite.runtime.SqlFunctions
-import org.apache.flink.types.Row
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow
-import org.apache.flink.util.Collector
-
-/**
- * Adds TimeWindow properties to specified fields of a row before it emits the row to a wrapped
- * collector.
- */
-class TimeWindowPropertyCollector(windowStartOffset: Option[Int], windowEndOffset: Option[Int])
- extends Collector[Row] {
-
- var wrappedCollector: Collector[Row] = _
- var timeWindow: TimeWindow = _
-
- override def collect(record: Row): Unit = {
-
- val lastFieldPos = record.getArity - 1
-
- if (windowStartOffset.isDefined) {
- record.setField(
- lastFieldPos + windowStartOffset.get,
- SqlFunctions.internalToTimestamp(timeWindow.getStart))
- }
- if (windowEndOffset.isDefined) {
- record.setField(
- lastFieldPos + windowEndOffset.get,
- SqlFunctions.internalToTimestamp(timeWindow.getEnd))
- }
- wrappedCollector.collect(record)
- }
-
- override def close(): Unit = wrappedCollector.close()
-}
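
The two offsets are relative to the last field of the emitted row, as the lastFieldPos arithmetic above shows. A minimal sketch, assuming a two-field row whose second field is reserved for the window-start property (so windowStartOffset = Some(0) resolves to the last position and no end property is emitted):

    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.types.Row
    import org.apache.flink.util.Collector

    val collector = new TimeWindowPropertyCollector(Some(0), None)
    collector.wrappedCollector = new Collector[Row] {
      override def collect(r: Row): Unit = println(r)
      override def close(): Unit = ()
    }
    collector.timeWindow = new TimeWindow(0L, 5000L)

    val row = new Row(2)
    row.setField(0, "payload")
    collector.collect(row)   // field 1 now holds the window start as a SQL timestamp
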
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/ValuesInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/ValuesInputFormat.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/ValuesInputFormat.scala
deleted file mode 100644
index 2a4be46..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/ValuesInputFormat.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime.io
-
-import org.apache.flink.api.common.io.{GenericInputFormat, NonParallelInput}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.api.table.codegen.Compiler
-import org.apache.flink.core.io.GenericInputSplit
-import org.slf4j.LoggerFactory
-
-class ValuesInputFormat[OUT](
- name: String,
- code: String,
- @transient returnType: TypeInformation[OUT])
- extends GenericInputFormat[OUT]
- with NonParallelInput
- with ResultTypeQueryable[OUT]
- with Compiler[GenericInputFormat[OUT]] {
-
- val LOG = LoggerFactory.getLogger(this.getClass)
-
- private var format: GenericInputFormat[OUT] = _
-
- override def open(split: GenericInputSplit): Unit = {
- LOG.debug(s"Compiling GenericInputFormat: $name \n\n Code:\n$code")
- val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
- LOG.debug("Instantiating GenericInputFormat.")
- format = clazz.newInstance()
- }
-
- override def reachedEnd(): Boolean = format.reachedEnd()
-
- override def nextRecord(reuse: OUT): OUT = format.nextRecord(reuse)
-
- override def getProducedType: TypeInformation[OUT] = returnType
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/BatchTableSink.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/BatchTableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/BatchTableSink.scala
deleted file mode 100644
index 27dbe8e..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/BatchTableSink.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.sinks
-
-import org.apache.flink.api.java.DataSet
-
-/** Defines an external [[TableSink]] to emit a batch [[org.apache.flink.api.table.Table]].
- *
- * @tparam T Type of [[DataSet]] that this [[TableSink]] expects and supports.
- */
-trait BatchTableSink[T] extends TableSink[T] {
-
- /** Emits the DataSet. */
- def emitDataSet(dataSet: DataSet[T]): Unit
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/CsvTableSink.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/CsvTableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/CsvTableSink.scala
deleted file mode 100644
index 5038d9b..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/CsvTableSink.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.sinks
-
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.types.Row
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.streaming.api.datastream.DataStream
-
-/**
- * A simple [[TableSink]] to emit data as CSV files.
- *
- * @param path The output path to write the Table to.
- * @param fieldDelim The field delimiter, ',' by default.
- */
-class CsvTableSink(
- path: String,
- fieldDelim: String = ",")
- extends TableSinkBase[Row] with BatchTableSink[Row] with StreamTableSink[Row] {
-
- override def emitDataSet(dataSet: DataSet[Row]): Unit = {
- dataSet
- .map(new CsvFormatter(fieldDelim))
- .writeAsText(path)
- }
-
- override def emitDataStream(dataStream: DataStream[Row]): Unit = {
- dataStream
- .map(new CsvFormatter(fieldDelim))
- .writeAsText(path)
- }
-
- override protected def copy: TableSinkBase[Row] = {
- new CsvTableSink(path, fieldDelim)
- }
-
- override def getOutputType: TypeInformation[Row] = {
- new RowTypeInfo(getFieldTypes: _*)
- }
-}
-
-/**
- * Formats a [[Row]] into a [[String]] with fields separated by the field delimiter.
- *
- * @param fieldDelim The field delimiter.
- */
-class CsvFormatter(fieldDelim: String) extends MapFunction[Row, String] {
- override def map(row: Row): String = {
-
- val builder = new StringBuilder
-
- // write first value
- val v = row.getField(0)
- if (v != null) {
- builder.append(v.toString)
- }
-
- // write following values
- for (i <- 1 until row.getArity) {
- builder.append(fieldDelim)
- val v = row.getField(i)
- if (v != null) {
- builder.append(v.toString)
- }
- }
- builder.mkString
- }
-}
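
A typical use of the deleted sink, sketched under the assumption that a Table named result is in scope and that Table#writeToSink, which configures the sink with the table's field names and types via configure(), is available in this version of the API:

    import org.apache.flink.api.table.sinks.CsvTableSink

    // Write `result` as pipe-delimited text files under /tmp/result.csv.
    val sink = new CsvTableSink("/tmp/result.csv", fieldDelim = "|")
    result.writeToSink(sink)
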
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/StreamTableSink.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/StreamTableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/StreamTableSink.scala
deleted file mode 100644
index 61ef3b2..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/StreamTableSink.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.sinks
-
-import org.apache.flink.streaming.api.datastream.DataStream
-
-/** Defines an external [[TableSink]] to emit a streaming [[org.apache.flink.api.table.Table]].
- *
- * @tparam T Type of [[DataStream]] that this [[TableSink]] expects and supports.
- */
-trait StreamTableSink[T] extends TableSink[T] {
-
- /** Emits the DataStream. */
- def emitDataStream(dataStream: DataStream[T]): Unit
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSink.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSink.scala
deleted file mode 100644
index 3dfc6f1..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSink.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.sinks
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-/** A [[TableSink]] specifies how to emit a [[org.apache.flink.api.table.Table]] to an external
- * system or location.
- *
- * The interface is generic such that it can support different storage locations and formats.
- *
- * @tparam T The return type of the [[TableSink]].
- */
-trait TableSink[T] {
-
- /**
- * Return the type expected by this [[TableSink]].
- *
- * This type should depend on the types returned by [[getFieldTypes]].
- *
- * @return The type expected by this [[TableSink]].
- */
- def getOutputType: TypeInformation[T]
-
- /** Returns the names of the table fields. */
- def getFieldNames: Array[String]
-
- /** Returns the types of the table fields. */
- def getFieldTypes: Array[TypeInformation[_]]
-
- /**
- * Return a copy of this [[TableSink]] configured with the field names and types of the
- * [[org.apache.flink.api.table.Table]] to emit.
- *
- * @param fieldNames The field names of the table to emit.
- * @param fieldTypes The field types of the table to emit.
- * @return A copy of this [[TableSink]] configured with the field names and types of the
- * [[org.apache.flink.api.table.Table]] to emit.
- */
- def configure(fieldNames: Array[String],
- fieldTypes: Array[TypeInformation[_]]): TableSink[T]
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSinkBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSinkBase.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSinkBase.scala
deleted file mode 100644
index 612ee0a..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sinks/TableSinkBase.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.sinks
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-trait TableSinkBase[T] extends TableSink[T] {
-
- private var fieldNames: Option[Array[String]] = None
- private var fieldTypes: Option[Array[TypeInformation[_]]] = None
-
- /** Return a deep copy of the [[TableSink]]. */
- protected def copy: TableSinkBase[T]
-
- /**
- * Return the field names of the [[org.apache.flink.api.table.Table]] to emit. */
- def getFieldNames: Array[String] = {
- fieldNames match {
- case Some(n) => n
- case None => throw new IllegalStateException(
- "TableSink must be configured to retrieve field names.")
- }
- }
-
- /** Return the field types of the [[org.apache.flink.api.table.Table]] to emit. */
- def getFieldTypes: Array[TypeInformation[_]] = {
- fieldTypes match {
- case Some(t) => t
- case None => throw new IllegalStateException(
- "TableSink must be configured to retrieve field types.")
- }
- }
-
- /**
- * Return a copy of this [[TableSink]] configured with the field names and types of the
- * [[org.apache.flink.api.table.Table]] to emit.
- *
- * @param fieldNames The field names of the table to emit.
- * @param fieldTypes The field types of the table to emit.
- * @return A copy of this [[TableSink]] configured with the field names and types of the
- * [[org.apache.flink.api.table.Table]] to emit.
- */
- final def configure(fieldNames: Array[String],
- fieldTypes: Array[TypeInformation[_]]): TableSink[T] = {
-
- val configuredSink = this.copy
- configuredSink.fieldNames = Some(fieldNames)
- configuredSink.fieldTypes = Some(fieldTypes)
-
- configuredSink
- }
-}
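
TableSinkBase handles the configure() bookkeeping, so a concrete sink only supplies copy (a fresh, unconfigured instance) plus the trait-specific emit method. A minimal sketch of a hypothetical batch sink that prints every row (PrintTableSink is not part of the codebase):

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.api.java.DataSet
    import org.apache.flink.api.java.typeutils.RowTypeInfo
    import org.apache.flink.types.Row

    class PrintTableSink extends TableSinkBase[Row] with BatchTableSink[Row] {

      // configure() clones this sink, so copy must return a fresh instance
      override protected def copy: TableSinkBase[Row] = new PrintTableSink

      // derive the output type from the field types set by configure()
      override def getOutputType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes: _*)

      override def emitDataSet(dataSet: DataSet[Row]): Unit = dataSet.print()
    }
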
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/BatchTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/BatchTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/BatchTableSource.scala
deleted file mode 100644
index 74e4cd6..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/BatchTableSource.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.sources
-
-import org.apache.flink.api.java.{ExecutionEnvironment, DataSet}
-
-/** Defines an external batch table and provides access to its data.
- *
- * @tparam T Type of the [[DataSet]] created by this [[TableSource]].
- */
-trait BatchTableSource[T] extends TableSource[T] {
-
- /**
- * Returns the data of the table as a [[DataSet]].
- *
- * NOTE: This method is for internal use only for defining a [[TableSource]].
- * Do not use it in Table API programs.
- */
- def getDataSet(execEnv: ExecutionEnvironment): DataSet[T]
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala
deleted file mode 100644
index b60575a..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.sources
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.io.CsvInputFormat
-import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
-import org.apache.flink.api.table.TableException
-import org.apache.flink.types.Row
-import org.apache.flink.api.java.io.RowCsvInputFormat
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.core.fs.Path
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-
-/**
- * A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a
- * (logically) unlimited number of fields.
- *
- * @param path The path to the CSV file.
- * @param fieldNames The names of the table fields.
- * @param fieldTypes The types of the table fields.
- * @param fieldDelim The field delimiter, "," by default.
- * @param rowDelim The row delimiter, "\n" by default.
- * @param quoteCharacter An optional quote character for String values, null by default.
- * @param ignoreFirstLine Flag to ignore the first line, false by default.
- * @param ignoreComments An optional prefix to indicate comments, null by default.
- * @param lenient Flag to skip records with parse errors instead of failing, false by default.
- */
-class CsvTableSource(
- path: String,
- fieldNames: Array[String],
- fieldTypes: Array[TypeInformation[_]],
- fieldDelim: String = CsvInputFormat.DEFAULT_FIELD_DELIMITER,
- rowDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER,
- quoteCharacter: Character = null,
- ignoreFirstLine: Boolean = false,
- ignoreComments: String = null,
- lenient: Boolean = false)
- extends BatchTableSource[Row]
- with StreamTableSource[Row] {
-
- /**
- * A [[BatchTableSource]] and [[StreamTableSource]] for simple CSV files with a
- * (logically) unlimited number of fields.
- *
- * @param path The path to the CSV file.
- * @param fieldNames The names of the table fields.
- * @param fieldTypes The types of the table fields.
- */
- def this(path: String, fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]) =
- this(path, fieldNames, fieldTypes, CsvInputFormat.DEFAULT_FIELD_DELIMITER,
- CsvInputFormat.DEFAULT_LINE_DELIMITER, null, false, null, false)
-
- if (fieldNames.length != fieldTypes.length) {
- throw TableException("Number of field names and field types must be equal.")
- }
-
- private val returnType = new RowTypeInfo(fieldTypes: _*)
-
- /**
- * Returns the data of the table as a [[DataSet]] of [[Row]].
- *
- * NOTE: This method is for internal use only for defining a [[TableSource]].
- * Do not use it in Table API programs.
- */
- override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
- execEnv.createInput(createCsvInput(), returnType)
- }
-
- /** Returns the types of the table fields. */
- override def getFieldTypes: Array[TypeInformation[_]] = fieldTypes
-
- /** Returns the names of the table fields. */
- override def getFieldsNames: Array[String] = fieldNames
-
- /** Returns the number of fields of the table. */
- override def getNumberOfFields: Int = fieldNames.length
-
- /** Returns the [[RowTypeInfo]] for the return type of the [[CsvTableSource]]. */
- override def getReturnType: RowTypeInfo = returnType
-
- /**
- * Returns the data of the table as a [[DataStream]] of [[Row]].
- *
- * NOTE: This method is for internal use only for defining a [[TableSource]].
- * Do not use it in Table API programs.
- */
- override def getDataStream(streamExecEnv: StreamExecutionEnvironment): DataStream[Row] = {
- streamExecEnv.createInput(createCsvInput(), returnType)
- }
-
- private def createCsvInput(): RowCsvInputFormat = {
- val inputFormat = new RowCsvInputFormat(new Path(path), returnType, rowDelim, fieldDelim)
-
- inputFormat.setSkipFirstLineAsHeader(ignoreFirstLine)
- inputFormat.setLenient(lenient)
- if (quoteCharacter != null) {
- inputFormat.enableQuotedStringParsing(quoteCharacter)
- }
- if (ignoreComments != null) {
- inputFormat.setCommentPrefix(ignoreComments)
- }
-
- inputFormat
- }
-}
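
A typical use of the deleted source, sketched under the assumption that a BatchTableEnvironment named tEnv is in scope and exposes registerTableSource and sql in this version of the API:

    import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}

    val csvSource = new CsvTableSource(
      "/path/to/persons.csv",
      Array("name", "age"),
      Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
      ignoreFirstLine = true)

    tEnv.registerTableSource("persons", csvSource)
    val result = tEnv.sql("SELECT name FROM persons WHERE age > 30")
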
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/ProjectableTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/ProjectableTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/ProjectableTableSource.scala
deleted file mode 100644
index c04138a..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/ProjectableTableSource.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.sources
-
-/**
- * Adds support for projection push-down to a [[TableSource]].
- * A [[TableSource]] extending this interface is able to project the fields of the return table.
- *
- * @tparam T The return type of the [[ProjectableTableSource]].
- */
-trait ProjectableTableSource[T] {
-
- /**
- * Creates a copy of the [[ProjectableTableSource]] that projects its output on the specified
- * fields.
- *
- * @param fields The indexes of the fields to return.
- * @return A copy of the [[ProjectableTableSource]] that projects its output.
- */
- def projectFields(fields: Array[Int]): ProjectableTableSource[T]
-
-}
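
Projection push-down hands the source the indexes of the columns a query actually needs. A minimal sketch of the contract, using a hypothetical SlimSource (not part of the codebase); a real implementation would also tell its input format to skip the dropped physical columns:

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.types.Row

    class SlimSource(
        val names: Array[String],
        val types: Array[TypeInformation[_]])
      extends ProjectableTableSource[Row] {

      // keep only the requested columns, preserving the given order
      override def projectFields(fields: Array[Int]): SlimSource =
        new SlimSource(fields.map(names(_)), fields.map(types(_)))
    }
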
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/StreamTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/StreamTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/StreamTableSource.scala
deleted file mode 100644
index cdae0b3..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/StreamTableSource.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.sources
-
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-
-/** Defines an external stream table and provides access to its data.
- *
- * @tparam T Type of the [[DataStream]] created by this [[TableSource]].
- */
-trait StreamTableSource[T] extends TableSource[T] {
-
- /**
- * Returns the data of the table as a [[DataStream]].
- *
- * NOTE: This method is for internal use only for defining a [[TableSource]].
- * Do not use it in Table API programs.
- */
- def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T]
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/TableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/TableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/TableSource.scala
deleted file mode 100644
index e1ada62..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/TableSource.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.sources
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-/** Defines an external table by providing schema information, i.e., field names and types.
- *
- * @tparam T The return type of the [[TableSource]].
- */
-trait TableSource[T] {
-
- /** Returns the number of fields of the table. */
- def getNumberOfFields: Int
-
- /** Returns the names of the table fields. */
- def getFieldsNames: Array[String]
-
- /** Returns the types of the table fields. */
- def getFieldTypes: Array[TypeInformation[_]]
-
- /** Returns the [[TypeInformation]] for the return type of the [[TableSource]]. */
- def getReturnType: TypeInformation[T]
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
deleted file mode 100644
index 94c8e8c..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
+++ /dev/null
@@ -1,922 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table
-
-import org.apache.calcite.rel.RelNode
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.operators.join.JoinType
-import org.apache.flink.api.table.expressions._
-import org.apache.flink.api.table.plan.ProjectionTranslator._
-import org.apache.flink.api.table.plan.logical.{Minus, _}
-import org.apache.flink.api.table.sinks.TableSink
-
-import scala.collection.JavaConverters._
-
-/**
- * A Table is the core component of the Table API.
- * Similar to how the batch and streaming APIs have DataSet and DataStream,
- * the Table API is built around [[Table]].
- *
- * Use the methods of [[Table]] to transform data. Use [[TableEnvironment]] to convert a [[Table]]
- * back to a DataSet or DataStream.
- *
- * When using Scala a [[Table]] can also be converted using implicit conversions.
- *
- * Example:
- *
- * {{{
- * val env = ExecutionEnvironment.getExecutionEnvironment
- * val tEnv = TableEnvironment.getTableEnvironment(env)
- *
- * val set: DataSet[(String, Int)] = ...
- * val table = set.toTable(tEnv, 'a, 'b)
- * ...
- * val table2 = ...
- * val set2: DataSet[MyType] = table2.toDataSet[MyType]
- * }}}
- *
- * Operations such as [[join]], [[select]], [[where]] and [[groupBy]] either take arguments
- * in a Scala DSL or as an expression String. Please refer to the documentation for the expression
- * syntax.
- *
- * @param tableEnv The [[TableEnvironment]] to which the table is bound.
- * @param logicalPlan The logical plan that describes the table.
- */
-class Table(
- private[flink] val tableEnv: TableEnvironment,
- private[flink] val logicalPlan: LogicalNode) {
-
- def relBuilder = tableEnv.getRelBuilder
-
- def getRelNode: RelNode = logicalPlan.toRelNode(relBuilder)
-
- /**
- * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions
- * can contain complex expressions and aggregations.
- *
- * Example:
- *
- * {{{
- * tab.select('key, 'value.avg + " The average" as 'average)
- * }}}
- */
- def select(fields: Expression*): Table = {
- val expandedFields = expandProjectList(fields, logicalPlan, tableEnv)
- val (aggNames, propNames) = extractAggregationsAndProperties(expandedFields, tableEnv)
- if (propNames.nonEmpty) {
- throw ValidationException("Window properties can only be used on windowed tables.")
- }
-
- if (aggNames.nonEmpty) {
- val projectsOnAgg = replaceAggregationsAndProperties(
- expandedFields, tableEnv, aggNames, propNames)
- val projectFields = extractFieldReferences(expandedFields)
-
- new Table(tableEnv,
- Project(projectsOnAgg,
- Aggregate(Nil, aggNames.map(a => Alias(a._1, a._2)).toSeq,
- Project(projectFields, logicalPlan).validate(tableEnv)
- ).validate(tableEnv)
- ).validate(tableEnv)
- )
- } else {
- new Table(tableEnv,
- Project(expandedFields.map(UnresolvedAlias), logicalPlan).validate(tableEnv))
- }
- }
-
- /**
- * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions
- * can contain complex expressions and aggregations.
- *
- * Example:
- *
- * {{{
- * tab.select("key, value.avg + ' The average' as average")
- * }}}
- */
- def select(fields: String): Table = {
- val fieldExprs = ExpressionParser.parseExpressionList(fields)
- select(fieldExprs: _*)
- }
-
- /**
- * Renames the fields of the expression result. Use this to disambiguate fields before
- * joining two tables.
- *
- * Example:
- *
- * {{{
- * tab.as('a, 'b)
- * }}}
- */
- def as(fields: Expression*): Table = {
- new Table(tableEnv, AliasNode(fields, logicalPlan).validate(tableEnv))
- }
-
- /**
- * Renames the fields of the expression result. Use this to disambiguate fields before
- * joining two tables.
- *
- * Example:
- *
- * {{{
- * tab.as("a, b")
- * }}}
- */
- def as(fields: String): Table = {
- val fieldExprs = ExpressionParser.parseExpressionList(fields)
- as(fieldExprs: _*)
- }
-
- /**
- * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
- * clause.
- *
- * Example:
- *
- * {{{
- * tab.filter('name === "Fred")
- * }}}
- */
- def filter(predicate: Expression): Table = {
- new Table(tableEnv, Filter(predicate, logicalPlan).validate(tableEnv))
- }
-
- /**
- * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
- * clause.
- *
- * Example:
- *
- * {{{
- * tab.filter("name = 'Fred'")
- * }}}
- */
- def filter(predicate: String): Table = {
- val predicateExpr = ExpressionParser.parseExpression(predicate)
- filter(predicateExpr)
- }
-
- /**
- * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
- * clause.
- *
- * Example:
- *
- * {{{
- * tab.where('name === "Fred")
- * }}}
- */
- def where(predicate: Expression): Table = {
- filter(predicate)
- }
-
- /**
- * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
- * clause.
- *
- * Example:
- *
- * {{{
- * tab.where("name = 'Fred'")
- * }}}
- */
- def where(predicate: String): Table = {
- filter(predicate)
- }
-
- /**
- * Groups the elements on some grouping keys. Use this before a selection with aggregations
- * to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement.
- *
- * Example:
- *
- * {{{
- * tab.groupBy('key).select('key, 'value.avg)
- * }}}
- */
- def groupBy(fields: Expression*): GroupedTable = {
- new GroupedTable(this, fields)
- }
-
- /**
- * Groups the elements on some grouping keys. Use this before a selection with aggregations
- * to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement.
- *
- * Example:
- *
- * {{{
- * tab.groupBy("key").select("key, value.avg")
- * }}}
- */
- def groupBy(fields: String): GroupedTable = {
- val fieldsExpr = ExpressionParser.parseExpressionList(fields)
- groupBy(fieldsExpr: _*)
- }
-
- /**
- * Removes duplicate values and returns only distinct (different) values.
- *
- * Example:
- *
- * {{{
- * tab.select("key, value").distinct()
- * }}}
- */
- def distinct(): Table = {
- new Table(tableEnv, Distinct(logicalPlan).validate(tableEnv))
- }
-
- /**
- * Joins two [[Table]]s. Similar to an SQL join. The fields of the two joined
- * operations must not overlap, use [[as]] to rename fields if necessary. You can use
- * where and select clauses after a join to further specify the behaviour of the join.
- *
- * Note: Both tables must be bound to the same [[TableEnvironment]].
- *
- * Example:
- *
- * {{{
- * left.join(right).where('a === 'b && 'c > 3).select('a, 'b, 'd)
- * }}}
- */
- def join(right: Table): Table = {
- join(right, None, JoinType.INNER)
- }
-
- /**
- * Joins two [[Table]]s. Similar to an SQL join. The fields of the two joined
- * operations must not overlap, use [[as]] to rename fields if necessary.
- *
- * Note: Both tables must be bound to the same [[TableEnvironment]].
- *
- * Example:
- *
- * {{{
- * left.join(right, "a = b")
- * }}}
- */
- def join(right: Table, joinPredicate: String): Table = {
- join(right, joinPredicate, JoinType.INNER)
- }
-
- /**
- * Joins two [[Table]]s. Similar to an SQL join. The fields of the two joined
- * operations must not overlap, use [[as]] to rename fields if necessary.
- *
- * Note: Both tables must be bound to the same [[TableEnvironment]].
- *
- * Example:
- *
- * {{{
- * left.join(right, 'a === 'b).select('a, 'b, 'd)
- * }}}
- */
- def join(right: Table, joinPredicate: Expression): Table = {
- join(right, Some(joinPredicate), JoinType.INNER)
- }
-
- /**
- * Joins two [[Table]]s. Similar to an SQL left outer join. The fields of the two joined
- * operations must not overlap, use [[as]] to rename fields if necessary.
- *
- * Note: Both tables must be bound to the same [[TableEnvironment]] and its [[TableConfig]] must
- * have nullCheck enabled.
- *
- * Example:
- *
- * {{{
- * left.leftOuterJoin(right, "a = b").select('a, 'b, 'd)
- * }}}
- */
- def leftOuterJoin(right: Table, joinPredicate: String): Table = {
- join(right, joinPredicate, JoinType.LEFT_OUTER)
- }
-
- /**
- * Joins two [[Table]]s. Similar to an SQL left outer join. The fields of the two joined
- * operations must not overlap, use [[as]] to rename fields if necessary.
- *
- * Note: Both tables must be bound to the same [[TableEnvironment]] and its [[TableConfig]] must
- * have nullCheck enabled.
- *
- * Example:
- *
- * {{{
- * left.leftOuterJoin(right, 'a === 'b).select('a, 'b, 'd)
- * }}}
- */
- def leftOuterJoin(right: Table, joinPredicate: Expression): Table = {
- join(right, Some(joinPredicate), JoinType.LEFT_OUTER)
- }
-
- /**
- * Joins two [[Table]]s. Similar to an SQL right outer join. The fields of the two joined
- * operations must not overlap, use [[as]] to rename fields if necessary.
- *
- * Note: Both tables must be bound to the same [[TableEnvironment]] and its [[TableConfig]] must
- * have nullCheck enabled.
- *
- * Example:
- *
- * {{{
- * left.rightOuterJoin(right, "a = b").select('a, 'b, 'd)
- * }}}
- */
- def rightOuterJoin(right: Table, joinPredicate: String): Table = {
- join(right, joinPredicate, JoinType.RIGHT_OUTER)
- }
-
- /**
- * Joins two [[Table]]s. Similar to an SQL right outer join. The fields of the two joined
- * operations must not overlap, use [[as]] to rename fields if necessary.
- *
- * Note: Both tables must be bound to the same [[TableEnvironment]] and its [[TableConfig]] must
- * have nullCheck enabled.
- *
- * Example:
- *
- * {{{
- * left.rightOuterJoin(right, 'a === 'b).select('a, 'b, 'd)
- * }}}
- */
- def rightOuterJoin(right: Table, joinPredicate: Expression): Table = {
- join(right, Some(joinPredicate), JoinType.RIGHT_OUTER)
- }
-
- /**
- * Joins two [[Table]]s. Similar to an SQL full outer join. The fields of the two joined
- * operations must not overlap, use [[as]] to rename fields if necessary.
- *
- * Note: Both tables must be bound to the same [[TableEnvironment]] and its [[TableConfig]] must
- * have nullCheck enabled.
- *
- * Example:
- *
- * {{{
- * left.fullOuterJoin(right, "a = b").select('a, 'b, 'd)
- * }}}
- */
- def fullOuterJoin(right: Table, joinPredicate: String): Table = {
- join(right, joinPredicate, JoinType.FULL_OUTER)
- }
-
- /**
- * Joins two [[Table]]s. Similar to an SQL full outer join. The fields of the two joined
- * operations must not overlap, use [[as]] to rename fields if necessary.
- *
- * Note: Both tables must be bound to the same [[TableEnvironment]] and its [[TableConfig]] must
- * have nullCheck enabled.
- *
- * Example:
- *
- * {{{
- * left.fullOuterJoin(right, 'a === 'b).select('a, 'b, 'd)
- * }}}
- */
- def fullOuterJoin(right: Table, joinPredicate: Expression): Table = {
- join(right, Some(joinPredicate), JoinType.FULL_OUTER)
- }
-
- private def join(right: Table, joinPredicate: String, joinType: JoinType): Table = {
- val joinPredicateExpr = ExpressionParser.parseExpression(joinPredicate)
- join(right, Some(joinPredicateExpr), joinType)
- }
-
- private def join(right: Table, joinPredicate: Option[Expression], joinType: JoinType): Table = {
- // check that right table belongs to the same TableEnvironment
- if (right.tableEnv != this.tableEnv) {
- throw new ValidationException("Only tables from the same TableEnvironment can be joined.")
- }
- new Table(
- tableEnv,
- Join(this.logicalPlan, right.logicalPlan, joinType, joinPredicate, correlated = false)
- .validate(tableEnv))
- }
-
- /**
- * Minus of two [[Table]]s with duplicate records removed.
- * Similar to a SQL EXCEPT clause. Minus returns records from the left table that do not
- * exist in the right table. Duplicate records in the left table are returned
- * exactly once, i.e., duplicates are removed. Both tables must have identical field types.
- *
- * Note: Both tables must be bound to the same [[TableEnvironment]].
- *
- * Example:
- *
- * {{{
- * left.minus(right)
- * }}}
- */
- def minus(right: Table): Table = {
- // check that right table belongs to the same TableEnvironment
- if (right.tableEnv != this.tableEnv) {
- throw new ValidationException("Only tables from the same TableEnvironment can be " +
- "subtracted.")
- }
- new Table(tableEnv, Minus(logicalPlan, right.logicalPlan, all = false)
- .validate(tableEnv))
- }
-
- /**
- * Minus of two [[Table]]s. Similar to a SQL EXCEPT ALL clause.
- * MinusAll returns the records that do not exist in
- * the right table. A record that is present n times in the left table and m times
- * in the right table is returned (n - m) times, i.e., as many duplicates as are present
- * in the right table are removed. Both tables must have identical field types.
- *
- * Note: Both tables must be bound to the same [[TableEnvironment]].
- *
- * Example:
- *
- * {{{
- * left.minusAll(right)
- * }}}
- */
- def minusAll(right: Table): Table = {
- // check that right table belongs to the same TableEnvironment
- if (right.tableEnv != this.tableEnv) {
- throw new ValidationException("Only tables from the same TableEnvironment can be " +
- "subtracted.")
- }
- new Table(tableEnv, Minus(logicalPlan, right.logicalPlan, all = true)
- .validate(tableEnv))
- }
-
- /**
- * Unions two [[Table]]s with duplicate records removed.
- * Similar to an SQL UNION. The fields of the two union operations must fully overlap.
- *
- * Note: Both tables must be bound to the same [[TableEnvironment]].
- *
- * Example:
- *
- * {{{
- * left.union(right)
- * }}}
- */
- def union(right: Table): Table = {
- // check that right table belongs to the same TableEnvironment
- if (right.tableEnv != this.tableEnv) {
- throw new ValidationException("Only tables from the same TableEnvironment can be unioned.")
- }
- new Table(tableEnv, Union(logicalPlan, right.logicalPlan, all = false).validate(tableEnv))
- }
-
- /**
- * Unions two [[Table]]s. Similar to an SQL UNION ALL. The fields of the two tables
- * must fully overlap.
- *
- * Note: Both tables must be bound to the same [[TableEnvironment]].
- *
- * Example:
- *
- * {{{
- * left.unionAll(right)
- * }}}
- */
- def unionAll(right: Table): Table = {
- // check that right table belongs to the same TableEnvironment
- if (right.tableEnv != this.tableEnv) {
- throw new ValidationException("Only tables from the same TableEnvironment can be unioned.")
- }
- new Table(tableEnv, Union(logicalPlan, right.logicalPlan, all = true).validate(tableEnv))
- }
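-
- // A worked sketch contrasting the two union flavors, using hypothetical
- // single-column tables left = {1, 1, 2} and right = {1, 3}:
- //
- //   left.union(right)     // {1, 2, 3}       -- duplicates removed
- //   left.unionAll(right)  // {1, 1, 2, 1, 3} -- all records kept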
-
- /**
- * Intersects two [[Table]]s with duplicate records removed. Intersect returns records that
- * exist in both tables. If a record is present in one or both tables more than once, it is
- * returned just once, i.e., the resulting table has no duplicate records. Similar to an
- * SQL INTERSECT. The fields of the two tables must fully overlap.
- *
- * Note: Both tables must be bound to the same [[TableEnvironment]].
- *
- * Example:
- *
- * {{{
- * left.intersect(right)
- * }}}
- */
- def intersect(right: Table): Table = {
- // check that right table belongs to the same TableEnvironment
- if (right.tableEnv != this.tableEnv) {
- throw new ValidationException(
- "Only tables from the same TableEnvironment can be intersected.")
- }
- new Table(tableEnv, Intersect(logicalPlan, right.logicalPlan, all = false).validate(tableEnv))
- }
-
- /**
- * Intersects two [[Table]]s. IntersectAll returns records that exist in both tables.
- * A record that is present n times in the left table and m times in the right table is
- * returned min(n, m) times, i.e., the resulting table might have duplicate records. Similar
- * to an SQL INTERSECT ALL. The fields of the two tables must fully overlap.
- *
- * Note: Both tables must be bound to the same [[TableEnvironment]].
- *
- * Example:
- *
- * {{{
- * left.intersectAll(right)
- * }}}
- */
- def intersectAll(right: Table): Table = {
- // check that right table belongs to the same TableEnvironment
- if (right.tableEnv != this.tableEnv) {
- throw new ValidationException(
- "Only tables from the same TableEnvironment can be intersected.")
- }
- new Table(tableEnv, Intersect(logicalPlan, right.logicalPlan, all = true).validate(tableEnv))
- }
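-
- // A worked sketch contrasting the two intersect flavors, using hypothetical
- // single-column tables left = {1, 1, 1, 2} and right = {1, 1, 3}:
- //
- //   left.intersect(right)     // {1}    -- present in both, returned once
- //   left.intersectAll(right)  // {1, 1} -- min(3, 2) = 2 copies of 1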
-
- /**
- * Sorts the given [[Table]]. Similar to SQL ORDER BY.
- * The resulting Table is globally sorted across all parallel partitions.
- *
- * Example:
- *
- * {{{
- * tab.orderBy('name.desc)
- * }}}
- */
- def orderBy(fields: Expression*): Table = {
- val order: Seq[Ordering] = fields.map {
- case o: Ordering => o
- case e => Asc(e)
- }
- new Table(tableEnv, Sort(order, logicalPlan).validate(tableEnv))
- }
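-
- // As the mapping above shows, fields without an explicit ordering default to
- // ascending, so these two calls are equivalent (field names hypothetical):
- //
- //   tab.orderBy('a, 'b.desc)
- //   tab.orderBy('a.asc, 'b.desc)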
-
- /**
- * Sorts the given [[Table]]. Similar to SQL ORDER BY.
- * The resulting Table is globally sorted across all parallel partitions.
- *
- * Example:
- *
- * {{{
- * tab.orderBy("name.desc")
- * }}}
- */
- def orderBy(fields: String): Table = {
- val parsedFields = ExpressionParser.parseExpressionList(fields)
- orderBy(parsedFields: _*)
- }
-
- /**
- * Limits a sorted result from an offset position.
- * Similar to a SQL LIMIT clause. Limit is technically part of the Order By operator and
- * thus must be preceded by it.
- *
- * Example:
- *
- * {{{
- * // returns an unlimited number of records beginning with the 4th record
- * tab.orderBy('name.desc).limit(3)
- * }}}
- *
- * @param offset number of records to skip
- */
- def limit(offset: Int): Table = {
- new Table(tableEnv, Limit(offset = offset, child = logicalPlan).validate(tableEnv))
- }
-
- /**
- * Limits a sorted result to a specified number of records from an offset position.
- * Similar to a SQL LIMIT clause. Limit is technically part of the Order By operator and
- * thus must be preceded by it.
- *
- * Example:
- *
- * {{{
- * // returns 5 records beginning with the 4th record
- * tab.orderBy('name.desc).limit(3, 5)
- * }}}
- *
- * @param offset number of records to skip
- * @param fetch number of records to be returned
- */
- def limit(offset: Int, fetch: Int): Table = {
- new Table(tableEnv, Limit(offset, fetch, logicalPlan).validate(tableEnv))
- }
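-
- // A minimal sketch combining the two limit variants with orderBy, assuming a
- // table with a 'name field:
- //
- //   tab.orderBy('name.desc).limit(0, 10) // the first 10 records
- //   tab.orderBy('name.desc).limit(10)    // all records after the first 10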
-
- /**
- * Joins this [[Table]] to a user-defined [[org.apache.flink.api.table.functions.TableFunction]].
- * Similar to an SQL cross join, but it works with a table function. It returns rows from the
- * outer table (the table on the left of the operator) for which the table function (defined in
- * the expression on the right side of the operator) produces matching values.
- *
- * Example:
- *
- * {{{
- * class MySplitUDTF extends TableFunction[String] {
- * def eval(str: String): Unit = {
- * str.split("#").foreach(collect)
- * }
- * }
- *
- * val split = new MySplitUDTF()
- * table.join(split('c) as ('s)).select('a, 'b, 'c, 's)
- * }}}
- */
- def join(udtf: Expression): Table = {
- joinUdtfInternal(udtf, JoinType.INNER)
- }
-
- /**
- * Joins this [[Table]] to a user-defined [[org.apache.flink.api.table.functions.TableFunction]].
- * Similar to an SQL cross join, but it works with a table function. It returns rows from the
- * outer table (the table on the left of the operator) for which the table function (defined in
- * the expression on the right side of the operator) produces matching values.
- *
- * Example:
- *
- * {{{
- * class MySplitUDTF extends TableFunction<String> {
- * public void eval(String str) {
- * str.split("#").forEach(this::collect);
- * }
- * }
- *
- * TableFunction<String> split = new MySplitUDTF();
- * tableEnv.registerFunction("split", split);
- *
- * table.join("split(c) as (s)").select("a, b, c, s");
- * }}}
- */
- def join(udtf: String): Table = {
- joinUdtfInternal(udtf, JoinType.INNER)
- }
-
- /**
- * Joins this [[Table]] to a user-defined [[org.apache.flink.api.table.functions.TableFunction]].
- * Similar to an SQL left outer join with ON TRUE, but it works with a table function. It returns
- * all rows from the outer table (the table on the left of the operator) together with the
- * matching rows produced by the table function (defined in the expression on the right side of
- * the operator). Rows for which the table function produces no match are padded with null values.
- *
- * Example:
- *
- * {{{
- * class MySplitUDTF extends TableFunction[String] {
- * def eval(str: String): Unit = {
- * str.split("#").foreach(collect)
- * }
- * }
- *
- * val split = new MySplitUDTF()
- * table.leftOuterJoin(split('c) as ('s)).select('a, 'b, 'c, 's)
- * }}}
- */
- def leftOuterJoin(udtf: Expression): Table = {
- joinUdtfInternal(udtf, JoinType.LEFT_OUTER)
- }
-
- /**
- * Joins this [[Table]] to a user-defined [[org.apache.flink.api.table.functions.TableFunction]].
- * Similar to an SQL left outer join with ON TRUE, but it works with a table function. It returns
- * all rows from the outer table (the table on the left of the operator) together with the
- * matching rows produced by the table function (defined in the expression on the right side of
- * the operator). Rows for which the table function produces no match are padded with null values.
- *
- * Example:
- *
- * {{{
- * class MySplitUDTF extends TableFunction<String> {
- * public void eval(String str) {
- * str.split("#").forEach(this::collect);
- * }
- * }
- *
- * TableFunction<String> split = new MySplitUDTF();
- * tableEnv.registerFunction("split", split);
- *
- * table.leftOuterJoin("split(c) as (s)").select("a, b, c, s");
- * }}}
- */
- def leftOuterJoin(udtf: String): Table = {
- joinUdtfInternal(udtf, JoinType.LEFT_OUTER)
- }
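-
- // A sketch of how the two UDTF join flavors differ for rows where the table
- // function emits nothing, reusing the hypothetical split function above:
- //
- //   // if split emits no rows for some input value of 'c:
- //   table.join(split('c) as ('s))          // drops those rows entirely
- //   table.leftOuterJoin(split('c) as ('s)) // keeps them, with 's = null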
-
- private def joinUdtfInternal(udtfString: String, joinType: JoinType): Table = {
- val udtf = ExpressionParser.parseExpression(udtfString)
- joinUdtfInternal(udtf, joinType)
- }
-
- private def joinUdtfInternal(udtf: Expression, joinType: JoinType): Table = {
- var alias: Option[Seq[String]] = None
-
- // unwrap an Expression until we get a TableFunctionCall
- def unwrap(expr: Expression): TableFunctionCall = expr match {
- case Alias(child, name, extraNames) =>
- alias = Some(Seq(name) ++ extraNames)
- unwrap(child)
- case Call(name, args) =>
- val function = tableEnv.getFunctionCatalog.lookupFunction(name, args)
- unwrap(function)
- case c: TableFunctionCall => c
- case _ =>
- throw new TableException(
- "Table function joins only accept expressions that define table functions.")
- }
-
- val call = unwrap(udtf)
- .as(alias)
- .toLogicalTableFunctionCall(this.logicalPlan)
- .validate(tableEnv)
-
- new Table(
- tableEnv,
- Join(this.logicalPlan, call, joinType, None, correlated = true).validate(tableEnv))
- }
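-
- // The unwrap above accepts the three shapes a UDTF join expression can take
- // (the split function and field names are hypothetical):
- //
- //   table.join(split('c))         // a TableFunctionCall directly
- //   table.join(split('c) as ('s)) // an Alias wrapped around the call
- //   table.join("split(c)")        // a parsed Call, resolved via the catalog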
-
- /**
- * Writes the [[Table]] to a [[TableSink]]. A [[TableSink]] defines an external storage location.
- *
- * A batch [[Table]] can only be written to a
- * [[org.apache.flink.api.table.sinks.BatchTableSink]], a streaming [[Table]] requires a
- * [[org.apache.flink.api.table.sinks.StreamTableSink]].
- *
- * @param sink The [[TableSink]] to which the [[Table]] is written.
- * @tparam T The data type that the [[TableSink]] expects.
- */
- def writeToSink[T](sink: TableSink[T]): Unit = {
-
- // get schema information of table
- val rowType = getRelNode.getRowType
- val fieldNames: Array[String] = rowType.getFieldNames.asScala.toArray
- val fieldTypes: Array[TypeInformation[_]] = rowType.getFieldList.asScala
- .map(field => FlinkTypeFactory.toTypeInfo(field.getType)).toArray
-
- // configure the table sink
- val configuredSink = sink.configure(fieldNames, fieldTypes)
-
- // emit the table to the configured table sink
- tableEnv.writeToSink(this, configuredSink)
- }
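-
- // A minimal sketch, assuming a CSV sink such as
- // org.apache.flink.api.table.sinks.CsvTableSink is on the classpath; the
- // sink is configured with the table's field names and types before emitting:
- //
- //   table.writeToSink(new CsvTableSink("/tmp/out.csv", "|"))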
-
- /**
- * Groups the records of a table by assigning them to windows defined by a time or row interval.
- *
- * For streaming tables of infinite size, grouping into windows is required to define finite
- * groups on which group-based aggregates can be computed.
- *
- * For batch tables of finite size, windowing essentially provides shortcuts for time-based
- * groupBy.
- *
- * __Note__: Windowing a non-grouped streaming table is a non-parallel operation, i.e., all
- * data will be processed by a single operator.
- *
- * @param groupWindow group-window that specifies how elements are grouped.
- * @return A windowed table.
- */
- def window(groupWindow: GroupWindow): GroupWindowedTable = {
- if (tableEnv.isInstanceOf[BatchTableEnvironment]) {
- throw new ValidationException(s"Windows on batch tables are currently not supported.")
- }
- new GroupWindowedTable(this, Seq(), groupWindow)
- }
-}
-
-/**
- * A table that has been grouped on a set of grouping keys.
- */
-class GroupedTable(
- private[flink] val table: Table,
- private[flink] val groupKey: Seq[Expression]) {
-
- /**
- * Performs a selection operation on a grouped table. Similar to an SQL SELECT statement.
- * The field expressions can contain complex expressions and aggregations.
- *
- * Example:
- *
- * {{{
- * tab.groupBy('key).select('key, 'value.avg + " The average" as 'average)
- * }}}
- */
- def select(fields: Expression*): Table = {
- val (aggNames, propNames) = extractAggregationsAndProperties(fields, table.tableEnv)
- if (propNames.nonEmpty) {
- throw ValidationException("Window properties can only be used on windowed tables.")
- }
-
- val projectsOnAgg = replaceAggregationsAndProperties(
- fields, table.tableEnv, aggNames, propNames)
- val projectFields = extractFieldReferences(fields ++ groupKey)
-
- new Table(table.tableEnv,
- Project(projectsOnAgg,
- Aggregate(groupKey, aggNames.map(a => Alias(a._1, a._2)).toSeq,
- Project(projectFields, table.logicalPlan).validate(table.tableEnv)
- ).validate(table.tableEnv)
- ).validate(table.tableEnv))
- }
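-
- // For example, tab.groupBy('key).select('key, 'value.avg) is planned roughly
- // as the following operator stack (generated aggregate names hypothetical):
- //
- //   Project('key, 'TMP_0)                    <- projectsOnAgg
- //     Aggregate('key, avg('value) as 'TMP_0)
- //       Project('key, 'value)                <- projectFields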
-
- /**
- * Performs a selection operation on a grouped table. Similar to an SQL SELECT statement.
- * The field expressions can contain complex expressions and aggregations.
- *
- * Example:
- *
- * {{{
- * tab.groupBy("key").select("key, value.avg + ' The average' as average")
- * }}}
- */
- def select(fields: String): Table = {
- val fieldExprs = ExpressionParser.parseExpressionList(fields)
- select(fieldExprs: _*)
- }
-
- /**
- * Groups the records of a table by assigning them to windows defined by a time or row interval.
- *
- * For streaming tables of infinite size, grouping into windows is required to define finite
- * groups on which group-based aggregates can be computed.
- *
- * For batch tables of finite size, windowing essentially provides shortcuts for time-based
- * groupBy.
- *
- * @param groupWindow group-window that specifies how elements are grouped.
- * @return A windowed table.
- */
- def window(groupWindow: GroupWindow): GroupWindowedTable = {
- if (table.tableEnv.isInstanceOf[BatchTableEnvironment]) {
- throw new ValidationException(s"Windows on batch tables are currently not supported.")
- }
- new GroupWindowedTable(table, groupKey, groupWindow)
- }
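-
- // A minimal sketch of the grouped windowing path, assuming a tumbling
- // group-window helper such as Tumble and a streaming table with fields
- // 'key and 'value:
- //
- //   tab.groupBy('key)
- //      .window(Tumble over 10.minutes as 'w)
- //      .select('key, 'w.start, 'value.avg)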
-}
-
-class GroupWindowedTable(
- private[flink] val table: Table,
- private[flink] val groupKey: Seq[Expression],
- private[flink] val window: GroupWindow) {
-
- /**
- * Performs a selection operation on a windowed table. Similar to an SQL SELECT statement.
- * The field expressions can contain complex expressions and aggregations.
- *
- * Example:
- *
- * {{{
- * groupWindowTable.select('key, 'window.start, 'value.avg + " The average" as 'average)
- * }}}
- */
- def select(fields: Expression*): Table = {
- val (aggNames, propNames) = extractAggregationsAndProperties(fields, table.tableEnv)
- val projectsOnAgg = replaceAggregationsAndProperties(
- fields, table.tableEnv, aggNames, propNames)
-
- val projectFields = (table.tableEnv, window) match {
- // event time can be an arbitrary field in a batch environment
- case (_: BatchTableEnvironment, w: EventTimeWindow) =>
- extractFieldReferences(fields ++ groupKey ++ Seq(w.timeField))
- case (_, _) =>
- extractFieldReferences(fields ++ groupKey)
- }
-
- new Table(table.tableEnv,
- Project(
- projectsOnAgg,
- WindowAggregate(
- groupKey,
- window.toLogicalWindow,
- propNames.map(a => Alias(a._1, a._2)).toSeq,
- aggNames.map(a => Alias(a._1, a._2)).toSeq,
- Project(projectFields, table.logicalPlan).validate(table.tableEnv)
- ).validate(table.tableEnv)
- ).validate(table.tableEnv))
- }
-
- /**
- * Performs a selection operation on a group-windowed table. Similar to an SQL SELECT statement.
- * The field expressions can contain complex expressions and aggregations.
- *
- * Example:
- *
- * {{{
- * groupWindowTable.select("key, window.start, value.avg + ' The average' as average")
- * }}}
- */
- def select(fields: String): Table = {
- val fieldExprs = ExpressionParser.parseExpressionList(fields)
- select(fieldExprs: _*)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala
deleted file mode 100644
index 8eecc74..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.trees
-
-import org.apache.commons.lang.ClassUtils
-
-/**
- * Generic base class for trees that can be transformed and traversed.
- */
-abstract class TreeNode[A <: TreeNode[A]] extends Product { self: A =>
-
- /**
- * List of child nodes that should be considered when doing transformations. Other values
- * in the Product will not be transformed, only handed through.
- */
- private[flink] def children: Seq[A]
-
- /**
- * Tests for equality by first testing for reference equality.
- */
- private[flink] def fastEquals(other: TreeNode[_]): Boolean = this.eq(other) || this == other
-
- /**
- * Do tree transformation in post order.
- */
- private[flink] def postOrderTransform(rule: PartialFunction[A, A]): A = {
- def childrenTransform(rule: PartialFunction[A, A]): A = {
- var changed = false
- val newArgs = productIterator.map {
- case arg: TreeNode[_] if children.contains(arg) =>
- val newChild = arg.asInstanceOf[A].postOrderTransform(rule)
- if (!(newChild fastEquals arg)) {
- changed = true
- newChild
- } else {
- arg
- }
- case args: Traversable[_] => args.map {
- case arg: TreeNode[_] if children.contains(arg) =>
- val newChild = arg.asInstanceOf[A].postOrderTransform(rule)
- if (!(newChild fastEquals arg)) {
- changed = true
- newChild
- } else {
- arg
- }
- case other => other
- }
- case nonChild: AnyRef => nonChild
- case null => null
- }.toArray
- if (changed) makeCopy(newArgs) else this
- }
-
- val afterChildren = childrenTransform(rule)
- if (afterChildren fastEquals this) {
- rule.applyOrElse(this, identity[A])
- } else {
- rule.applyOrElse(afterChildren, identity[A])
- }
- }
-
- /**
- * Runs the given function first on the node and then recursively on all its children.
- */
- private[flink] def preOrderVisit(f: A => Unit): Unit = {
- f(this)
- children.foreach(_.preOrderVisit(f))
- }
-
- /**
- * Creates a new copy of this expression with new children. This is used during transformation
- * if children change.
- */
- private[flink] def makeCopy(newArgs: Array[AnyRef]): A = {
- val ctors = getClass.getConstructors.filter(_.getParameterTypes.size > 0)
- if (ctors.isEmpty) {
- throw new RuntimeException(s"No valid constructor for ${getClass.getSimpleName}")
- }
-
- val defaultCtor = ctors.find { ctor =>
- if (ctor.getParameterTypes.size != newArgs.length) {
- false
- } else if (newArgs.contains(null)) {
- false
- } else {
- val argsClasses: Array[Class[_]] = newArgs.map(_.getClass)
- ClassUtils.isAssignable(argsClasses, ctor.getParameterTypes)
- }
- }.getOrElse(ctors.maxBy(_.getParameterTypes.size))
-
- try {
- defaultCtor.newInstance(newArgs: _*).asInstanceOf[A]
- } catch {
- case e: Throwable =>
- throw new RuntimeException(
- s"Fail to copy treeNode ${getClass.getName}: ${e.getStackTraceString}")
- }
- }
-}
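-
-// A minimal sketch of a concrete tree built on TreeNode, with hypothetical
-// node types. postOrderTransform rewrites children before parents, so the
-// rule always sees already-transformed subtrees:
-//
-//   abstract class Expr extends TreeNode[Expr]
-//   case class Lit(v: Int) extends Expr { def children = Seq() }
-//   case class Plus(l: Expr, r: Expr) extends Expr { def children = Seq(l, r) }
-//
-//   // fold x + 0 into x anywhere in the tree
-//   val simplified = expr.postOrderTransform { case Plus(x, Lit(0)) => x }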
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/InternalTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/InternalTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/InternalTypeInfo.scala
deleted file mode 100644
index 2f896ec..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/InternalTypeInfo.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.typeutils
-
-import java.util.Objects
-
-import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
-import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
-import org.apache.flink.util.Preconditions._
-
-/**
- * TypeInformation for internal types of the Table API that are for translation purposes only
- * and should not be contained in the final plan.
- */
-@SerialVersionUID(-13064574364925255L)
-abstract class InternalTypeInfo[T](val clazz: Class[T])
- extends TypeInformation[T]
- with AtomicType[T] {
-
- checkNotNull(clazz)
-
- override def isBasicType: Boolean =
- throw new UnsupportedOperationException("This type is for internal use only.")
-
- override def isTupleType: Boolean =
- throw new UnsupportedOperationException("This type is for internal use only.")
-
- override def getArity: Int =
- throw new UnsupportedOperationException("This type is for internal use only.")
-
- override def getTotalFields: Int =
- throw new UnsupportedOperationException("This type is for internal use only.")
-
- override def getTypeClass: Class[T] = clazz
-
- override def isKeyType: Boolean =
- throw new UnsupportedOperationException("This type is for internal use only.")
-
- override def createSerializer(config: ExecutionConfig): TypeSerializer[T] =
- throw new UnsupportedOperationException("This type is for internal use only.")
-
- override def createComparator(
- sortOrderAscending: Boolean,
- executionConfig: ExecutionConfig)
- : TypeComparator[T] =
- throw new UnsupportedOperationException("This type is for internal use only.")
-
- // ----------------------------------------------------------------------------------------------
-
- override def hashCode: Int = Objects.hash(clazz)
-
- def canEqual(obj: Any): Boolean
-
- override def equals(obj: Any): Boolean = {
- obj match {
- case other: InternalTypeInfo[_] =>
- other.canEqual(this) && (this.clazz eq other.clazz)
- case _ =>
- false
- }
- }
-
- override def toString: String = s"InternalTypeInfo"
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowIntervalTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowIntervalTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowIntervalTypeInfo.scala
deleted file mode 100644
index 4dc83d0..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowIntervalTypeInfo.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.typeutils
-
-/**
- * TypeInformation for row intervals.
- */
-@SerialVersionUID(-1306179424364925258L)
-class RowIntervalTypeInfo extends InternalTypeInfo[Long](classOf[Long]) {
-
- def canEqual(obj: Any): Boolean = obj.isInstanceOf[RowIntervalTypeInfo]
-
- override def toString: String = s"RowIntervalTypeInfo"
-}
-
-object RowIntervalTypeInfo {
-
- val INTERVAL_ROWS = new RowIntervalTypeInfo()
-
-}