Posted to issues@flink.apache.org by liurenjie1024 <gi...@git.apache.org> on 2018/03/13 10:52:57 UTC

[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

GitHub user liurenjie1024 opened a pull request:

    https://github.com/apache/flink/pull/5688

    [FLINK-6968][Table API & SQL] Add Queryable table sink.

    ## What is the purpose of the change
    
    Streaming tables with a unique key are continuously updated. For example, queries with a non-windowed aggregation generate such tables. Commonly, such updating tables are emitted via an upsert table sink to an external datastore (k-v store, database) to make them accessible to applications.
    
    ## Brief change log
    
      - *Add a QueryableStateTableSink.*
      - *States are queryable.*
    
    ## Verifying this change
    
    This change added tests and can be verified as follows:
      - *Added test that validates that states will be stored.*
      - *Manually verified the change by running a stateful streaming program on a 4-node cluster with 2 JobManagers and 4 TaskManagers, and querying the state with QueryableStateClient.*
    
    
    ## Documentation
    
      - Does this pull request introduce a new feature? yes
      - If yes, how is the feature documented? docs
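
The upsert behaviour described under "What is the purpose of the change" can be illustrated with a small, Flink-free sketch. It assumes the usual upsert encoding, where a `true` flag means "insert or update the row for this key" and a `false` flag means "delete the row for this key"; the diffs quoted in this thread only show the `true` branch, so the delete branch here is an assumption. The class `UpsertStateSketch` and its methods are hypothetical stand-ins, not part of the PR or of Flink.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of an upsert stream materialized into per-key state. Not Flink code. */
public class UpsertStateSketch {
    // Hypothetical stand-in for keyed state: unique key -> latest non-key field values.
    private final Map<String, List<Object>> state = new HashMap<>();

    /** Applies one message: upsert == true stores the row, upsert == false removes it (assumed). */
    public void apply(boolean upsert, String key, Object... nonKeyFields) {
        if (upsert) {
            state.put(key, Arrays.asList(nonKeyFields));
        } else {
            state.remove(key);
        }
    }

    /** Point lookup by key, the kind of read a queryable-state client would issue. */
    public List<Object> query(String key) {
        return state.get(key);
    }

    public static void main(String[] args) {
        UpsertStateSketch sink = new UpsertStateSketch();
        sink.apply(true, "1", true, "jeff");   // insert key "1"
        sink.apply(true, "2", false, "dean");  // insert key "2"
        sink.apply(true, "1", false, "jeff");  // update key "1" in place
        sink.apply(false, "2");                // delete key "2"
        System.out.println(sink.query("1"));
        System.out.println(sink.query("2"));
    }
}
```

Because every message overwrites or removes the entry for its key, the stored state stays bounded by the number of distinct keys, which is why the sink in this PR rejects append-only tables.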


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/liurenjie1024/flink QueryableTableSink

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5688.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5688
    
----
commit 0663e550216dfef8bf205f90d5ac8a0e7e77a42b
Author: liurenjie1024 <li...@...>
Date:   2018-03-12T07:32:44Z

    Code complete

commit 6d62d53f0bae65249ab69bddf7932e62ae1e7897
Author: liurenjie1024 <li...@...>
Date:   2018-03-13T09:44:30Z

    Add test

commit c9ffa6ecdd638a497b60f3f063b2d352b1b98059
Author: liurenjie1024 <li...@...>
Date:   2018-03-13T10:43:19Z

    Fix test style

----


---

[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5688#discussion_r190141671
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/QueryableTableSinkTest.scala ---
    @@ -0,0 +1,114 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.stream.table
    +
    +import java.lang.{Boolean => JBool}
    +
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.common.time.Time
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.streaming.api.TimeDomain
    +import org.apache.flink.streaming.api.operators.KeyedProcessOperator
    +import org.apache.flink.table.api.StreamQueryConfig
    +import org.apache.flink.table.runtime.harness.HarnessTestBase
    +import org.apache.flink.table.sinks.{QueryableStateProcessFunction, RowKeySelector}
    +import org.apache.flink.types.Row
    +import org.junit.Assert.assertEquals
    +import org.junit.Test
    +
    +class QueryableTableSinkTest extends HarnessTestBase {
    +  @Test
    +  def testRowSelector(): Unit = {
    +    val keyTypes = Array(TypeInformation.of(classOf[List[Int]]),
    +      TypeInformation.of(classOf[String]))
    +    val selector = new RowKeySelector(Array(0, 2), new RowTypeInfo(keyTypes:_*))
    +
    +    val src = Row.of(List(1), "a", "b")
    +    val key = selector.getKey(JTuple2.of(true, src))
    +
    +    assertEquals(Row.of(List(1), "b"), key)
    +  }
    +
    +  @Test
    +  def testProcessFunction(): Unit = {
    +    val queryConfig = new StreamQueryConfig()
    +      .withIdleStateRetentionTime(Time.milliseconds(2), Time.milliseconds(10))
    +
    +    val keys = Array("id")
    +    val keyType = new RowTypeInfo(TypeInformation.of(classOf[String]))
    +    val fieldNames = Array("id", "is_manager", "name")
    +    val fieldTypes: Array[TypeInformation[_]] = Array(
    +      TypeInformation.of(classOf[String]).asInstanceOf[TypeInformation[_]],
    +      TypeInformation.of(classOf[JBool]).asInstanceOf[TypeInformation[_]],
    +      TypeInformation.of(classOf[String]).asInstanceOf[TypeInformation[_]])
    +    val func = new QueryableStateProcessFunction(
    +      "test",
    +      queryConfig,
    +      keys,
    +      fieldNames,
    +      fieldTypes,
    +      TimeDomain.PROCESSING_TIME)
    +
    +    val operator = new KeyedProcessOperator[Row, JTuple2[JBool, Row], Void](func)
    +
    +    val testHarness = createHarnessTester(operator,
    +      new RowKeySelector(Array(0), keyType),
    +      keyType)
    +
    +    testHarness.open()
    +
    +    val stateDesc1 = new ValueStateDescriptor[JBool]("is_manager",
    +      TypeInformation.of(classOf[JBool]))
    +    stateDesc1.initializeSerializerUnlessSet(operator.getExecutionConfig)
    +    val stateDesc2 = new ValueStateDescriptor[String]("name", TypeInformation.of(classOf[String]))
    +    stateDesc2.initializeSerializerUnlessSet(operator.getExecutionConfig)
    +    val key1 = Row.of("1")
    +    val key2 = Row.of("2")
    +
    +    testHarness.processElement(JTuple2.of(true, Row.of("1", JBool.valueOf(true), "jeff")), 2)
    +    testHarness.processElement(JTuple2.of(true, Row.of("2", JBool.valueOf(false), "dean")), 6)
    +
    +    val stateOf = (key: Row, sd: ValueStateDescriptor[_]) => {
    +      testHarness.getState(key, sd).value().asInstanceOf[AnyRef]
    +    }
    +
    +    var expectedData = Array(
    +      Row.of(JBool.valueOf(true), "jeff"),
    +      Row.of(JBool.valueOf(false), "dean"))
    +    var storedData = Array(
    +      Row.of(stateOf(key1, stateDesc1), stateOf(key1, stateDesc2)),
    +      Row.of(stateOf(key2, stateDesc1), stateOf(key2, stateDesc2)))
    +
    +    verify(expectedData, storedData)
    +
    +
    --- End diff --
    
    Add comments to this test and remove empty lines here.


---

[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5688#discussion_r174339810
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/QueryableTableSinkTest.scala ---
    @@ -0,0 +1,102 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.stream.table
    +
    +import java.lang.{Boolean => JBool}
    +
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.common.time.Time
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator
    +import org.apache.flink.table.api.StreamQueryConfig
    +import org.apache.flink.table.runtime.harness.HarnessTestBase
    +import org.apache.flink.table.sinks.{QueryableStateProcessFunction, RowKeySelector}
    +import org.apache.flink.types.Row
    +import org.junit.Assert.assertEquals
    +import org.junit.Test
    +
    +class QueryableTableSinkTest extends HarnessTestBase {
    +  @Test
    +  def testRowSelector(): Unit = {
    +    val keyTypes = Array(TypeInformation.of(classOf[List[Int]]), TypeInformation.of(classOf[String]))
    +    val selector = new RowKeySelector(Array(0, 2), new RowTypeInfo(keyTypes:_*))
    +
    +    val src = Row.of(List(1), "a", "b")
    +    val key = selector.getKey(JTuple2.of(true, src))
    +
    +    assertEquals(Row.of(List(1), "b"), key)
    +  }
    +
    +  @Test
    +  def testProcessFunction(): Unit = {
    +    val queryConfig = new StreamQueryConfig()
    +      .withIdleStateRetentionTime(Time.milliseconds(2), Time.milliseconds(10))
    +
    +    val keys = Array("id")
    +    val keyType = new RowTypeInfo(TypeInformation.of(classOf[String]))
    +    val fieldNames = Array("id", "is_manager", "name")
    +    val fieldTypes: Array[TypeInformation[_]] = Array(
    +      TypeInformation.of(classOf[String]).asInstanceOf[TypeInformation[_]],
    +      TypeInformation.of(classOf[JBool]).asInstanceOf[TypeInformation[_]],
    +      TypeInformation.of(classOf[String]).asInstanceOf[TypeInformation[_]])
    +    val func = new QueryableStateProcessFunction("test", queryConfig, keys, fieldNames, fieldTypes)
    +
    +    val operator = new LegacyKeyedProcessOperator[Row, JTuple2[JBool, Row], Void](func)
    +
    +    val testHarness = createHarnessTester(operator,
    +      new RowKeySelector(Array(0), keyType),
    +      keyType)
    +
    +    testHarness.open()
    +
    +
    --- End diff --
    
    Remove extra blank lines.


---

[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5688#discussion_r174338993
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala ---
    @@ -0,0 +1,175 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.sinks
    +
    +import java.lang.{Boolean => JBool}
    +
    +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.functions.KeySelector
    +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    +import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.table.api.StreamQueryConfig
    +import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.Collector
    +import org.slf4j.LoggerFactory
    --- End diff --
    
    Remove unused imports.


---

[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5688#discussion_r190141550
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java ---
    @@ -78,6 +82,20 @@ public int numKeyedStateEntries() {
     		}
     	}
     
    +	public <S extends State> S getState(K key, StateDescriptor<S, ?> stateDesc) throws Exception {
    --- End diff --
    
    Is this change necessary? We should only modify code outside of `flink-table` if it is urgently needed.


---

[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

Posted by walterddr <gi...@git.apache.org>.
Github user walterddr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5688#discussion_r175315979
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala ---
    @@ -0,0 +1,162 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.sinks
    +
    +import java.lang.{Boolean => JBool}
    +
    +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.functions.KeySelector
    +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    +import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.table.api.StreamQueryConfig
    +import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.Collector
    +import org.slf4j.LoggerFactory
    +
    +class QueryableTableSink(
    --- End diff --
    
    Besides @xccui's suggestion to add formal docs, could you also add a Javadoc-style comment here explaining what this table sink does and how it is intended to be used?


---

[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5688#discussion_r175680499
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala ---
    @@ -0,0 +1,162 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.sinks
    +
    +import java.lang.{Boolean => JBool}
    +
    +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.functions.KeySelector
    +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    +import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.table.api.StreamQueryConfig
    +import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.Collector
    +import org.slf4j.LoggerFactory
    +
    +class QueryableTableSink(
    +    private val namePrefix: String,
    +    private val queryConfig: StreamQueryConfig)
    +  extends UpsertStreamTableSink[Row]
    +    with TableSinkBase[JTuple2[JBool, Row]] {
    +  private var keys: Array[String] = _
    +
    +  override def setKeyFields(keys: Array[String]): Unit = {
    +    if (keys == null) {
    +      throw new IllegalArgumentException("keys can't be null!")
    +    }
    +    this.keys = keys
    +  }
    +
    +  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
    +    if (isAppendOnly) {
    +      throw new IllegalArgumentException("A QueryableTableSink can not be used with append-only " +
    +        "tables as the table would grow infinitely")
    +    }
    +  }
    +
    +  override def getRecordType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes, getFieldNames)
    +
    +  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, Row]]): Unit = {
    +    val keyIndices = keys.map(getFieldNames.indexOf(_))
    +    val keyTypes = keyIndices.map(getFieldTypes(_))
    +
    +    val keySelectorType = new RowTypeInfo(keyTypes, keys)
    +
    +    val processFunction = new QueryableStateProcessFunction(
    +      namePrefix,
    +      queryConfig,
    +      keys,
    +      getFieldNames,
    +      getFieldTypes)
    +
    +    dataStream.keyBy(new RowKeySelector(keyIndices, keySelectorType))
    +      .process(processFunction)
    +  }
    +
    +  override protected def copy: TableSinkBase[JTuple2[JBool, Row]] = {
    +    new QueryableTableSink(this.namePrefix, this.queryConfig)
    +  }
    +}
    +
    +class RowKeySelector(
    +  private val keyIndices: Array[Int],
    +  @transient private val returnType: TypeInformation[Row])
    +  extends KeySelector[JTuple2[JBool, Row], Row]
    +    with ResultTypeQueryable[Row] {
    +
    +  override def getKey(value: JTuple2[JBool, Row]): Row = {
    +    val keys = keyIndices
    +
    +    val srcRow = value.f1
    +
    +    val destRow = new Row(keys.length)
    +    var i = 0
    +    while (i < keys.length) {
    +      destRow.setField(i, srcRow.getField(keys(i)))
    +      i += 1
    +    }
    +
    +    destRow
    +  }
    +
    +  override def getProducedType: TypeInformation[Row] = returnType
    +}
    +
    +class QueryableStateProcessFunction(
    +  private val namePrefix: String,
    +  private val queryConfig: StreamQueryConfig,
    +  private val keyNames: Array[String],
    +  private val fieldNames: Array[String],
    +  private val fieldTypes: Array[TypeInformation[_]])
    +  extends ProcessFunctionWithCleanupState[JTuple2[JBool, Row], Void](queryConfig) {
    +
    +  @transient private var states = Array[ValueState[AnyRef]]()
    +  @transient private var nonKeyIndices = Array[Int]()
    +
    +  override def open(parameters: Configuration): Unit = {
    +    super.open(parameters)
    +
    +    nonKeyIndices = fieldNames.indices
    +      .filter(idx => !keyNames.contains(fieldNames(idx)))
    +      .toArray
    +
    +    val statesBuilder = Array.newBuilder[ValueState[AnyRef]]
    +
    +    for (i <- nonKeyIndices) {
    +      val stateDesc = new ValueStateDescriptor(fieldNames(i), fieldTypes(i))
    +      stateDesc.initializeSerializerUnlessSet(getRuntimeContext.getExecutionConfig)
    +      stateDesc.setQueryable(fieldNames(i))
    +      statesBuilder += getRuntimeContext.getState(stateDesc).asInstanceOf[ValueState[AnyRef]]
    +    }
    +
    +    states = statesBuilder.result()
    +
    +    initCleanupTimeState("QueryableStateCleanupTime")
    +  }
    +
    +  override def processElement(
    +    value: JTuple2[JBool, Row],
    +    ctx: ProcessFunction[JTuple2[JBool, Row], Void]#Context,
    +    out: Collector[Void]): Unit = {
    +    if (value.f0) {
    +      for (i <- nonKeyIndices.indices) {
    +        states(i).update(value.f1.getField(nonKeyIndices(i)))
    +      }
    +
    +      val currentTime = ctx.timerService().currentProcessingTime()
    --- End diff --
    
    I agree that it should be able to handle both time domains, and I've added support for that.
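
As a rough illustration of what handling both time domains could look like, the sketch below picks the "current time" used for state cleanup from either processing time or the current watermark. It is self-contained and does not use Flink classes: the nested `TimeDomain` enum and `TimerService` interface are hypothetical stand-ins that only mirror the names seen in the quoted diffs, and using the watermark for event time is an assumption, not something this thread confirms.

```java
/** Toy sketch of choosing the cleanup timestamp by time domain. Not Flink code. */
public class TimeDomainSketch {
    /** Hypothetical stand-in for the time domain passed to the process function. */
    public enum TimeDomain { EVENT_TIME, PROCESSING_TIME }

    /** Hypothetical minimal timer service exposing wall-clock time and the watermark. */
    public interface TimerService {
        long currentProcessingTime();
        long currentWatermark();
    }

    /** Returns the notion of "now" that matches the configured time domain. */
    public static long currentTime(TimeDomain domain, TimerService timers) {
        switch (domain) {
            case PROCESSING_TIME:
                return timers.currentProcessingTime();
            case EVENT_TIME:
                return timers.currentWatermark(); // assumption: event time follows the watermark
            default:
                throw new IllegalArgumentException("Unknown time domain: " + domain);
        }
    }

    /** Time at which idle state may be dropped: now plus the maximum retention. */
    public static long cleanupTime(TimeDomain domain, TimerService timers, long maxRetentionMillis) {
        return currentTime(domain, timers) + maxRetentionMillis;
    }

    public static void main(String[] args) {
        TimerService timers = new TimerService() {
            @Override public long currentProcessingTime() { return 2L; }
            @Override public long currentWatermark() { return 6L; }
        };
        System.out.println(cleanupTime(TimeDomain.PROCESSING_TIME, timers, 10L));
        System.out.println(cleanupTime(TimeDomain.EVENT_TIME, timers, 10L));
    }
}
```

Keeping the domain choice in one helper means the update path and the timer callback cannot disagree about which clock they use.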


---

[GitHub] flink issue #5688: [FLINK-6968][Table API & SQL] Add Queryable table sink.

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on the issue:

    https://github.com/apache/flink/pull/5688
  
    Could you please rebase your PR to resolve the conflicts? Thanks.


---

[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5688#discussion_r174366242
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala ---
    @@ -0,0 +1,175 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.sinks
    +
    +import java.lang.{Boolean => JBool}
    +
    +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.functions.KeySelector
    +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    +import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.table.api.StreamQueryConfig
    +import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.Collector
    +import org.slf4j.LoggerFactory
    +
    +class QueryableTableSink(private val namePrefix: String,
    +                         private val queryConfig: StreamQueryConfig)
    +  extends UpsertStreamTableSink[Row]
    +  with TableSinkBase[JTuple2[JBool, Row]] {
    +  private var keys: Array[String] = _
    +
    +  /**
    +    * Configures the unique key fields of the [[Table]] to write.
    +    * The method is called after [[TableSink.configure()]].
    +    *
    +    * The keys array might be empty, if the table consists of a single (updated) record.
    +    * If the table does not have a key and is append-only, the keys attribute is null.
    +    *
    +    * @param keys the field names of the table's keys, an empty array if the table has a single
    +    *             row, and null if the table is append-only and has no key.
    +    */
    +  override def setKeyFields(keys: Array[String]): Unit = {
    +    if (keys == null) {
    +      throw new IllegalArgumentException("keys can't be null!")
    +    }
    +    this.keys = keys
    +  }
    +
    +  /**
    +    * Specifies whether the [[Table]] to write is append-only or not.
    +    *
    +    * @param isAppendOnly true if the table is append-only, false otherwise.
    +    */
    +  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
    +    if (isAppendOnly) {
    +      throw new IllegalArgumentException("A QueryableTableSink can not be used with append-only tables " +
    +        "as the table would grow infinitely")
    +    }
    +  }
    +
    +  /** Returns the requested record type */
    +  override def getRecordType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes, getFieldNames)
    +
    +  /** Emits the DataStream. */
    +  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, Row]]): Unit = {
    +    val keyIndices = keys.map(getFieldNames.indexOf(_))
    +    val keyTypes = keyIndices.map(getFieldTypes(_))
    +
    +    val keySelectorType = new RowTypeInfo(keyTypes, keys)
    +
    +    val processFunction = new QueryableStateProcessFunction(
    +      namePrefix,
    +      queryConfig,
    +      keys,
    +      getFieldNames,
    +      getFieldTypes)
    +
    +    dataStream.keyBy(new RowKeySelector(keyIndices, keySelectorType))
    +      .process(processFunction)
    --- End diff --
    
    I'm waiting for another [PR](https://github.com/apache/flink/pull/5680) to be merged.


---

[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5688#discussion_r174338947
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala ---
    @@ -0,0 +1,175 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.sinks
    +
    +import java.lang.{Boolean => JBool}
    +
    +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.functions.KeySelector
    +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    +import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.table.api.StreamQueryConfig
    +import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.Collector
    +import org.slf4j.LoggerFactory
    +
    +class QueryableTableSink(private val namePrefix: String,
    +                         private val queryConfig: StreamQueryConfig)
    +  extends UpsertStreamTableSink[Row]
    +  with TableSinkBase[JTuple2[JBool, Row]] {
    +  private var keys: Array[String] = _
    +
    +  /**
    +    * Configures the unique key fields of the [[Table]] to write.
    +    * The method is called after [[TableSink.configure()]].
    +    *
    +    * The keys array might be empty, if the table consists of a single (updated) record.
    +    * If the table does not have a key and is append-only, the keys attribute is null.
    +    *
    +    * @param keys the field names of the table's keys, an empty array if the table has a single
    +    *             row, and null if the table is append-only and has no key.
    +    */
    +  override def setKeyFields(keys: Array[String]): Unit = {
    +    if (keys == null) {
    +      throw new IllegalArgumentException("keys can't be null!")
    +    }
    +    this.keys = keys
    +  }
    +
    +  /**
    +    * Specifies whether the [[Table]] to write is append-only or not.
    +    *
    +    * @param isAppendOnly true if the table is append-only, false otherwise.
    +    */
    +  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
    +    if (isAppendOnly) {
    +      throw new IllegalArgumentException("A QueryableTableSink can not be used with append-only tables " +
    +        "as the table would grow infinitely")
    +    }
    +  }
    +
    +  /** Returns the requested record type */
    +  override def getRecordType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes, getFieldNames)
    +
    +  /** Emits the DataStream. */
    +  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, Row]]): Unit = {
    +    val keyIndices = keys.map(getFieldNames.indexOf(_))
    +    val keyTypes = keyIndices.map(getFieldTypes(_))
    +
    +    val keySelectorType = new RowTypeInfo(keyTypes, keys)
    +
    +    val processFunction = new QueryableStateProcessFunction(
    +      namePrefix,
    +      queryConfig,
    +      keys,
    +      getFieldNames,
    +      getFieldTypes)
    +
    +    dataStream.keyBy(new RowKeySelector(keyIndices, keySelectorType))
    +      .process(processFunction)
    --- End diff --
    
    This `process(processFunction)` method has been deprecated. Replace it with `process(KeyedProcessFunction)`.


---

[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5688#discussion_r174339658
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/QueryableTableSinkTest.scala ---
    @@ -0,0 +1,102 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.stream.table
    +
    +import java.lang.{Boolean => JBool}
    +
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.common.time.Time
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator
    +import org.apache.flink.table.api.StreamQueryConfig
    +import org.apache.flink.table.runtime.harness.HarnessTestBase
    +import org.apache.flink.table.sinks.{QueryableStateProcessFunction, RowKeySelector}
    +import org.apache.flink.types.Row
    +import org.junit.Assert.assertEquals
    +import org.junit.Test
    +
    +class QueryableTableSinkTest extends HarnessTestBase {
    +  @Test
    +  def testRowSelector(): Unit = {
    +    val keyTypes = Array(TypeInformation.of(classOf[List[Int]]), TypeInformation.of(classOf[String]))
    +    val selector = new RowKeySelector(Array(0, 2), new RowTypeInfo(keyTypes:_*))
    +
    +    val src = Row.of(List(1), "a", "b")
    +    val key = selector.getKey(JTuple2.of(true, src))
    +
    +    assertEquals(Row.of(List(1), "b"), key)
    +  }
    +
    +  @Test
    +  def testProcessFunction(): Unit = {
    +    val queryConfig = new StreamQueryConfig()
    +      .withIdleStateRetentionTime(Time.milliseconds(2), Time.milliseconds(10))
    +
    +    val keys = Array("id")
    +    val keyType = new RowTypeInfo(TypeInformation.of(classOf[String]))
    +    val fieldNames = Array("id", "is_manager", "name")
    +    val fieldTypes: Array[TypeInformation[_]] = Array(
    +      TypeInformation.of(classOf[String]).asInstanceOf[TypeInformation[_]],
    +      TypeInformation.of(classOf[JBool]).asInstanceOf[TypeInformation[_]],
    +      TypeInformation.of(classOf[String]).asInstanceOf[TypeInformation[_]])
    +    val func = new QueryableStateProcessFunction("test", queryConfig, keys, fieldNames, fieldTypes)
    +
    +    val operator = new LegacyKeyedProcessOperator[Row, JTuple2[JBool, Row], Void](func)
    --- End diff --
    
    Try to avoid using deprecated classes/methods.


---

[GitHub] flink issue #5688: [FLINK-6968][Table API & SQL] Add Queryable table sink.

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on the issue:

    https://github.com/apache/flink/pull/5688
  
    @twalthr I store each column separately to avoid encoding and schema management problems, and to reduce unnecessary data transfer when querying. One drawback is that reading multiple fields requires issuing multiple requests.
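
The trade-off described above can be illustrated with a minimal in-memory sketch. It does not use Flink: `ColumnStore`, `upsert`, and `query` are hypothetical stand-ins for "one queryable state per non-key column", and only the `prefix-column` naming scheme is taken from the PR.

```scala
// Hypothetical in-memory stand-in for per-column queryable state (not Flink's API).
object PerColumnSketch {
  import scala.collection.mutable

  // One "state" per non-key column, addressed as "<prefix>-<column>".
  final class ColumnStore(prefix: String, columns: Seq[String]) {
    private val states: Map[String, mutable.Map[String, Any]] =
      columns.map(c => s"$prefix-$c" -> mutable.Map.empty[String, Any]).toMap

    // Upsert: write each non-key field of the row into its own state.
    def upsert(key: String, row: Map[String, Any]): Unit =
      columns.foreach(c => states(s"$prefix-$c").update(key, row(c)))

    // Point query: one lookup fetches exactly one column for one key.
    def query(stateName: String, key: String): Option[Any] =
      states.get(stateName).flatMap(_.get(key))
  }

  def demo(): (Option[Any], Option[Any]) = {
    val store = new ColumnStore("prefix", Seq("is_manager", "name"))
    store.upsert("42", Map("is_manager" -> true, "name" -> "alice"))
    // Reading two fields of the same row takes two separate requests.
    (store.query("prefix-is_manager", "42"), store.query("prefix-name", "42"))
  }
}
```

Each lookup moves only the requested field and needs no row-level schema on the client, at the cost of one round trip per field.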


---

[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5688#discussion_r190140977
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala ---
    @@ -0,0 +1,223 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.sinks
    +
    +import java.lang.{Boolean => JBool}
    +
    +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.functions.KeySelector
    +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    +import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    +import org.apache.flink.streaming.api.{TimeCharacteristic, TimeDomain}
    +import org.apache.flink.table.api.StreamQueryConfig
    +import org.apache.flink.table.runtime.aggregate.KeyedProcessFunctionWithCleanupState
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.Collector
    +
    +/**
    +  * A QueryableTableSink stores table in queryable state.
    +  *
    +  * This class stores table in queryable state so that users can access table data without
    +  * dependency on external storage.
    +  *
    +  * Since this is only a kv storage, currently user can only do point query against it.
    +  *
    +  * Example:
    +  * {{{
    +  *   val env = ExecutionEnvironment.getExecutionEnvironment
    +  *   val tEnv = TableEnvironment.getTableEnvironment(env)
    +  *
    +  *   val table: Table  = ...
    +  *
    +  *   val queryableTableSink: QueryableTableSink = new QueryableTableSink(
    +  *       "prefix",
    +  *       queryConfig,
    +  *       None)
    +  *
    +  *   tEnv.writeToSink(table, queryableTableSink, config)
    +  * }}}
    +  *
    +  * When program starts to run, user can access state with QueryableStateClient.
    +  * {{{
    +  *   val client = new QueryableStateClient(tmHostname, proxyPort)
    +  *   val data = client.getKvState(
    +  *       jobId,
    +  *       "prefix-column1",
    +  *       Row.of(1),
    +  *       new RowTypeInfo(Array(TypeInformation.of(classOf[Int])), Array("id")),
    +  *       stateDescriptor)
    +  *     .get();
    +  *
    +  * }}}
    +  *
    +  *
    +  * @param namePrefix
    +  * @param queryConfig
    +  * @param cleanupTimeDomain
    +  */
    +class QueryableTableSink(
    +    private val namePrefix: String,
    +    private val queryConfig: StreamQueryConfig,
    +    private val cleanupTimeDomain: Option[TimeDomain])
    +  extends UpsertStreamTableSink[Row]
    +    with TableSinkBase[JTuple2[JBool, Row]] {
    +  private var keys: Array[String] = _
    +
    +  override def setKeyFields(keys: Array[String]): Unit = {
    +    if (keys == null) {
    +      throw new IllegalArgumentException("keys can't be null!")
    +    }
    +    this.keys = keys
    +  }
    +
    +  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
    +    if (isAppendOnly) {
    +      throw new IllegalArgumentException("A QueryableTableSink can not be used with append-only " +
    +        "tables as the table would grow infinitely")
    +    }
    +  }
    +
    +  override def getRecordType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes, getFieldNames)
    +
    +  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, Row]]): Unit = {
    +    val keyIndices = keys.map(getFieldNames.indexOf(_))
    +    val keyTypes = keyIndices.map(getFieldTypes(_))
    +
    +    val keySelectorType = new RowTypeInfo(keyTypes, keys)
    +
    +    val processFunction = new QueryableStateProcessFunction(
    +      namePrefix,
    +      queryConfig,
    +      keys,
    +      getFieldNames,
    +      getFieldTypes,
    +      calculateCleanupTimeDomain(dataStream.getExecutionEnvironment.getStreamTimeCharacteristic))
    +
    +    dataStream.keyBy(new RowKeySelector(keyIndices, keySelectorType))
    +      .process(processFunction)
    +  }
    +
    +  private def calculateCleanupTimeDomain(timeCharacteristic: TimeCharacteristic): TimeDomain = {
    +    val timeDomainFromTimeCharacteristic = {
    +      timeCharacteristic match {
    +        case TimeCharacteristic.IngestionTime | TimeCharacteristic.ProcessingTime =>
    +          TimeDomain.PROCESSING_TIME
    +        case TimeCharacteristic.EventTime =>
    +          TimeDomain.EVENT_TIME
    +      }
    +    }
    +
    +    cleanupTimeDomain.getOrElse(timeDomainFromTimeCharacteristic)
    +  }
    +
    +  override protected def copy: TableSinkBase[JTuple2[JBool, Row]] = {
    +    new QueryableTableSink(this.namePrefix, this.queryConfig, this.cleanupTimeDomain)
    +  }
    +}
    +
    +class RowKeySelector(
    +  private val keyIndices: Array[Int],
    +  @transient private val returnType: TypeInformation[Row])
    +  extends KeySelector[JTuple2[JBool, Row], Row]
    +    with ResultTypeQueryable[Row] {
    +
    +  override def getKey(value: JTuple2[JBool, Row]): Row = {
    +    val keys = keyIndices
    +
    +    val srcRow = value.f1
    +
    +    val destRow = new Row(keys.length)
    +    var i = 0
    +    while (i < keys.length) {
    +      destRow.setField(i, srcRow.getField(keys(i)))
    +      i += 1
    +    }
    +
    +    destRow
    +  }
    +
    +  override def getProducedType: TypeInformation[Row] = returnType
    +}
    +
    +class QueryableStateProcessFunction(
    +    private val namePrefix: String,
    +    private val queryConfig: StreamQueryConfig,
    +    private val keyNames: Array[String],
    +    private val fieldNames: Array[String],
    +    private val fieldTypes: Array[TypeInformation[_]],
    +    private val cleanupTimeDomain: TimeDomain)
    +  extends KeyedProcessFunctionWithCleanupState[Row, JTuple2[JBool, Row], Void](queryConfig) {
    +
    +  @transient private var states = Array[ValueState[AnyRef]]()
    +  @transient private var nonKeyIndices = Array[Int]()
    +
    +  override def open(parameters: Configuration): Unit = {
    +    super.open(parameters)
    +
    +    nonKeyIndices = fieldNames.indices
    +      .filter(idx => !keyNames.contains(fieldNames(idx)))
    +      .toArray
    +
    +    val statesBuilder = Array.newBuilder[ValueState[AnyRef]]
    +
    +    for (i <- nonKeyIndices) {
    +      val stateDesc = new ValueStateDescriptor(fieldNames(i), fieldTypes(i))
    +      stateDesc.initializeSerializerUnlessSet(getRuntimeContext.getExecutionConfig)
    +      stateDesc.setQueryable(s"$namePrefix-${fieldNames(i)}")
    +      statesBuilder += getRuntimeContext.getState(stateDesc).asInstanceOf[ValueState[AnyRef]]
    +    }
    +
    +    states = statesBuilder.result()
    +
    +    initCleanupTimeState("QueryableStateCleanupTime")
    +  }
    +
    +  override def processElement(
    +    value: JTuple2[JBool, Row],
    +    ctx: KeyedProcessFunction[Row, JTuple2[JBool, Row], Void]#Context,
    +    out: Collector[Void]): Unit = {
    +    if (value.f0) {
    +      for (i <- nonKeyIndices.indices) {
    --- End diff --
    
    We don't use Scala magic in runtime functions for performance reasons. Use a basic while loop instead.
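
A minimal sketch of the suggested style, assuming the loop copies non-key fields by index. `WhileLoopSketch` and `copyNonKeyFields` are illustrative names, not code from the PR. In Scala 2, `for (i <- xs.indices)` desugars to `xs.indices.foreach(...)`, which builds a `Range` and calls a closure per element; a hand-written `while` loop avoids both on the per-record path.

```scala
object WhileLoopSketch {
  // Copies the non-key fields of `row` into `target`, one slot per non-key index.
  def copyNonKeyFields(row: Array[Any], nonKeyIndices: Array[Int], target: Array[Any]): Unit = {
    var i = 0
    while (i < nonKeyIndices.length) {
      // Plain indexed access: no Range allocation, no closure invocation.
      target(i) = row(nonKeyIndices(i))
      i += 1
    }
  }
}
```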


---

[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5688#discussion_r190140171
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala ---
    @@ -0,0 +1,223 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.sinks
    +
    +import java.lang.{Boolean => JBool}
    +
    +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.functions.KeySelector
    +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    +import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    +import org.apache.flink.streaming.api.{TimeCharacteristic, TimeDomain}
    +import org.apache.flink.table.api.StreamQueryConfig
    +import org.apache.flink.table.runtime.aggregate.KeyedProcessFunctionWithCleanupState
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.Collector
    +
    +/**
    +  * A QueryableTableSink stores table in queryable state.
    +  *
    +  * This class stores table in queryable state so that users can access table data without
    +  * dependency on external storage.
    +  *
    +  * Since this is only a kv storage, currently user can only do point query against it.
    +  *
    +  * Example:
    +  * {{{
    +  *   val env = ExecutionEnvironment.getExecutionEnvironment
    +  *   val tEnv = TableEnvironment.getTableEnvironment(env)
    +  *
    +  *   val table: Table  = ...
    +  *
    +  *   val queryableTableSink: QueryableTableSink = new QueryableTableSink(
    +  *       "prefix",
    +  *       queryConfig,
    +  *       None)
    +  *
    +  *   tEnv.writeToSink(table, queryableTableSink, config)
    +  * }}}
    +  *
    +  * When program starts to run, user can access state with QueryableStateClient.
    +  * {{{
    +  *   val client = new QueryableStateClient(tmHostname, proxyPort)
    +  *   val data = client.getKvState(
    +  *       jobId,
    +  *       "prefix-column1",
    +  *       Row.of(1),
    +  *       new RowTypeInfo(Array(TypeInformation.of(classOf[Int])), Array("id")),
    +  *       stateDescriptor)
    +  *     .get();
    +  *
    +  * }}}
    +  *
    +  *
    +  * @param namePrefix
    +  * @param queryConfig
    +  * @param cleanupTimeDomain
    +  */
    +class QueryableTableSink(
    +    private val namePrefix: String,
    +    private val queryConfig: StreamQueryConfig,
    +    private val cleanupTimeDomain: Option[TimeDomain])
    +  extends UpsertStreamTableSink[Row]
    +    with TableSinkBase[JTuple2[JBool, Row]] {
    +  private var keys: Array[String] = _
    +
    +  override def setKeyFields(keys: Array[String]): Unit = {
    +    if (keys == null) {
    +      throw new IllegalArgumentException("keys can't be null!")
    +    }
    +    this.keys = keys
    +  }
    +
    +  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
    +    if (isAppendOnly) {
    +      throw new IllegalArgumentException("A QueryableTableSink can not be used with append-only " +
    +        "tables as the table would grow infinitely")
    +    }
    +  }
    +
    +  override def getRecordType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes, getFieldNames)
    +
    +  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, Row]]): Unit = {
    +    val keyIndices = keys.map(getFieldNames.indexOf(_))
    +    val keyTypes = keyIndices.map(getFieldTypes(_))
    +
    +    val keySelectorType = new RowTypeInfo(keyTypes, keys)
    +
    +    val processFunction = new QueryableStateProcessFunction(
    +      namePrefix,
    +      queryConfig,
    +      keys,
    +      getFieldNames,
    +      getFieldTypes,
    +      calculateCleanupTimeDomain(dataStream.getExecutionEnvironment.getStreamTimeCharacteristic))
    +
    +    dataStream.keyBy(new RowKeySelector(keyIndices, keySelectorType))
    +      .process(processFunction)
    +  }
    +
    +  private def calculateCleanupTimeDomain(timeCharacteristic: TimeCharacteristic): TimeDomain = {
    +    val timeDomainFromTimeCharacteristic = {
    +      timeCharacteristic match {
    +        case TimeCharacteristic.IngestionTime | TimeCharacteristic.ProcessingTime =>
    +          TimeDomain.PROCESSING_TIME
    +        case TimeCharacteristic.EventTime =>
    +          TimeDomain.EVENT_TIME
    +      }
    +    }
    +
    +    cleanupTimeDomain.getOrElse(timeDomainFromTimeCharacteristic)
    +  }
    +
    +  override protected def copy: TableSinkBase[JTuple2[JBool, Row]] = {
    +    new QueryableTableSink(this.namePrefix, this.queryConfig, this.cleanupTimeDomain)
    +  }
    +}
    +
    +class RowKeySelector(
    +  private val keyIndices: Array[Int],
    +  @transient private val returnType: TypeInformation[Row])
    +  extends KeySelector[JTuple2[JBool, Row], Row]
    +    with ResultTypeQueryable[Row] {
    +
    +  override def getKey(value: JTuple2[JBool, Row]): Row = {
    +    val keys = keyIndices
    +
    +    val srcRow = value.f1
    +
    +    val destRow = new Row(keys.length)
    +    var i = 0
    +    while (i < keys.length) {
    +      destRow.setField(i, srcRow.getField(keys(i)))
    +      i += 1
    +    }
    +
    +    destRow
    +  }
    +
    +  override def getProducedType: TypeInformation[Row] = returnType
    +}
    +
    +class QueryableStateProcessFunction(
    +    private val namePrefix: String,
    +    private val queryConfig: StreamQueryConfig,
    +    private val keyNames: Array[String],
    +    private val fieldNames: Array[String],
    +    private val fieldTypes: Array[TypeInformation[_]],
    +    private val cleanupTimeDomain: TimeDomain)
    +  extends KeyedProcessFunctionWithCleanupState[Row, JTuple2[JBool, Row], Void](queryConfig) {
    +
    +  @transient private var states = Array[ValueState[AnyRef]]()
    --- End diff --
    
    Use default initializer `_`.
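
A minimal sketch of the default initializer, assuming Scala 2 syntax. `Holder`, `open`, and `isInitialized` are hypothetical names used only for illustration. Writing `= _` leaves the field at its JVM default (`null` for references), so no throwaway empty array is allocated before the real value is assigned.

```scala
object DefaultInitSketch {
  class Holder {
    // `_` means "default value": null here, replaced when open() runs.
    @transient private var states: Array[String] = _

    def open(): Unit = { states = Array("is_manager", "name") }

    def isInitialized: Boolean = states != null
  }
}
```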


---

[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5688#discussion_r190136036
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/KeyedProcessFunctionWithCleanupState.scala ---
    @@ -44,8 +45,20 @@ abstract class KeyedProcessFunctionWithCleanupState[K, I, O](queryConfig: Stream
       protected def registerProcessingCleanupTimer(
         ctx: KeyedProcessFunction[K, I, O]#Context,
         currentTime: Long): Unit = {
    -    if (stateCleaningEnabled) {
    +    registerCleanupTimer(ctx, currentTime, TimeDomain.PROCESSING_TIME)
    +  }
     
    +  protected def registerEventCleanupTimer(
    --- End diff --
    
    @fhueske is there a reason why we have no event-time clean-up state so far? I think it would make sense to move this change to a separate PR. We should make the notion of time configurable through `StreamQueryConfig`.


---

[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5688#discussion_r174990348
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala ---
    @@ -0,0 +1,162 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.sinks
    +
    +import java.lang.{Boolean => JBool}
    +
    +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.functions.KeySelector
    +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    +import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.table.api.StreamQueryConfig
    +import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.Collector
    +import org.slf4j.LoggerFactory
    +
    +class QueryableTableSink(
    +  private val namePrefix: String,
    --- End diff --
    
    Fixed.


---

[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5688#discussion_r190136859
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala ---
    @@ -0,0 +1,223 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.sinks
    +
    +import java.lang.{Boolean => JBool}
    +
    +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.functions.KeySelector
    +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    +import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    +import org.apache.flink.streaming.api.{TimeCharacteristic, TimeDomain}
    +import org.apache.flink.table.api.StreamQueryConfig
    +import org.apache.flink.table.runtime.aggregate.KeyedProcessFunctionWithCleanupState
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.Collector
    +
    +/**
    +  * A QueryableTableSink stores table in queryable state.
    +  *
    +  * This class stores table in queryable state so that users can access table data without
    +  * dependency on external storage.
    +  *
    +  * Since this is only a kv storage, currently user can only do point query against it.
    +  *
    +  * Example:
    +  * {{{
    +  *   val env = ExecutionEnvironment.getExecutionEnvironment
    +  *   val tEnv = TableEnvironment.getTableEnvironment(env)
    +  *
    +  *   val table: Table  = ...
    +  *
    +  *   val queryableTableSink: QueryableTableSink = new QueryableTableSink(
    +  *       "prefix",
    +  *       queryConfig,
    +  *       None)
    +  *
    +  *   tEnv.writeToSink(table, queryableTableSink, config)
    +  * }}}
    +  *
    +  * When program starts to run, user can access state with QueryableStateClient.
    +  * {{{
    +  *   val client = new QueryableStateClient(tmHostname, proxyPort)
    +  *   val data = client.getKvState(
    +  *       jobId,
    +  *       "prefix-column1",
    +  *       Row.of(1),
    +  *       new RowTypeInfo(Array(TypeInformation.of(classOf[Int])), Array("id")),
    +  *       stateDescriptor)
    +  *     .get();
    +  *
    +  * }}}
    +  *
    +  *
    +  * @param namePrefix
    +  * @param queryConfig
    +  * @param cleanupTimeDomain
    +  */
    +class QueryableTableSink(
    +    private val namePrefix: String,
    +    private val queryConfig: StreamQueryConfig,
    +    private val cleanupTimeDomain: Option[TimeDomain])
    +  extends UpsertStreamTableSink[Row]
    +    with TableSinkBase[JTuple2[JBool, Row]] {
    +  private var keys: Array[String] = _
    +
    +  override def setKeyFields(keys: Array[String]): Unit = {
    +    if (keys == null) {
    +      throw new IllegalArgumentException("keys can't be null!")
    +    }
    +    this.keys = keys
    +  }
    +
    +  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
    +    if (isAppendOnly) {
    +      throw new IllegalArgumentException("A QueryableTableSink can not be used with append-only " +
    +        "tables as the table would grow infinitely")
    +    }
    +  }
    +
    +  override def getRecordType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes, getFieldNames)
    +
    +  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, Row]]): Unit = {
    +    val keyIndices = keys.map(getFieldNames.indexOf(_))
    +    val keyTypes = keyIndices.map(getFieldTypes(_))
    +
    +    val keySelectorType = new RowTypeInfo(keyTypes, keys)
    +
    +    val processFunction = new QueryableStateProcessFunction(
    +      namePrefix,
    +      queryConfig,
    +      keys,
    +      getFieldNames,
    +      getFieldTypes,
    +      calculateCleanupTimeDomain(dataStream.getExecutionEnvironment.getStreamTimeCharacteristic))
    --- End diff --
    
    We should make this configurable through the constructor parameters instead. I think it also makes sense to allow specifying a retention time. Use the `StreamQueryConfig` values as defaults.
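
One possible shape for this suggestion, sketched without Flink on the classpath. `TimeDomain`, `QueryConfig`, and `SinkSettings` below are hypothetical stand-ins (the real types would be Flink's `TimeDomain` and `StreamQueryConfig`), and falling back to processing time when no domain is given is an assumption, not something the thread settles.

```scala
object CleanupConfigSketch {
  // Hypothetical stand-ins for Flink's TimeDomain and StreamQueryConfig.
  sealed trait TimeDomain
  case object ProcessingTime extends TimeDomain
  case object EventTime extends TimeDomain

  final case class QueryConfig(minRetentionMs: Long, maxRetentionMs: Long)

  // Explicit constructor arguments win; anything left unset falls back to a default.
  final class SinkSettings(
      queryConfig: QueryConfig,
      cleanupTimeDomain: Option[TimeDomain] = None,
      minRetentionMs: Option[Long] = None,
      maxRetentionMs: Option[Long] = None) {

    // Assumption: processing time is the default cleanup domain.
    val timeDomain: TimeDomain = cleanupTimeDomain.getOrElse(ProcessingTime)
    val minRetention: Long = minRetentionMs.getOrElse(queryConfig.minRetentionMs)
    val maxRetention: Long = maxRetentionMs.getOrElse(queryConfig.maxRetentionMs)
  }
}
```

With this shape the sink no longer needs to inspect the execution environment's time characteristic when it emits the stream.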


---

[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5688#discussion_r175680339
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala ---
    @@ -0,0 +1,162 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.sinks
    +
    +import java.lang.{Boolean => JBool}
    +
    +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.functions.KeySelector
    +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    +import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.table.api.StreamQueryConfig
    +import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.Collector
    +import org.slf4j.LoggerFactory
    +
    +class QueryableTableSink(
    --- End diff --
    
    Javadoc added.


---

[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5688#discussion_r174338968
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala ---
    @@ -0,0 +1,175 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.sinks
    +
    +import java.lang.{Boolean => JBool}
    +
    +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.functions.KeySelector
    +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    +import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.table.api.StreamQueryConfig
    +import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.Collector
    +import org.slf4j.LoggerFactory
    +
    +class QueryableTableSink(private val namePrefix: String,
    +                         private val queryConfig: StreamQueryConfig)
    +  extends UpsertStreamTableSink[Row]
    +  with TableSinkBase[JTuple2[JBool, Row]] {
    +  private var keys: Array[String] = _
    +
    +  /**
    +    * Configures the unique key fields of the [[Table]] to write.
    --- End diff --
    
    Use the full class name for `[[Table]]` since it's not imported.
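    
    A minimal sketch of what the fully qualified link could look like. `ScaladocLinkSketch`, `SinkSketch` and `keyFields` are hypothetical stand-ins so the snippet compiles on its own; only the `[[...]]` target matters, and it assumes `Table` lives in `org.apache.flink.table.api`.
    
    ```scala
    object ScaladocLinkSketch {
      // Hypothetical stand-in for the sink under review; not the PR's class.
      class SinkSketch {
        private var keys: Array[String] = Array.empty
    
        /**
          * Configures the unique key fields of the
          * [[org.apache.flink.table.api.Table]] to write.
          */
        def setKeyFields(keys: Array[String]): Unit = {
          this.keys = keys
        }
    
        // Exposed only so the sketch can be checked.
        def keyFields: Array[String] = keys
      }
    }
    ```
    
    With the full name, Scaladoc can resolve the link without an extra import in the file.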


---

[GitHub] flink issue #5688: [FLINK-6968][Table API & SQL] Add Queryable table sink.

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on the issue:

    https://github.com/apache/flink/pull/5688
  
    @twalthr Please help to review this.


---

[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5688#discussion_r174366980
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/QueryableTableSinkTest.scala ---
    @@ -0,0 +1,102 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.stream.table
    +
    +import java.lang.{Boolean => JBool}
    +
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.api.common.time.Time
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator
    +import org.apache.flink.table.api.StreamQueryConfig
    +import org.apache.flink.table.runtime.harness.HarnessTestBase
    +import org.apache.flink.table.sinks.{QueryableStateProcessFunction, RowKeySelector}
    +import org.apache.flink.types.Row
    +import org.junit.Assert.assertEquals
    +import org.junit.Test
    +
    +class QueryableTableSinkTest extends HarnessTestBase {
    +  @Test
    +  def testRowSelector(): Unit = {
    +    val keyTypes = Array(TypeInformation.of(classOf[List[Int]]), TypeInformation.of(classOf[String]))
    +    val selector = new RowKeySelector(Array(0, 2), new RowTypeInfo(keyTypes:_*))
    +
    +    val src = Row.of(List(1), "a", "b")
    +    val key = selector.getKey(JTuple2.of(true, src))
    +
    +    assertEquals(Row.of(List(1), "b"), key)
    +  }
    +
    +  @Test
    +  def testProcessFunction(): Unit = {
    +    val queryConfig = new StreamQueryConfig()
    +      .withIdleStateRetentionTime(Time.milliseconds(2), Time.milliseconds(10))
    +
    +    val keys = Array("id")
    +    val keyType = new RowTypeInfo(TypeInformation.of(classOf[String]))
    +    val fieldNames = Array("id", "is_manager", "name")
    +    val fieldTypes: Array[TypeInformation[_]] = Array(
    +      TypeInformation.of(classOf[String]).asInstanceOf[TypeInformation[_]],
    +      TypeInformation.of(classOf[JBool]).asInstanceOf[TypeInformation[_]],
    +      TypeInformation.of(classOf[String]).asInstanceOf[TypeInformation[_]])
    +    val func = new QueryableStateProcessFunction("test", queryConfig, keys, fieldNames, fieldTypes)
    +
    +    val operator = new LegacyKeyedProcessOperator[Row, JTuple2[JBool, Row], Void](func)
    --- End diff --
    
    Same as above.


---

[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5688#discussion_r174338981
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala ---
    @@ -0,0 +1,175 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.sinks
    +
    +import java.lang.{Boolean => JBool}
    +
    +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.functions.KeySelector
    +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    +import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.table.api.StreamQueryConfig
    +import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.Collector
    +import org.slf4j.LoggerFactory
    +
    +class QueryableTableSink(private val namePrefix: String,
    +                         private val queryConfig: StreamQueryConfig)
    --- End diff --
    
    Format the code like this:
    ```
    class QueryableTableSink(
        private val namePrefix: String,
        private val queryConfig: StreamQueryConfig)
      extends UpsertStreamTableSink[Row]
    ...
    ```



---

[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5688#discussion_r190137151
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala ---
    @@ -0,0 +1,223 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.sinks
    +
    +import java.lang.{Boolean => JBool}
    +
    +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.functions.KeySelector
    +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    +import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    +import org.apache.flink.streaming.api.{TimeCharacteristic, TimeDomain}
    +import org.apache.flink.table.api.StreamQueryConfig
    +import org.apache.flink.table.runtime.aggregate.KeyedProcessFunctionWithCleanupState
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.Collector
    +
    +/**
    +  * A QueryableTableSink stores table in queryable state.
    +  *
    +  * This class stores table in queryable state so that users can access table data without
    +  * dependency on external storage.
    +  *
    +  * Since this is only a kv storage, currently user can only do point query against it.
    +  *
    +  * Example:
    +  * {{{
    +  *   val env = ExecutionEnvironment.getExecutionEnvironment
    +  *   val tEnv = TableEnvironment.getTableEnvironment(env)
    +  *
    +  *   val table: Table  = ...
    +  *
    +  *   val queryableTableSink: QueryableTableSink = new QueryableTableSink(
    +  *       "prefix",
    +  *       queryConfig,
    +  *       None)
    +  *
    +  *   tEnv.writeToSink(table, queryableTableSink, config)
    +  * }}}
    +  *
    +  * When program starts to run, user can access state with QueryableStateClient.
    +  * {{{
    +  *   val client = new QueryableStateClient(tmHostname, proxyPort)
    +  *   val data = client.getKvState(
    +  *       jobId,
    +  *       "prefix-column1",
    +  *       Row.of(1),
    +  *       new RowTypeInfo(Array(TypeInfoformation.of(classOf[Int]), Array("id"))
    +  *       stateDescriptor)
    +  *     .get();
    +  *
    +  * }}}
    +  *
    +  *
    +  * @param namePrefix
    +  * @param queryConfig
    +  * @param cleanupTimeDomain
    +  */
    +class QueryableTableSink(
    +    private val namePrefix: String,
    +    private val queryConfig: StreamQueryConfig,
    +    private val cleanupTimeDomain: Option[TimeDomain])
    +  extends UpsertStreamTableSink[Row]
    +    with TableSinkBase[JTuple2[JBool, Row]] {
    +  private var keys: Array[String] = _
    +
    +  override def setKeyFields(keys: Array[String]): Unit = {
    +    if (keys == null) {
    +      throw new IllegalArgumentException("keys can't be null!")
    +    }
    +    this.keys = keys
    +  }
    +
    +  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
    +    if (isAppendOnly) {
    +      throw new IllegalArgumentException("A QueryableTableSink can not be used with append-only " +
    +        "tables as the table would grow infinitely")
    +    }
    +  }
    +
    +  override def getRecordType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes, getFieldNames)
    +
    +  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, Row]]): Unit = {
    +    val keyIndices = keys.map(getFieldNames.indexOf(_))
    +    val keyTypes = keyIndices.map(getFieldTypes(_))
    +
    +    val keySelectorType = new RowTypeInfo(keyTypes, keys)
    +
    +    val processFunction = new QueryableStateProcessFunction(
    +      namePrefix,
    +      queryConfig,
    +      keys,
    +      getFieldNames,
    +      getFieldTypes,
    +      calculateCleanupTimeDomain(dataStream.getExecutionEnvironment.getStreamTimeCharacteristic))
    +
    +    dataStream.keyBy(new RowKeySelector(keyIndices, keySelectorType))
    +      .process(processFunction)
    +  }
    +
    +  private def calculateCleanupTimeDomain(timeCharacteristic: TimeCharacteristic): TimeDomain = {
    +    val timeDomainFromTimeCharacteristic = {
    +      timeCharacteristic match {
    +        case TimeCharacteristic.IngestionTime | TimeCharacteristic.ProcessingTime =>
    +          TimeDomain.PROCESSING_TIME
    +        case TimeCharacteristic.EventTime =>
    +          TimeDomain.EVENT_TIME
    +      }
    +    }
    +
    +    cleanupTimeDomain.getOrElse(timeDomainFromTimeCharacteristic)
    +  }
    +
    +  override protected def copy: TableSinkBase[JTuple2[JBool, Row]] = {
    +    new QueryableTableSink(this.namePrefix, this.queryConfig, this.cleanupTimeDomain)
    +  }
    +}
    +
    +class RowKeySelector(
    +  private val keyIndices: Array[Int],
    +  @transient private val returnType: TypeInformation[Row])
    +  extends KeySelector[JTuple2[JBool, Row], Row]
    +    with ResultTypeQueryable[Row] {
    +
    +  override def getKey(value: JTuple2[JBool, Row]): Row = {
    +    val keys = keyIndices
    --- End diff --
    
    Use `Row.project()` here. Maybe move this to a separate class?
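    
    A rough sketch of the suggestion, assuming flink-core is on the classpath and that `Row.project(Row, int[])` is available in the targeted version. `RowProjectSketch` and `projectKey` are hypothetical names used for illustration only.
    
    ```scala
    import java.lang.{Boolean => JBool}
    
    import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    import org.apache.flink.types.Row
    
    object RowProjectSketch {
      // Builds the key row from the payload (f1) of an upsert message by
      // copying only the fields at keyIndices, instead of a manual while loop.
      def projectKey(value: JTuple2[JBool, Row], keyIndices: Array[Int]): Row = {
        Row.project(value.f1, keyIndices)
      }
    }
    ```
    
    Projecting indices 0 and 2 of a three-field row would then give a two-field key row, which is what the hand-written loop in `getKey` builds today.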


---

[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5688#discussion_r174511059
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala ---
    @@ -0,0 +1,162 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.sinks
    +
    +import java.lang.{Boolean => JBool}
    +
    +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.functions.KeySelector
    +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    +import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.table.api.StreamQueryConfig
    +import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.Collector
    +import org.slf4j.LoggerFactory
    +
    +class QueryableTableSink(
    +  private val namePrefix: String,
    --- End diff --
    
    Use a four-space indent for parameter declarations (this also applies to methods).
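    
    For illustration, a small sketch of the layout for both a class and a method. `IndentSketch`, `KeyFormatter` and `format` are made-up names, not part of the PR.
    
    ```scala
    object IndentSketch {
      // Hypothetical class, used only to show the layout.
      class KeyFormatter(
          private val prefix: String,
          private val separator: String) {
    
        // Wrapped method parameters get the same four-space indent,
        // while the method body keeps the usual two spaces.
        def format(
            fieldName: String,
            index: Int): String = {
          s"$prefix$separator$fieldName$separator$index"
        }
      }
    }
    ```
    
    The extra indent keeps the parameter list visually separate from the `extends` clause and from the body.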


---

[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5688#discussion_r190176842
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/KeyedProcessFunctionWithCleanupState.scala ---
    @@ -44,8 +45,20 @@ abstract class KeyedProcessFunctionWithCleanupState[K, I, O](queryConfig: Stream
       protected def registerProcessingCleanupTimer(
         ctx: KeyedProcessFunction[K, I, O]#Context,
         currentTime: Long): Unit = {
    -    if (stateCleaningEnabled) {
    +    registerCleanupTimer(ctx, currentTime, TimeDomain.PROCESSING_TIME)
    +  }
     
    +  protected def registerEventCleanupTimer(
    --- End diff --
    
    We implemented state cleanup in processing time because it is easier for users to reason about and doesn't interfere much with event-time processing (it is not yet possible to distinguish timers). However, it also has a few shortcomings, such as state being cleared when a query is recovered from a savepoint (which we don't really encourage at the moment).
    
    Anyway, introducing event-time state cleanup should definitely go into a separate issue and PR.


---

[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

Posted by walterddr <gi...@git.apache.org>.
Github user walterddr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5688#discussion_r175317673
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala ---
    @@ -0,0 +1,162 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.sinks
    +
    +import java.lang.{Boolean => JBool}
    +
    +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.functions.KeySelector
    +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    +import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.table.api.StreamQueryConfig
    +import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.Collector
    +import org.slf4j.LoggerFactory
    +
    +class QueryableTableSink(
    +    private val namePrefix: String,
    +    private val queryConfig: StreamQueryConfig)
    +  extends UpsertStreamTableSink[Row]
    +    with TableSinkBase[JTuple2[JBool, Row]] {
    +  private var keys: Array[String] = _
    +
    +  override def setKeyFields(keys: Array[String]): Unit = {
    +    if (keys == null) {
    +      throw new IllegalArgumentException("keys can't be null!")
    +    }
    +    this.keys = keys
    +  }
    +
    +  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
    +    if (isAppendOnly) {
    +      throw new IllegalArgumentException("A QueryableTableSink can not be used with append-only " +
    +        "tables as the table would grow infinitely")
    +    }
    +  }
    +
    +  override def getRecordType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes, getFieldNames)
    +
    +  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, Row]]): Unit = {
    +    val keyIndices = keys.map(getFieldNames.indexOf(_))
    +    val keyTypes = keyIndices.map(getFieldTypes(_))
    +
    +    val keySelectorType = new RowTypeInfo(keyTypes, keys)
    +
    +    val processFunction = new QueryableStateProcessFunction(
    +      namePrefix,
    +      queryConfig,
    +      keys,
    +      getFieldNames,
    +      getFieldTypes)
    +
    +    dataStream.keyBy(new RowKeySelector(keyIndices, keySelectorType))
    +      .process(processFunction)
    +  }
    +
    +  override protected def copy: TableSinkBase[JTuple2[JBool, Row]] = {
    +    new QueryableTableSink(this.namePrefix, this.queryConfig)
    +  }
    +}
    +
    +class RowKeySelector(
    +  private val keyIndices: Array[Int],
    +  @transient private val returnType: TypeInformation[Row])
    +  extends KeySelector[JTuple2[JBool, Row], Row]
    +    with ResultTypeQueryable[Row] {
    +
    +  override def getKey(value: JTuple2[JBool, Row]): Row = {
    +    val keys = keyIndices
    +
    +    val srcRow = value.f1
    +
    +    val destRow = new Row(keys.length)
    +    var i = 0
    +    while (i < keys.length) {
    +      destRow.setField(i, srcRow.getField(keys(i)))
    +      i += 1
    +    }
    +
    +    destRow
    +  }
    +
    +  override def getProducedType: TypeInformation[Row] = returnType
    +}
    +
    +class QueryableStateProcessFunction(
    +  private val namePrefix: String,
    +  private val queryConfig: StreamQueryConfig,
    +  private val keyNames: Array[String],
    +  private val fieldNames: Array[String],
    +  private val fieldTypes: Array[TypeInformation[_]])
    +  extends ProcessFunctionWithCleanupState[JTuple2[JBool, Row], Void](queryConfig) {
    +
    +  @transient private var states = Array[ValueState[AnyRef]]()
    +  @transient private var nonKeyIndices = Array[Int]()
    +
    +  override def open(parameters: Configuration): Unit = {
    +    super.open(parameters)
    +
    +    nonKeyIndices = fieldNames.indices
    +      .filter(idx => !keyNames.contains(fieldNames(idx)))
    +      .toArray
    +
    +    val statesBuilder = Array.newBuilder[ValueState[AnyRef]]
    +
    +    for (i <- nonKeyIndices) {
    +      val stateDesc = new ValueStateDescriptor(fieldNames(i), fieldTypes(i))
    +      stateDesc.initializeSerializerUnlessSet(getRuntimeContext.getExecutionConfig)
    +      stateDesc.setQueryable(fieldNames(i))
    +      statesBuilder += getRuntimeContext.getState(stateDesc).asInstanceOf[ValueState[AnyRef]]
    +    }
    +
    +    states = statesBuilder.result()
    +
    +    initCleanupTimeState("QueryableStateCleanupTime")
    +  }
    +
    +  override def processElement(
    +    value: JTuple2[JBool, Row],
    +    ctx: ProcessFunction[JTuple2[JBool, Row], Void]#Context,
    +    out: Collector[Void]): Unit = {
    +    if (value.f0) {
    +      for (i <- nonKeyIndices.indices) {
    +        states(i).update(value.f1.getField(nonKeyIndices(i)))
    +      }
    +
    +      val currentTime = ctx.timerService().currentProcessingTime()
    --- End diff --
    
    It seems this is registered to work with the processing-time domain only. I was wondering whether it would be easier to write an ITCase if the table sink also worked with event time, or if there were a separate event-time queryable table sink?


---

[GitHub] flink issue #5688: [FLINK-6968][Table API & SQL] Add Queryable table sink.

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on the issue:

    https://github.com/apache/flink/pull/5688
  
    @xccui Thanks for your suggestions.
    
    1. Adding an integration test for this is difficult: I need to ensure that elements are processed while the job keeps running, which is hard to achieve. But I will do some manual testing for that.
    2. I'll add documentation for that.
    3. I'll squash the commits when the review is done.


---

[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5688#discussion_r190135653
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala ---
    @@ -0,0 +1,223 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.sinks
    +
    +import java.lang.{Boolean => JBool}
    +
    +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.functions.KeySelector
    +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    +import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    +import org.apache.flink.streaming.api.{TimeCharacteristic, TimeDomain}
    +import org.apache.flink.table.api.StreamQueryConfig
    +import org.apache.flink.table.runtime.aggregate.KeyedProcessFunctionWithCleanupState
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.Collector
    +
    +/**
    +  * A QueryableTableSink stores table in queryable state.
    --- End diff --
    
    Update the comment.


---

[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5688#discussion_r190140777
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala ---
    @@ -0,0 +1,223 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.sinks
    +
    +import java.lang.{Boolean => JBool}
    +
    +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.functions.KeySelector
    +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    +import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    +import org.apache.flink.streaming.api.{TimeCharacteristic, TimeDomain}
    +import org.apache.flink.table.api.StreamQueryConfig
    +import org.apache.flink.table.runtime.aggregate.KeyedProcessFunctionWithCleanupState
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.Collector
    +
    +/**
    +  * A QueryableTableSink stores table in queryable state.
    +  *
    +  * This class stores table in queryable state so that users can access table data without
    +  * dependency on external storage.
    +  *
    +  * Since this is only a kv storage, currently user can only do point query against it.
    +  *
    +  * Example:
    +  * {{{
    +  *   val env = ExecutionEnvironment.getExecutionEnvironment
    +  *   val tEnv = TableEnvironment.getTableEnvironment(env)
    +  *
    +  *   val table: Table  = ...
    +  *
    +  *   val queryableTableSink: QueryableTableSink = new QueryableTableSink(
    +  *       "prefix",
    +  *       queryConfig,
    +  *       None)
    +  *
    +  *   tEnv.writeToSink(table, queryableTableSink, config)
    +  * }}}
    +  *
    +  * When program starts to run, user can access state with QueryableStateClient.
    +  * {{{
    +  *   val client = new QueryableStateClient(tmHostname, proxyPort)
    +  *   val data = client.getKvState(
    +  *       jobId,
    +  *       "prefix-column1",
    +  *       Row.of(1),
    +  *       new RowTypeInfo(Array(TypeInfoformation.of(classOf[Int]), Array("id"))
    +  *       stateDescriptor)
    +  *     .get();
    +  *
    +  * }}}
    +  *
    +  *
    +  * @param namePrefix
    +  * @param queryConfig
    +  * @param cleanupTimeDomain
    +  */
    +class QueryableTableSink(
    +    private val namePrefix: String,
    +    private val queryConfig: StreamQueryConfig,
    +    private val cleanupTimeDomain: Option[TimeDomain])
    +  extends UpsertStreamTableSink[Row]
    +    with TableSinkBase[JTuple2[JBool, Row]] {
    +  private var keys: Array[String] = _
    +
    +  override def setKeyFields(keys: Array[String]): Unit = {
    +    if (keys == null) {
    +      throw new IllegalArgumentException("keys can't be null!")
    +    }
    +    this.keys = keys
    +  }
    +
    +  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
    +    if (isAppendOnly) {
    +      throw new IllegalArgumentException("A QueryableTableSink can not be used with append-only " +
    +        "tables as the table would grow infinitely")
    +    }
    +  }
    +
    +  override def getRecordType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes, getFieldNames)
    +
    +  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, Row]]): Unit = {
    +    val keyIndices = keys.map(getFieldNames.indexOf(_))
    +    val keyTypes = keyIndices.map(getFieldTypes(_))
    +
    +    val keySelectorType = new RowTypeInfo(keyTypes, keys)
    +
    +    val processFunction = new QueryableStateProcessFunction(
    +      namePrefix,
    +      queryConfig,
    +      keys,
    +      getFieldNames,
    +      getFieldTypes,
    +      calculateCleanupTimeDomain(dataStream.getExecutionEnvironment.getStreamTimeCharacteristic))
    +
    +    dataStream.keyBy(new RowKeySelector(keyIndices, keySelectorType))
    +      .process(processFunction)
    +  }
    +
    +  private def calculateCleanupTimeDomain(timeCharacteristic: TimeCharacteristic): TimeDomain = {
    +    val timeDomainFromTimeCharacteristic = {
    +      timeCharacteristic match {
    +        case TimeCharacteristic.IngestionTime | TimeCharacteristic.ProcessingTime =>
    +          TimeDomain.PROCESSING_TIME
    +        case TimeCharacteristic.EventTime =>
    +          TimeDomain.EVENT_TIME
    +      }
    +    }
    +
    +    cleanupTimeDomain.getOrElse(timeDomainFromTimeCharacteristic)
    +  }
    +
    +  override protected def copy: TableSinkBase[JTuple2[JBool, Row]] = {
    +    new QueryableTableSink(this.namePrefix, this.queryConfig, this.cleanupTimeDomain)
    +  }
    +}
    +
    +class RowKeySelector(
    +  private val keyIndices: Array[Int],
    +  @transient private val returnType: TypeInformation[Row])
    +  extends KeySelector[JTuple2[JBool, Row], Row]
    +    with ResultTypeQueryable[Row] {
    +
    +  override def getKey(value: JTuple2[JBool, Row]): Row = {
    +    val keys = keyIndices
    +
    +    val srcRow = value.f1
    +
    +    val destRow = new Row(keys.length)
    +    var i = 0
    +    while (i < keys.length) {
    +      destRow.setField(i, srcRow.getField(keys(i)))
    +      i += 1
    +    }
    +
    +    destRow
    +  }
    +
    +  override def getProducedType: TypeInformation[Row] = returnType
    +}
    +
    +class QueryableStateProcessFunction(
    +    private val namePrefix: String,
    +    private val queryConfig: StreamQueryConfig,
    +    private val keyNames: Array[String],
    +    private val fieldNames: Array[String],
    +    private val fieldTypes: Array[TypeInformation[_]],
    +    private val cleanupTimeDomain: TimeDomain)
    +  extends KeyedProcessFunctionWithCleanupState[Row, JTuple2[JBool, Row], Void](queryConfig) {
    +
    +  @transient private var states = Array[ValueState[AnyRef]]()
    +  @transient private var nonKeyIndices = Array[Int]()
    +
    +  override def open(parameters: Configuration): Unit = {
    +    super.open(parameters)
    +
    +    nonKeyIndices = fieldNames.indices
    +      .filter(idx => !keyNames.contains(fieldNames(idx)))
    +      .toArray
    +
    +    val statesBuilder = Array.newBuilder[ValueState[AnyRef]]
    +
    +    for (i <- nonKeyIndices) {
    +      val stateDesc = new ValueStateDescriptor(fieldNames(i), fieldTypes(i))
    +      stateDesc.initializeSerializerUnlessSet(getRuntimeContext.getExecutionConfig)
    +      stateDesc.setQueryable(s"$namePrefix-${fieldNames(i)}")
    +      statesBuilder += getRuntimeContext.getState(stateDesc).asInstanceOf[ValueState[AnyRef]]
    +    }
    +
    +    states = statesBuilder.result()
    +
    +    initCleanupTimeState("QueryableStateCleanupTime")
    +  }
    +
    +  override def processElement(
    +    value: JTuple2[JBool, Row],
    +    ctx: KeyedProcessFunction[Row, JTuple2[JBool, Row], Void]#Context,
    +    out: Collector[Void]): Unit = {
    +    if (value.f0) {
    +      for (i <- nonKeyIndices.indices) {
    +        states(i).update(value.f1.getField(nonKeyIndices(i)))
    +      }
    +
    +      val currentTime: Long = cleanupTimeDomain match{
    +        case TimeDomain.EVENT_TIME => ctx.timestamp()
    --- End diff --
    
    By default, this timestamp is empty (null). It is set by a preceding operator that the Table API generates.
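
    A minimal sketch of how the cleanup time could be resolved defensively, assuming the element timestamp may still be null when no upstream operator has set it. `Domain`, `CleanupClock` and its parameters are hypothetical stand-ins so the snippet runs without Flink; they are not part of the PR.

```scala
// Hypothetical stand-ins for Flink's TimeDomain values; not the real API.
sealed trait Domain
case object EventTime extends Domain
case object ProcessingTime extends Domain

object CleanupClock {
  // `elementTimestamp` models ctx.timestamp(), a nullable java.lang.Long.
  // `processingTime` models the current processing time of the timer service.
  def currentTime(
      domain: Domain,
      elementTimestamp: java.lang.Long,
      processingTime: Long): Long = domain match {
    case EventTime =>
      // Fail with a clear message instead of unboxing null into a NullPointerException.
      Option(elementTimestamp).map(_.longValue).getOrElse(
        throw new IllegalStateException("record carries no event-time timestamp"))
    case ProcessingTime =>
      processingTime
  }
}
```

    Whether to fail or to fall back to processing time is a design choice; the sketch fails so that a missing timestamp is not silently hidden.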


---

[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5688#discussion_r174338955
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala ---
    @@ -0,0 +1,175 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.sinks
    +
    +import java.lang.{Boolean => JBool}
    +
    +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.functions.KeySelector
    +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    +import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.table.api.StreamQueryConfig
    +import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.Collector
    +import org.slf4j.LoggerFactory
    +
    +class QueryableTableSink(private val namePrefix: String,
    +                         private val queryConfig: StreamQueryConfig)
    +  extends UpsertStreamTableSink[Row]
    +  with TableSinkBase[JTuple2[JBool, Row]] {
    +  private var keys: Array[String] = _
    +
    +  /**
    +    * Configures the unique key fields of the [[Table]] to write.
    +    * The method is called after [[TableSink.configure()]].
    +    *
    +    * The keys array might be empty, if the table consists of a single (updated) record.
    +    * If the table does not have a key and is append-only, the keys attribute is null.
    +    *
    +    * @param keys the field names of the table's keys, an empty array if the table has a single
    +    *             row, and null if the table is append-only and has no key.
    +    */
    +  override def setKeyFields(keys: Array[String]): Unit = {
    +    if (keys == null) {
    +      throw new IllegalArgumentException("keys can't be null!")
    +    }
    +    this.keys = keys
    +  }
    +
    +  /**
    +    * Specifies whether the [[Table]] to write is append-only or not.
    +    *
    +    * @param isAppendOnly true if the table is append-only, false otherwise.
    +    */
    +  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
    +    if (isAppendOnly) {
    +      throw new IllegalArgumentException("A QueryableTableSink can not be used with append-only tables " +
    --- End diff --
    
    This line is too long (should be less than 100 characters).
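
    One way to stay under the limit is to break the message at a word boundary and concatenate the parts. The sketch below is self-contained; `AppendOnlyCheck` is a hypothetical wrapper used only for illustration.

```scala
object AppendOnlyCheck {
  // The message is split across two short source lines; the resulting string is unchanged.
  val message: String =
    "A QueryableTableSink can not be used with append-only " +
      "tables as the table would grow infinitely"

  // Mirrors the check in setIsAppendOnly: reject append-only tables.
  def check(isAppendOnly: Boolean): Unit =
    if (isAppendOnly) throw new IllegalArgumentException(message)
}
```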


---

[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5688#discussion_r190137656
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala ---
    @@ -0,0 +1,223 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.sinks
    +
    +import java.lang.{Boolean => JBool}
    +
    +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.functions.KeySelector
    +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    +import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    +import org.apache.flink.streaming.api.{TimeCharacteristic, TimeDomain}
    +import org.apache.flink.table.api.StreamQueryConfig
    +import org.apache.flink.table.runtime.aggregate.KeyedProcessFunctionWithCleanupState
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.Collector
    +
    +/**
    +  * A QueryableTableSink stores table in queryable state.
    +  *
    +  * This class stores table in queryable state so that users can access table data without
    +  * dependency on external storage.
    +  *
    +  * Since this is only a kv storage, currently user can only do point query against it.
    +  *
    +  * Example:
    +  * {{{
    +  *   val env = ExecutionEnvironment.getExecutionEnvironment
    +  *   val tEnv = TableEnvironment.getTableEnvironment(env)
    +  *
    +  *   val table: Table  = ...
    +  *
    +  *   val queryableTableSink: QueryableTableSink = new QueryableTableSink(
    +  *       "prefix",
    +  *       queryConfig,
    +  *       None)
    +  *
    +  *   tEnv.writeToSink(table, queryableTableSink, config)
    +  * }}}
    +  *
    +  * When program starts to run, user can access state with QueryableStateClient.
    +  * {{{
    +  *   val client = new QueryableStateClient(tmHostname, proxyPort)
    +  *   val data = client.getKvState(
    +  *       jobId,
    +  *       "prefix-column1",
    +  *       Row.of(1),
    +  *       new RowTypeInfo(Array(TypeInformation.of(classOf[Int])), Array("id")),
    +  *       stateDescriptor)
    +  *     .get();
    +  *
    +  * }}}
    +  *
    +  *
    +  * @param namePrefix
    +  * @param queryConfig
    +  * @param cleanupTimeDomain
    +  */
    +class QueryableTableSink(
    +    private val namePrefix: String,
    +    private val queryConfig: StreamQueryConfig,
    +    private val cleanupTimeDomain: Option[TimeDomain])
    +  extends UpsertStreamTableSink[Row]
    +    with TableSinkBase[JTuple2[JBool, Row]] {
    +  private var keys: Array[String] = _
    +
    +  override def setKeyFields(keys: Array[String]): Unit = {
    +    if (keys == null) {
    +      throw new IllegalArgumentException("keys can't be null!")
    +    }
    +    this.keys = keys
    +  }
    +
    +  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
    +    if (isAppendOnly) {
    +      throw new IllegalArgumentException("A QueryableTableSink can not be used with append-only " +
    +        "tables as the table would grow infinitely")
    +    }
    +  }
    +
    +  override def getRecordType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes, getFieldNames)
    +
    +  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, Row]]): Unit = {
    +    val keyIndices = keys.map(getFieldNames.indexOf(_))
    +    val keyTypes = keyIndices.map(getFieldTypes(_))
    +
    +    val keySelectorType = new RowTypeInfo(keyTypes, keys)
    +
    +    val processFunction = new QueryableStateProcessFunction(
    +      namePrefix,
    +      queryConfig,
    +      keys,
    +      getFieldNames,
    +      getFieldTypes,
    +      calculateCleanupTimeDomain(dataStream.getExecutionEnvironment.getStreamTimeCharacteristic))
    +
    +    dataStream.keyBy(new RowKeySelector(keyIndices, keySelectorType))
    +      .process(processFunction)
    +  }
    +
    +  private def calculateCleanupTimeDomain(timeCharacteristic: TimeCharacteristic): TimeDomain = {
    +    val timeDomainFromTimeCharacteristic = {
    +      timeCharacteristic match {
    +        case TimeCharacteristic.IngestionTime | TimeCharacteristic.ProcessingTime =>
    +          TimeDomain.PROCESSING_TIME
    +        case TimeCharacteristic.EventTime =>
    +          TimeDomain.EVENT_TIME
    +      }
    +    }
    +
    +    cleanupTimeDomain.getOrElse(timeDomainFromTimeCharacteristic)
    +  }
    +
    +  override protected def copy: TableSinkBase[JTuple2[JBool, Row]] = {
    +    new QueryableTableSink(this.namePrefix, this.queryConfig, this.cleanupTimeDomain)
    +  }
    +}
    +
    +class RowKeySelector(
    +  private val keyIndices: Array[Int],
    +  @transient private val returnType: TypeInformation[Row])
    +  extends KeySelector[JTuple2[JBool, Row], Row]
    +    with ResultTypeQueryable[Row] {
    +
    +  override def getKey(value: JTuple2[JBool, Row]): Row = {
    +    val keys = keyIndices
    +
    +    val srcRow = value.f1
    +
    +    val destRow = new Row(keys.length)
    +    var i = 0
    +    while (i < keys.length) {
    +      destRow.setField(i, srcRow.getField(keys(i)))
    +      i += 1
    +    }
    +
    +    destRow
    +  }
    +
    +  override def getProducedType: TypeInformation[Row] = returnType
    +}
    +
    +class QueryableStateProcessFunction(
    --- End diff --
    
    Could this class be moved into a separate file?


---

[GitHub] flink issue #5688: [FLINK-6968][Table API & SQL] Add Queryable table sink.

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on the issue:

    https://github.com/apache/flink/pull/5688
  
    @suez1224 Conflict resolved.


---

[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

Posted by xccui <gi...@git.apache.org>.
Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5688#discussion_r174339296
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala ---
    @@ -0,0 +1,175 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.sinks
    +
    +import java.lang.{Boolean => JBool}
    +
    +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.functions.KeySelector
    +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    +import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.table.api.StreamQueryConfig
    +import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.Collector
    +import org.slf4j.LoggerFactory
    +
    +class QueryableTableSink(private val namePrefix: String,
    +                         private val queryConfig: StreamQueryConfig)
    +  extends UpsertStreamTableSink[Row]
    +  with TableSinkBase[JTuple2[JBool, Row]] {
    +  private var keys: Array[String] = _
    +
    +  /**
    +    * Configures the unique key fields of the [[Table]] to write.
    +    * The method is called after [[TableSink.configure()]].
    +    *
    +    * The keys array might be empty, if the table consists of a single (updated) record.
    +    * If the table does not have a key and is append-only, the keys attribute is null.
    +    *
    +    * @param keys the field names of the table's keys, an empty array if the table has a single
    +    *             row, and null if the table is append-only and has no key.
    +    */
    +  override def setKeyFields(keys: Array[String]): Unit = {
    +    if (keys == null) {
    +      throw new IllegalArgumentException("keys can't be null!")
    +    }
    +    this.keys = keys
    +  }
    +
    +  /**
    +    * Specifies whether the [[Table]] to write is append-only or not.
    +    *
    +    * @param isAppendOnly true if the table is append-only, false otherwise.
    +    */
    +  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
    +    if (isAppendOnly) {
    +      throw new IllegalArgumentException("A QueryableTableSink can not be used with append-only tables " +
    +        "as the table would grow infinitely")
    +    }
    +  }
    +
    +  /** Returns the requested record type */
    +  override def getRecordType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes, getFieldNames)
    +
    +  /** Emits the DataStream. */
    +  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, Row]]): Unit = {
    +    val keyIndices = keys.map(getFieldNames.indexOf(_))
    +    val keyTypes = keyIndices.map(getFieldTypes(_))
    +
    +    val keySelectorType = new RowTypeInfo(keyTypes, keys)
    +
    +    val processFunction = new QueryableStateProcessFunction(
    +      namePrefix,
    +      queryConfig,
    +      keys,
    +      getFieldNames,
    +      getFieldTypes)
    +
    +    dataStream.keyBy(new RowKeySelector(keyIndices, keySelectorType))
    +      .process(processFunction)
    +  }
    +
    +  /** Return a deep copy of the [[TableSink]]. */
    +  override protected def copy: TableSinkBase[JTuple2[JBool, Row]] = {
    --- End diff --
    
    The docs for overridden methods could be omitted.


---

[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

Posted by liurenjie1024 <gi...@git.apache.org>.
Github user liurenjie1024 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5688#discussion_r190859490
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/KeyedProcessFunctionWithCleanupState.scala ---
    @@ -44,8 +45,20 @@ abstract class KeyedProcessFunctionWithCleanupState[K, I, O](queryConfig: Stream
       protected def registerProcessingCleanupTimer(
         ctx: KeyedProcessFunction[K, I, O]#Context,
         currentTime: Long): Unit = {
    -    if (stateCleaningEnabled) {
    +    registerCleanupTimer(ctx, currentTime, TimeDomain.PROCESSING_TIME)
    +  }
     
    +  protected def registerEventCleanupTimer(
    --- End diff --
    
    I put it in the same PR so that it would not block this one, but I agree that we should move it to a separate PR.
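
    For context, the refactoring in the diff above makes the processing-time and event-time helpers delegate to one shared method. A rough, Flink-free sketch of that shape follows; `TimerDomain`, `FakeTimerService`, `CleanupTimers` and the single `retentionMillis` parameter are hypothetical simplifications, and the real class derives its retention times from the `StreamQueryConfig`.

```scala
import scala.collection.mutable

// Hypothetical stand-ins; the real code uses Flink's TimeDomain and TimerService.
sealed trait TimerDomain
case object ProcessingDomain extends TimerDomain
case object EventDomain extends TimerDomain

// Records registered timers so the sketch can be inspected without Flink.
class FakeTimerService {
  val processingTimers = mutable.Buffer.empty[Long]
  val eventTimers = mutable.Buffer.empty[Long]
}

class CleanupTimers(retentionMillis: Long, stateCleaningEnabled: Boolean) {

  // Single shared implementation, parameterized by the time domain.
  def registerCleanupTimer(
      timers: FakeTimerService,
      currentTime: Long,
      domain: TimerDomain): Unit = {
    if (stateCleaningEnabled) {
      val cleanupTime = currentTime + retentionMillis
      domain match {
        case ProcessingDomain => timers.processingTimers += cleanupTime
        case EventDomain => timers.eventTimers += cleanupTime
      }
    }
  }

  // Thin wrappers keep the existing call sites unchanged.
  def registerProcessingCleanupTimer(timers: FakeTimerService, currentTime: Long): Unit =
    registerCleanupTimer(timers, currentTime, ProcessingDomain)

  def registerEventCleanupTimer(timers: FakeTimerService, currentTime: Long): Unit =
    registerCleanupTimer(timers, currentTime, EventDomain)
}
```

    Because the wrappers only forward to the shared method, a change like this can be moved into its own PR without touching the sink itself.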


---