You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by twalthr <gi...@git.apache.org> on 2018/05/23 07:12:41 UTC
[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/5688#discussion_r190140171
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.streaming.api.{TimeCharacteristic, TimeDomain}
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.aggregate.KeyedProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+ * A QueryableTableSink stores the rows of an updating table in Flink queryable state.
+ *
+ * Keeping the table in queryable state lets users access table data without depending on an
+ * external storage system.
+ *
+ * Since queryable state is a key-value store, only point queries by key are currently supported.
+ *
+ * The sink only accepts updating (upsert) tables with key fields; append-only tables are
+ * rejected because the stored state would grow without bound.
+ *
+ * Example:
+ * {{{
+ *   val env = StreamExecutionEnvironment.getExecutionEnvironment
+ *   val tEnv = TableEnvironment.getTableEnvironment(env)
+ *   val queryConfig = tEnv.queryConfig
+ *
+ *   val table: Table = ...
+ *
+ *   val queryableTableSink: QueryableTableSink = new QueryableTableSink(
+ *     "prefix",
+ *     queryConfig,
+ *     None)
+ *
+ *   table.writeToSink(queryableTableSink, queryConfig)
+ * }}}
+ *
+ * While the program is running, users can access the state with a QueryableStateClient:
+ * {{{
+ *   val client = new QueryableStateClient(tmHostname, proxyPort)
+ *   val data = client.getKvState(
+ *     jobId,
+ *     "prefix-column1",
+ *     Row.of(1),
+ *     new RowTypeInfo(Array[TypeInformation[_]](Types.INT), Array("id")),
+ *     stateDescriptor)
+ *     .get()
+ * }}}
+ *
+ * @param namePrefix prefix of the names under which the table's state is made queryable
+ *                   ("prefix" in the example above)
+ * @param queryConfig stream query configuration handed to the underlying
+ *                    [[QueryableStateProcessFunction]]
+ * @param cleanupTimeDomain time domain handed to the process function for state cleanup;
+ *                          if `None`, it is derived from the stream's time characteristic
+ *                          (event time -> EVENT_TIME, processing/ingestion time ->
+ *                          PROCESSING_TIME)
+ */
+class QueryableTableSink(
+    private val namePrefix: String,
+    private val queryConfig: StreamQueryConfig,
+    private val cleanupTimeDomain: Option[TimeDomain])
+  extends UpsertStreamTableSink[Row]
+  with TableSinkBase[JTuple2[JBool, Row]] {
+
+  // Names of the key fields; assigned by setKeyFields and required by emitDataStream.
+  private var keys: Array[String] = _
+
+  /**
+   * Sets the key fields of the table.
+   *
+   * @param keys names of the key fields
+   * @throws IllegalArgumentException if `keys` is null
+   */
+  override def setKeyFields(keys: Array[String]): Unit = {
+    if (keys == null) {
+      throw new IllegalArgumentException("keys can't be null!")
+    }
+    this.keys = keys
+  }
+
+  /**
+   * Rejects append-only tables.
+   *
+   * @throws IllegalArgumentException if `isAppendOnly` is true
+   */
+  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
+    if (isAppendOnly) {
+      throw new IllegalArgumentException("A QueryableTableSink can not be used with append-only " +
+        "tables as the table would grow infinitely")
+    }
+  }
+
+  override def getRecordType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes, getFieldNames)
+
+  /**
+   * Keys the upsert stream by the key fields and stores it via a [[QueryableStateProcessFunction]].
+   *
+   * @throws IllegalStateException if [[setKeyFields]] has not been called
+   * @throws IllegalArgumentException if a key field is not one of the sink's field names
+   */
+  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, Row]]): Unit = {
+    if (keys == null) {
+      throw new IllegalStateException(
+        "The key fields of the QueryableTableSink have not been set.")
+    }
+
+    val fieldNames = getFieldNames
+    val fieldTypes = getFieldTypes
+
+    // indexOf yields -1 for an unknown name, which would otherwise surface below as an
+    // opaque ArrayIndexOutOfBoundsException; fail early with a descriptive message instead.
+    val keyIndices = keys.map { keyName =>
+      val index = fieldNames.indexOf(keyName)
+      if (index < 0) {
+        throw new IllegalArgumentException(
+          s"Key field '$keyName' is not a field of the table. " +
+            s"Available fields: ${fieldNames.mkString(", ")}")
+      }
+      index
+    }
+    val keyTypes = keyIndices.map(fieldTypes(_))
+
+    val keySelectorType = new RowTypeInfo(keyTypes, keys)
+
+    val processFunction = new QueryableStateProcessFunction(
+      namePrefix,
+      queryConfig,
+      keys,
+      fieldNames,
+      fieldTypes,
+      calculateCleanupTimeDomain(dataStream.getExecutionEnvironment.getStreamTimeCharacteristic))
+
+    dataStream.keyBy(new RowKeySelector(keyIndices, keySelectorType))
+      .process(processFunction)
+  }
+
+  /**
+   * Returns the explicitly configured cleanup time domain, or, if none was given, the time
+   * domain matching the environment's time characteristic.
+   */
+  private def calculateCleanupTimeDomain(timeCharacteristic: TimeCharacteristic): TimeDomain = {
+    // getOrElse takes its default by name, so the match only runs when no domain was configured.
+    cleanupTimeDomain.getOrElse {
+      timeCharacteristic match {
+        case TimeCharacteristic.IngestionTime | TimeCharacteristic.ProcessingTime =>
+          TimeDomain.PROCESSING_TIME
+        case TimeCharacteristic.EventTime =>
+          TimeDomain.EVENT_TIME
+      }
+    }
+  }
+
+  /** Creates a copy with the same constructor arguments; the key fields are not copied. */
+  override protected def copy: TableSinkBase[JTuple2[JBool, Row]] = {
+    new QueryableTableSink(this.namePrefix, this.queryConfig, this.cleanupTimeDomain)
+  }
+}
+
+/**
+ * Extracts the key of an upsert record by copying the fields at `keyIndices` from the
+ * record's row (the tuple's `f1`) into a new [[Row]], in the order given by `keyIndices`.
+ *
+ * @param keyIndices positions of the key fields within the record row
+ * @param returnType type information of the produced key row; it is `@transient`, so it is
+ *                   not serialized together with the selector
+ */
+class RowKeySelector(
+    private val keyIndices: Array[Int],
+    @transient private val returnType: TypeInformation[Row])
+  extends KeySelector[JTuple2[JBool, Row], Row]
+  with ResultTypeQueryable[Row] {
+
+  override def getKey(value: JTuple2[JBool, Row]): Row = {
+    val record = value.f1
+    val arity = keyIndices.length
+    val key = new Row(arity)
+    // Plain while loop: getKey is invoked for every record, so avoid closure allocation.
+    var pos = 0
+    while (pos < arity) {
+      key.setField(pos, record.getField(keyIndices(pos)))
+      pos += 1
+    }
+    key
+  }
+
+  override def getProducedType: TypeInformation[Row] = returnType
+}
+
+class QueryableStateProcessFunction(
+ private val namePrefix: String,
+ private val queryConfig: StreamQueryConfig,
+ private val keyNames: Array[String],
+ private val fieldNames: Array[String],
+ private val fieldTypes: Array[TypeInformation[_]],
+ private val cleanupTimeDomain: TimeDomain)
+ extends KeyedProcessFunctionWithCleanupState[Row, JTuple2[JBool, Row], Void](queryConfig) {
+
+ @transient private var states = Array[ValueState[AnyRef]]()
--- End diff --
Use the default initializer `_` (i.e. `= _`) here instead of creating an empty array.
---