Posted to issues@flink.apache.org by suez1224 <gi...@git.apache.org> on 2017/08/25 06:37:49 UTC

[GitHub] flink pull request #4585: [FLINK-7491] add MultiSetTypeInfo; add built-in Co...

GitHub user suez1224 opened a pull request:

    https://github.com/apache/flink/pull/4585

    [FLINK-7491] add MultiSetTypeInfo; add built-in Collect Aggregate Function for Flink SQL.

    *Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.*
    
    *Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.*
    
    ## Contribution Checklist
    
      - Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue.
      
      - Name the pull request in the form "[FLINK-XXXX] [component] Title of the pull request", where *FLINK-XXXX* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component.
      Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`.
    
      - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.
      
      - Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. You can set up Travis CI to do that following [this guide](http://flink.apache.org/contribute-code.html#best-practices).
    
      - Each pull request should address only one issue, not mix up code from multiple issues.
      
      - Each commit in the pull request has a meaningful commit message (including the JIRA id)
    
      - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below.
    
    
    **(The sections below can be removed for hotfixes of typos)**
    
    ## What is the purpose of the change
    
    *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)*
    
    
    ## Brief change log
    
    *(for example:)*
      - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact*
      - *Deployments RPC transmits only the blob storage reference*
      - *TaskManagers retrieve the TaskInfo from the blob cache*
    
    
    ## Verifying this change
    
    *(Please pick either of the following options)*
    
    This change is a trivial rework / code cleanup without any test coverage.
    
    *(or)*
    
    This change is already covered by existing tests, such as *(please describe tests)*.
    
    *(or)*
    
    This change added tests and can be verified as follows:
    
    *(example:)*
      - *Added integration tests for end-to-end deployment with large payloads (100MB)*
      - *Extended integration test for recovery after master (JobManager) failure*
      - *Added test that validates that TaskInfo is transferred only once across recoveries*
      - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.*
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (yes / no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no)
      - The serializers: (yes / no / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / no / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes / no)
      - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/suez1224/flink collect-multiset

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4585.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4585
    
----
commit 6ff726a8847cf95d15ab9d9ec6c9288e28792bc7
Author: Shuyi Chen <sh...@uber.com>
Date:   2017-08-24T00:54:10Z

    add MultiSetTypeInfo; add Collect SQL feature

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r140022994
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala ---
    @@ -0,0 +1,121 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.functions.aggfunctions
    +
    +import java.lang.{Iterable => JIterable}
    +import java.util
    +import java.util.function.BiFunction
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.tuple.{Tuple1 => JTuple1}
    +import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TupleTypeInfo}
    +import org.apache.flink.table.functions.AggregateFunction
    +
    +import scala.collection.JavaConverters._
    +
    +/** The initial accumulator for Collect aggregate function */
    +class CollectAccumulator[E] extends JTuple1[util.Map[E, Integer]]
    +
    +abstract class CollectAggFunction[E]
    +  extends AggregateFunction[util.Map[E, Integer], CollectAccumulator[E]] {
    +
    +  @transient
    +  private lazy val addFunction = new BiFunction[Integer, Integer, Integer] {
    +    override def apply(t: Integer, u: Integer): Integer = t + u
    +  }
    +
    +  override def createAccumulator(): CollectAccumulator[E] = {
    +    val acc = new CollectAccumulator[E]()
    +    acc.f0 = new util.HashMap[E, Integer]()
    +    acc
    +  }
    +
    +  def accumulate(accumulator: CollectAccumulator[E], value: E): Unit = {
    +    if (value != null) {
    +      if (accumulator.f0.containsKey(value)) {
    +        val add = (x: Integer, y: Integer) => x + y
    --- End diff --
    
    `add` is not used, right?
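
A minimal sketch of how the counting step could look without the unused local, assuming the accumulator is backed by a plain `java.util.Map` as in this revision of the diff. `Map.merge` already stores the given value when the key is absent, so the `containsKey` branch is not needed either. The class name `MergeCountSketch` is hypothetical and not part of the pull request.

```java
import java.util.HashMap;
import java.util.Map;

public class MergeCountSketch {

    /** Adds one occurrence of value to counts; null values are skipped, as in the diff. */
    public static <E> void accumulate(Map<E, Integer> counts, E value) {
        if (value != null) {
            // merge stores 1 if the key is absent, otherwise combines old and new with Integer::sum
            counts.merge(value, 1, Integer::sum);
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new HashMap<>();
        accumulate(counts, "a");
        accumulate(counts, "b");
        accumulate(counts, "a");
        accumulate(counts, null);
        System.out.println(counts);
    }
}
```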


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r141112340
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -1414,8 +1414,29 @@ object AggregateUtil {
               aggregates(index) = udagg.getFunction
               accTypes(index) = udagg.accType
     
    -        case unSupported: SqlAggFunction =>
    -          throw new TableException(s"unsupported Function: '${unSupported.getName}'")
    +        case other: SqlAggFunction =>
    +          if (other.getKind == SqlKind.COLLECT) {
    +            aggregates(index) = sqlTypeName match {
    +              case TINYINT =>
    +                new ByteCollectAggFunction
    +              case SMALLINT =>
    +                new ShortCollectAggFunction
    +              case INTEGER =>
    +                new IntCollectAggFunction
    +              case BIGINT =>
    +                new LongCollectAggFunction
    +              case VARCHAR | CHAR =>
    +                new StringCollectAggFunction
    +              case FLOAT =>
    +                new FloatCollectAggFunction
    +              case DOUBLE =>
    +                new DoubleCollectAggFunction
    +              case _ =>
    +                new ObjectCollectAggFunction
    +            }
    +          } else {
    --- End diff --
    
    The else branch can be removed because we keep the catch-all case.


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r141106084
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala ---
    @@ -0,0 +1,158 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.functions.aggfunctions
    +
    +import java.lang.{Iterable => JIterable}
    +import java.util
    +import java.util.function.BiFunction
    +
    +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    +import org.apache.flink.api.java.typeutils._
    +import org.apache.flink.table.api.dataview.MapView
    +import org.apache.flink.table.dataview.MapViewTypeInfo
    +import org.apache.flink.table.functions.AggregateFunction
    +
    +
    +/** The initial accumulator for Collect aggregate function */
    +class CollectAccumulator[E](var f0:MapView[E, Integer]) {
    +  def this() {
    +    this(null)
    +  }
    +
    +  def canEqual(a: Any) = a.isInstanceOf[CollectAccumulator[E]]
    +
    +  override def equals(that: Any): Boolean =
    +    that match {
    +      case that: CollectAccumulator[E] => that.canEqual(this) && this.f0 == that.f0
    +      case _ => false
    +    }
    +}
    +
    +abstract class CollectAggFunction[E]
    +  extends AggregateFunction[util.Map[E, Integer], CollectAccumulator[E]] {
    +
    +  override def createAccumulator(): CollectAccumulator[E] = {
    +    val acc = new CollectAccumulator[E](new MapView[E, Integer](
    +      getValueTypeInfo.asInstanceOf[TypeInformation[E]], BasicTypeInfo.INT_TYPE_INFO))
    +    acc
    +  }
    +
    +  def accumulate(accumulator: CollectAccumulator[E], value: E): Unit = {
    +    if (value != null) {
    +      if (accumulator.f0.contains(value)) {
    +        accumulator.f0.put(value, accumulator.f0.get(value) + 1)
    +      } else {
    +        accumulator.f0.put(value, 1)
    +      }
    +    }
    +  }
    +
    +  override def getValue(accumulator: CollectAccumulator[E]): util.Map[E, Integer] = {
    +    val iterator = accumulator.f0.iterator
    +    if (iterator.hasNext) {
    +      val map = new util.HashMap[E, Integer]()
    +      while (iterator.hasNext) {
    +        val entry = iterator.next()
    +        map.put(entry.getKey, entry.getValue)
    +      }
    +      map
    +    } else {
    +      null.asInstanceOf[util.Map[E, Integer]]
    --- End diff --
    
    According to the specs of `COLLECT`, is null the correct return value or an empty Multiset?
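
To make the two options in this question concrete, here is a rough sketch that uses a plain `java.util.Map` as a stand-in for the accumulator state. Which behavior the `COLLECT` specification requires is exactly the open question, so neither variant is presented as the correct one. The class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class GetValueSketch {

    /** Variant from the diff: no collected elements yields null. */
    public static <E> Map<E, Integer> getValueOrNull(Map<E, Integer> acc) {
        return acc.isEmpty() ? null : new HashMap<>(acc);
    }

    /** Alternative raised in the review: no collected elements yields an empty multiset. */
    public static <E> Map<E, Integer> getValueOrEmpty(Map<E, Integer> acc) {
        return new HashMap<>(acc);
    }

    public static void main(String[] args) {
        Map<String, Integer> empty = new HashMap<>();
        System.out.println(getValueOrNull(empty));
        System.out.println(getValueOrEmpty(empty));
    }
}
```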


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r141517953
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java ---
    @@ -0,0 +1,133 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.typeutils;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +
    +import java.util.Map;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * A {@link TypeInformation} for the Multiset types of the Java API.
    + *
    + * @param <T> The type of the elements in the Multiset.
    + */
    +@PublicEvolving
    +public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
    --- End diff --
    
    Great! That makes things a lot easier :-)


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r141102418
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala ---
    @@ -0,0 +1,158 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.functions.aggfunctions
    +
    +import java.lang.{Iterable => JIterable}
    +import java.util
    +import java.util.function.BiFunction
    +
    +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    +import org.apache.flink.api.java.typeutils._
    +import org.apache.flink.table.api.dataview.MapView
    +import org.apache.flink.table.dataview.MapViewTypeInfo
    +import org.apache.flink.table.functions.AggregateFunction
    +
    +
    +/** The initial accumulator for Collect aggregate function */
    +class CollectAccumulator[E](var f0:MapView[E, Integer]) {
    +  def this() {
    +    this(null)
    +  }
    +
    +  def canEqual(a: Any) = a.isInstanceOf[CollectAccumulator[E]]
    +
    +  override def equals(that: Any): Boolean =
    +    that match {
    +      case that: CollectAccumulator[E] => that.canEqual(this) && this.f0 == that.f0
    +      case _ => false
    +    }
    +}
    +
    +abstract class CollectAggFunction[E]
    +  extends AggregateFunction[util.Map[E, Integer], CollectAccumulator[E]] {
    +
    +  override def createAccumulator(): CollectAccumulator[E] = {
    +    val acc = new CollectAccumulator[E](new MapView[E, Integer](
    +      getValueTypeInfo.asInstanceOf[TypeInformation[E]], BasicTypeInfo.INT_TYPE_INFO))
    +    acc
    +  }
    +
    +  def accumulate(accumulator: CollectAccumulator[E], value: E): Unit = {
    +    if (value != null) {
    --- End diff --
    
    I'm not familiar with the specs of the `Collect` function. Should it also support `null` values?


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r141096297
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java ---
    @@ -0,0 +1,133 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.typeutils;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +
    +import java.util.Map;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * A {@link TypeInformation} for the Multiset types of the Java API.
    + *
    + * @param <T> The type of the elements in the Multiset.
    + */
    +@PublicEvolving
    +public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +
    +	public MultisetTypeInfo(Class<T> elementTypeClass) {
    +		super(elementTypeClass, Integer.class);
    +	}
    +
    +	public MultisetTypeInfo(TypeInformation<T> elementTypeInfo) {
    +		super(elementTypeInfo, BasicTypeInfo.INT_TYPE_INFO);
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  MultisetTypeInfo specific properties
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Gets the type information for the elements contained in the Multiset
    +	 */
    +	public TypeInformation<T> getElementTypeInfo() {
    +		return getKeyTypeInfo();
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  TypeInformation implementation
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	public boolean isBasicType() {
    +		return false;
    +	}
    +
    +	@Override
    +	public boolean isTupleType() {
    --- End diff --
    
    implemented by `MapTypeInfo`, no need to override.


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r142667939
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -1410,6 +1410,26 @@ object AggregateUtil {
             case _: SqlCountAggFunction =>
               aggregates(index) = new CountAggFunction
     
    +        case collect: SqlAggFunction if collect.getKind == SqlKind.COLLECT =>
    +          aggregates(index) = sqlTypeName match {
    +            case TINYINT =>
    +              new ByteCollectAggFunction
    +            case SMALLINT =>
    +              new ShortCollectAggFunction
    +            case INTEGER =>
    +              new IntCollectAggFunction
    +            case BIGINT =>
    +              new LongCollectAggFunction
    +            case VARCHAR | CHAR =>
    +              new StringCollectAggFunction
    +            case FLOAT =>
    +              new FloatCollectAggFunction
    +            case DOUBLE =>
    +              new DoubleCollectAggFunction
    +            case _ =>
    +              new ObjectCollectAggFunction
    +          }
    +
    --- End diff --
    
    we need to set `accTypes(index) = aggregates(index).getAccumulatorType` in order to activate the `MapView` feature.


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r141104254
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala ---
    @@ -0,0 +1,158 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.functions.aggfunctions
    +
    +import java.lang.{Iterable => JIterable}
    +import java.util
    +import java.util.function.BiFunction
    +
    +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    +import org.apache.flink.api.java.typeutils._
    +import org.apache.flink.table.api.dataview.MapView
    +import org.apache.flink.table.dataview.MapViewTypeInfo
    +import org.apache.flink.table.functions.AggregateFunction
    +
    +
    +/** The initial accumulator for Collect aggregate function */
    +class CollectAccumulator[E](var f0:MapView[E, Integer]) {
    +  def this() {
    +    this(null)
    +  }
    +
    +  def canEqual(a: Any) = a.isInstanceOf[CollectAccumulator[E]]
    +
    +  override def equals(that: Any): Boolean =
    +    that match {
    +      case that: CollectAccumulator[E] => that.canEqual(this) && this.f0 == that.f0
    +      case _ => false
    +    }
    +}
    +
    +abstract class CollectAggFunction[E]
    +  extends AggregateFunction[util.Map[E, Integer], CollectAccumulator[E]] {
    +
    +  override def createAccumulator(): CollectAccumulator[E] = {
    +    val acc = new CollectAccumulator[E](new MapView[E, Integer](
    +      getValueTypeInfo.asInstanceOf[TypeInformation[E]], BasicTypeInfo.INT_TYPE_INFO))
    +    acc
    +  }
    +
    +  def accumulate(accumulator: CollectAccumulator[E], value: E): Unit = {
    +    if (value != null) {
    +      if (accumulator.f0.contains(value)) {
    --- End diff --
    
    Calling `contains` and then `get` results in two map look-ups.
    It is more efficient to call only `get` and check the result for `null`.
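
The suggestion can be sketched as follows. This uses a plain `java.util.HashMap` as a stand-in and assumes that `get` returns `null` for a missing key, which is what the comment implies for `MapView` as well. The class name `SingleLookupSketch` is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class SingleLookupSketch {

    /** Increments the count for value with one read (get) followed by one write (put). */
    public static <E> void accumulate(Map<E, Integer> counts, E value) {
        if (value == null) {
            return; // null values are skipped, as in the diff
        }
        Integer current = counts.get(value); // the only read: null means "not seen yet"
        if (current != null) {
            counts.put(value, current + 1);
        } else {
            counts.put(value, 1);
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new HashMap<>();
        accumulate(counts, "x");
        accumulate(counts, "x");
        accumulate(counts, "y");
        System.out.println(counts);
    }
}
```

Compared with the version in the diff, the `contains` call disappears and the result of `get` is reused for the increment.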


---

[GitHub] flink issue #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInfo; add ...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on the issue:

    https://github.com/apache/flink/pull/4585
  
    @fhueske Addressed your comments. PTAL. Much appreciated.


---

[GitHub] flink issue #4585: [FLINK-7491] add MultiSetTypeInfo; add built-in Collect A...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on the issue:

    https://github.com/apache/flink/pull/4585
  
    Hi @suez1224, thanks for the PR. I think we can use `Array` instead of `AbstractMultiSet`, because `AbstractMultiSet` is too obscure for users. In that case, we do not need the MultiSetSerializer and MultiSetTypeInfo, and subsequent queries can pass the array-typed field to a UDF as an `eval(...)` parameter.
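
For readers comparing the two representations discussed here: a multiset stored as an element-to-count map carries the same information as a flat array in which each element is repeated according to its count. The conversion below is only an illustrative sketch with hypothetical names. It uses a `List` in place of an array for brevity and is not code from the pull request.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MultisetToArraySketch {

    /** Expands an element-to-count map into a flat list with repeated elements. */
    public static <E> List<E> expand(Map<E, Integer> multiset) {
        List<E> out = new ArrayList<>();
        for (Map.Entry<E, Integer> entry : multiset.entrySet()) {
            for (int i = 0; i < entry.getValue(); i++) {
                out.add(entry.getKey());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order so the printed result is deterministic
        Map<String, Integer> multiset = new LinkedHashMap<>();
        multiset.put("a", 2);
        multiset.put("b", 1);
        System.out.println(expand(multiset));
    }
}
```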


---

[GitHub] flink issue #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInfo; add ...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on the issue:

    https://github.com/apache/flink/pull/4585
  
    Hi @fhueske , I've filled out the PR template. Please take a look. Thanks a lot.


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r141093589
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java ---
    @@ -0,0 +1,133 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.typeutils;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +
    +import java.util.Map;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * A {@link TypeInformation} for the Multiset types of the Java API.
    + *
    + * @param <T> The type of the elements in the Multiset.
    + */
    +@PublicEvolving
    +public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    --- End diff --
    
    rm newline


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r140159387
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala ---
    @@ -0,0 +1,121 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.functions.aggfunctions
    +
    +import java.lang.{Iterable => JIterable}
    +import java.util
    +import java.util.function.BiFunction
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.tuple.{Tuple1 => JTuple1}
    +import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TupleTypeInfo}
    +import org.apache.flink.table.functions.AggregateFunction
    +
    +import scala.collection.JavaConverters._
    +
    +/** The initial accumulator for Collect aggregate function */
    +class CollectAccumulator[E] extends JTuple1[util.Map[E, Integer]]
    +
    +abstract class CollectAggFunction[E]
    +  extends AggregateFunction[util.Map[E, Integer], CollectAccumulator[E]] {
    +
    +  @transient
    +  private lazy val addFunction = new BiFunction[Integer, Integer, Integer] {
    +    override def apply(t: Integer, u: Integer): Integer = t + u
    +  }
    +
    +  override def createAccumulator(): CollectAccumulator[E] = {
    +    val acc = new CollectAccumulator[E]()
    +    acc.f0 = new util.HashMap[E, Integer]()
    +    acc
    +  }
    +
    +  def accumulate(accumulator: CollectAccumulator[E], value: E): Unit = {
    +    if (value != null) {
    +      if (accumulator.f0.containsKey(value)) {
    +        val add = (x: Integer, y: Integer) => x + y
    +        accumulator.f0.merge(value, 1, addFunction)
    +      } else {
    +        accumulator.f0.put(value, 1)
    +      }
    +    }
    +  }
    +
    +  override def getValue(accumulator: CollectAccumulator[E]): util.Map[E, Integer] = {
    +    if (accumulator.f0.size() > 0) {
    +      new util.HashMap(accumulator.f0)
    +    } else {
    +      null.asInstanceOf[util.Map[E, Integer]]
    +    }
    +  }
    +
    +  def resetAccumulator(acc: CollectAccumulator[E]): Unit = {
    +    acc.f0.clear()
    +  }
    +
    +  override def getAccumulatorType: TypeInformation[CollectAccumulator[E]] = {
    +    new TupleTypeInfo(
    +      classOf[CollectAccumulator[E]],
    +      new GenericTypeInfo[util.Map[E, Integer]](classOf[util.Map[E, Integer]]))
    --- End diff --
    
    Changed to use `MapViewTypeInfo` here. However, if `E` is not a basic type, I can only use `GenericTypeInfo` (please see `ObjectCollectAggFunction`). Is there a better way? @fhueske


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r141494139
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala ---
    @@ -0,0 +1,158 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.functions.aggfunctions
    +
    +import java.lang.{Iterable => JIterable}
    +import java.util
    +import java.util.function.BiFunction
    +
    +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    +import org.apache.flink.api.java.typeutils._
    +import org.apache.flink.table.api.dataview.MapView
    +import org.apache.flink.table.dataview.MapViewTypeInfo
    +import org.apache.flink.table.functions.AggregateFunction
    +
    +
    +/** The initial accumulator for Collect aggregate function */
    +class CollectAccumulator[E](var f0:MapView[E, Integer]) {
    +  def this() {
    +    this(null)
    +  }
    +
    +  def canEqual(a: Any) = a.isInstanceOf[CollectAccumulator[E]]
    +
    +  override def equals(that: Any): Boolean =
    --- End diff --
    
    A normal Scala class still needs to, but a case class doesn't.


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r140024187
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala ---
    @@ -0,0 +1,121 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.functions.aggfunctions
    +
    +import java.lang.{Iterable => JIterable}
    +import java.util
    +import java.util.function.BiFunction
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.tuple.{Tuple1 => JTuple1}
    +import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TupleTypeInfo}
    +import org.apache.flink.table.functions.AggregateFunction
    +
    +import scala.collection.JavaConverters._
    +
    +/** The initial accumulator for Collect aggregate function */
    +class CollectAccumulator[E] extends JTuple1[util.Map[E, Integer]]
    +
    +abstract class CollectAggFunction[E]
    +  extends AggregateFunction[util.Map[E, Integer], CollectAccumulator[E]] {
    +
    +  @transient
    +  private lazy val addFunction = new BiFunction[Integer, Integer, Integer] {
    +    override def apply(t: Integer, u: Integer): Integer = t + u
    +  }
    +
    +  override def createAccumulator(): CollectAccumulator[E] = {
    +    val acc = new CollectAccumulator[E]()
    +    acc.f0 = new util.HashMap[E, Integer]()
    +    acc
    +  }
    +
    +  def accumulate(accumulator: CollectAccumulator[E], value: E): Unit = {
    +    if (value != null) {
    +      if (accumulator.f0.containsKey(value)) {
    +        val add = (x: Integer, y: Integer) => x + y
    +        accumulator.f0.merge(value, 1, addFunction)
    +      } else {
    +        accumulator.f0.put(value, 1)
    +      }
    +    }
    +  }
    +
    +  override def getValue(accumulator: CollectAccumulator[E]): util.Map[E, Integer] = {
    +    if (accumulator.f0.size() > 0) {
    +      new util.HashMap(accumulator.f0)
    +    } else {
    +      null.asInstanceOf[util.Map[E, Integer]]
    +    }
    +  }
    +
    +  def resetAccumulator(acc: CollectAccumulator[E]): Unit = {
    +    acc.f0.clear()
    +  }
    +
    +  override def getAccumulatorType: TypeInformation[CollectAccumulator[E]] = {
    +    new TupleTypeInfo(
    +      classOf[CollectAccumulator[E]],
    +      new GenericTypeInfo[util.Map[E, Integer]](classOf[util.Map[E, Integer]]))
    --- End diff --
    
    Don't use a generic type here. This will result in a KryoSerializer, which can be quite inefficient and results in state that cannot be upgraded. Use `MapTypeInfo` instead.


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r140009815
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala ---
    @@ -0,0 +1,121 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.functions.aggfunctions
    +
    +import java.lang.{Iterable => JIterable}
    +import java.util
    +import java.util.function.BiFunction
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.tuple.{Tuple1 => JTuple1}
    +import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TupleTypeInfo}
    +import org.apache.flink.table.functions.AggregateFunction
    +
    +import scala.collection.JavaConverters._
    +
    +/** The initial accumulator for Collect aggregate function */
    +class CollectAccumulator[E] extends JTuple1[util.Map[E, Integer]]
    --- End diff --
    
    We can use a `MapView` here. This feature was recently added and automatically backs the Map with a MapState if possible. Otherwise, it uses a Java HashMap (as right now). The benefit of backing the accumulator by MapState is that only the keys and values that are accessed need to be deserialized. In contrast, a regular HashMap is completely de/serialized every time the accumulator is read. Using MapView would require that the accumulator is implemented as a POJO (instead of a Tuple1). 
    
    Check this class for details [MapView](https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala) and let me know if you have questions.


---

[GitHub] flink issue #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInfo; add ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/4585
  
    Thanks @suez1224, I'm quite busy atm but will try to have a look soon.
    Thanks, Fabian


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r141097202
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java ---
    @@ -0,0 +1,133 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.typeutils;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +
    +import java.util.Map;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * A {@link TypeInformation} for the Multiset types of the Java API.
    + *
    + * @param <T> The type of the elements in the Multiset.
    + */
    +@PublicEvolving
    +public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +
    +	public MultisetTypeInfo(Class<T> elementTypeClass) {
    +		super(elementTypeClass, Integer.class);
    +	}
    +
    +	public MultisetTypeInfo(TypeInformation<T> elementTypeInfo) {
    +		super(elementTypeInfo, BasicTypeInfo.INT_TYPE_INFO);
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  MultisetTypeInfo specific properties
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Gets the type information for the elements contained in the Multiset
    +	 */
    +	public TypeInformation<T> getElementTypeInfo() {
    +		return getKeyTypeInfo();
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  TypeInformation implementation
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	public boolean isBasicType() {
    +		return false;
    +	}
    +
    +	@Override
    +	public boolean isTupleType() {
    +		return false;
    +	}
    +
    +	@Override
    +	public int getArity() {
    +		return 0;
    +	}
    +
    +	@Override
    +	public int getTotalFields() {
    --- End diff --
    
    The implementation of `getTotalFields()` of `MapTypeInfo` (which returns `2`) is not correct.
    Can you move our implementation to `MapTypeInfo`? Then we don't need to override it here.


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r141113194
  
    --- Diff: docs/dev/table/sql.md ---
    @@ -2107,6 +2108,17 @@ VAR_SAMP(value)
             <p>Returns the sample variance (square of the sample standard deviation) of the numeric field across all input values.</p>
           </td>
         </tr>
    +
    +    <tr>
    +      <td>
    +          {% highlight text %}
    +          COLLECT(value)
    +          {% endhighlight %}
    +      </td>
    +      <td>
    +          <p>Returns a multiset of the <i>value</i>s.</p>
    --- End diff --
    
    Be more specific about the handling of `null` values. Are they ignored? What is returned if only null values are added (`null` or empty multiset)?


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r139237144
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala ---
    @@ -211,6 +218,14 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
         canonize(relType)
       }
     
    +  override def createMultisetType(elementType: RelDataType, maxCardinality: Long): RelDataType = {
    +    val relType = new MultisetRelDataType(
    --- End diff --
    
    Added changes in FlinkRelNode & ExpressionReducer


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r138888586
  
    --- Diff: flink-core/pom.xml ---
    @@ -80,6 +80,13 @@ under the License.
     			<!-- managed version -->
     		</dependency>
     
    +		<!-- For multiset -->
    +		<dependency>
    +			<groupId>org.apache.commons</groupId>
    --- End diff --
    
    We should not add additional dependencies to Flink just because of a new data type. There is also no stated reason for choosing this library. Couldn't we just use a regular Java Map? Otherwise I would propose adding a class for our own type, as we did for `org.apache.flink.types.Row`. Calcite is using `List`, which is not very nice, but would also work.
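    
    As a rough illustration of the "regular Java Map" option (a sketch only, not code from this PR), a multiset can be kept as a map from each element to its occurrence count. The class name `MapMultiset` and its methods are hypothetical and exist only for this example; only `java.util` is used.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: a multiset stored as element -> occurrence count. */
final class MapMultiset<T> {
    private final Map<T, Integer> counts = new HashMap<>();

    /** Adds one occurrence of the element. */
    void add(T element) {
        // merge() stores 1 for a new key, otherwise sums the old count and 1
        counts.merge(element, 1, Integer::sum);
    }

    /** Returns how often the element was added (0 if never). */
    int count(T element) {
        return counts.getOrDefault(element, 0);
    }

    /** Returns a copy of the underlying element -> count map. */
    Map<T, Integer> asMap() {
        return new HashMap<>(counts);
    }
}
```

    With this representation no extra library is needed, and the element-to-count map is the same shape as the `util.Map[E, Integer]` used by the quoted `CollectAggFunction`.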


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r141260601
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java ---
    @@ -0,0 +1,133 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.typeutils;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +
    +import java.util.Map;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * A {@link TypeInformation} for the Multiset types of the Java API.
    + *
    + * @param <T> The type of the elements in the Multiset.
    + */
    +@PublicEvolving
    +public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +
    +	public MultisetTypeInfo(Class<T> elementTypeClass) {
    +		super(elementTypeClass, Integer.class);
    +	}
    +
    +	public MultisetTypeInfo(TypeInformation<T> elementTypeInfo) {
    +		super(elementTypeInfo, BasicTypeInfo.INT_TYPE_INFO);
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  MultisetTypeInfo specific properties
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Gets the type information for the elements contained in the Multiset
    +	 */
    +	public TypeInformation<T> getElementTypeInfo() {
    +		return getKeyTypeInfo();
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  TypeInformation implementation
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	public boolean isBasicType() {
    +		return false;
    +	}
    +
    +	@Override
    +	public boolean isTupleType() {
    +		return false;
    +	}
    +
    +	@Override
    +	public int getArity() {
    +		return 0;
    +	}
    +
    +	@Override
    +	public int getTotalFields() {
    --- End diff --
    
    @fhueske So `MapTypeInfo` should also return 1? I am a bit confused here: what does "the number of logical fields in this type" mean for a map type?


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r140167679
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala ---
    @@ -0,0 +1,121 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.functions.aggfunctions
    +
    +import java.lang.{Iterable => JIterable}
    +import java.util
    +import java.util.function.BiFunction
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.tuple.{Tuple1 => JTuple1}
    +import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TupleTypeInfo}
    +import org.apache.flink.table.functions.AggregateFunction
    +
    +import scala.collection.JavaConverters._
    +
    +/** The initial accumulator for Collect aggregate function */
    +class CollectAccumulator[E] extends JTuple1[util.Map[E, Integer]]
    +
    +abstract class CollectAggFunction[E]
    +  extends AggregateFunction[util.Map[E, Integer], CollectAccumulator[E]] {
    +
    +  @transient
    +  private lazy val addFunction = new BiFunction[Integer, Integer, Integer] {
    +    override def apply(t: Integer, u: Integer): Integer = t + u
    +  }
    +
    +  override def createAccumulator(): CollectAccumulator[E] = {
    +    val acc = new CollectAccumulator[E]()
    +    acc.f0 = new util.HashMap[E, Integer]()
    +    acc
    +  }
    +
    +  def accumulate(accumulator: CollectAccumulator[E], value: E): Unit = {
    +    if (value != null) {
    +      if (accumulator.f0.containsKey(value)) {
    +        val add = (x: Integer, y: Integer) => x + y
    +        accumulator.f0.merge(value, 1, addFunction)
    +      } else {
    +        accumulator.f0.put(value, 1)
    +      }
    +    }
    +  }
    +
    +  override def getValue(accumulator: CollectAccumulator[E]): util.Map[E, Integer] = {
    +    if (accumulator.f0.size() > 0) {
    +      new util.HashMap(accumulator.f0)
    +    } else {
    +      null.asInstanceOf[util.Map[E, Integer]]
    +    }
    +  }
    +
    +  def resetAccumulator(acc: CollectAccumulator[E]): Unit = {
    +    acc.f0.clear()
    +  }
    +
    +  override def getAccumulatorType: TypeInformation[CollectAccumulator[E]] = {
    +    new TupleTypeInfo(
    +      classOf[CollectAccumulator[E]],
    +      new GenericTypeInfo[util.Map[E, Integer]](classOf[util.Map[E, Integer]]))
    --- End diff --
    
    We could have an abstract method `getElementTypeInfo()` that returns the type info for the elements. The basic types can be properly handled and for `Object` we fall back to `GenericTypeInfo`.


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r141097303
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java ---
    @@ -0,0 +1,133 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.typeutils;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +
    +import java.util.Map;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * A {@link TypeInformation} for the Multiset types of the Java API.
    + *
    + * @param <T> The type of the elements in the Multiset.
    + */
    +@PublicEvolving
    +public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +
    +	public MultisetTypeInfo(Class<T> elementTypeClass) {
    +		super(elementTypeClass, Integer.class);
    +	}
    +
    +	public MultisetTypeInfo(TypeInformation<T> elementTypeInfo) {
    +		super(elementTypeInfo, BasicTypeInfo.INT_TYPE_INFO);
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  MultisetTypeInfo specific properties
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Gets the type information for the elements contained in the Multiset
    +	 */
    +	public TypeInformation<T> getElementTypeInfo() {
    +		return getKeyTypeInfo();
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  TypeInformation implementation
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	public boolean isBasicType() {
    +		return false;
    +	}
    +
    +	@Override
    +	public boolean isTupleType() {
    +		return false;
    +	}
    +
    +	@Override
    +	public int getArity() {
    +		return 0;
    +	}
    +
    +	@Override
    +	public int getTotalFields() {
    +		// similar as arrays, the multiset are "opaque" to the direct field addressing logic
    +		// since the multiset's elements are not addressable, we do not expose them
    +		return 1;
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	@Override
    +	public Class<Map<T, Integer>> getTypeClass() {
    +		return (Class<Map<T, Integer>>)(Class<?>)Map.class;
    +	}
    +
    +	@Override
    +	public boolean isKeyType() {
    --- End diff --
    
    implemented by `MapTypeInfo`, no need to override.


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r141255917
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java ---
    @@ -0,0 +1,133 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.typeutils;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +
    +import java.util.Map;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * A {@link TypeInformation} for the Multiset types of the Java API.
    + *
    + * @param <T> The type of the elements in the Multiset.
    + */
    +@PublicEvolving
    +public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +
    +	public MultisetTypeInfo(Class<T> elementTypeClass) {
    +		super(elementTypeClass, Integer.class);
    +	}
    +
    +	public MultisetTypeInfo(TypeInformation<T> elementTypeInfo) {
    +		super(elementTypeInfo, BasicTypeInfo.INT_TYPE_INFO);
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  MultisetTypeInfo specific properties
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Gets the type information for the elements contained in the Multiset
    +	 */
    +	public TypeInformation<T> getElementTypeInfo() {
    +		return getKeyTypeInfo();
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  TypeInformation implementation
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	public boolean isBasicType() {
    +		return false;
    +	}
    +
    +	@Override
    +	public boolean isTupleType() {
    --- End diff --
    
    done


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r141111367
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -1414,8 +1414,29 @@ object AggregateUtil {
               aggregates(index) = udagg.getFunction
               accTypes(index) = udagg.accType
     
    -        case unSupported: SqlAggFunction =>
    --- End diff --
    
    Since we add a dedicated case for `COLLECT`, this case should remain at the end of this `match`.


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r141103652
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java ---
    @@ -0,0 +1,133 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.typeutils;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +
    +import java.util.Map;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * A {@link TypeInformation} for the Multiset types of the Java API.
    + *
    + * @param <T> The type of the elements in the Multiset.
    + */
    +@PublicEvolving
    +public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
    --- End diff --
    
    Does SQL Multiset also support `null` values? If yes, we would need to wrap the `MapSerializer`.
    Otherwise, the problem would be that we would need to rely on the key serializer to support `null`, which many serializers do not. A solution would be to wrap the `MapSerializer` and additionally serialize the count for `null` elements.
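
A minimal sketch of the wrapping idea described above, under stated assumptions: it does not use Flink's `MapSerializer` or `TypeSerializer` API, the class name `NullAwareMultisetCodec` is hypothetical, and elements are fixed to `String` for brevity. It only illustrates writing the count of `null` elements separately so that the per-element writer never sees a `null` key.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java analogue of a serializer wrapper; not Flink code.
final class NullAwareMultisetCodec {

    // Layout: [count of null elements][number of non-null entries][(element, count)...]
    static byte[] serialize(Map<String, Integer> multiset) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            Integer nullCount = multiset.get(null);
            // The null element is encoded only through its count.
            out.writeInt(nullCount == null ? 0 : nullCount);
            int nonNullEntries = multiset.size() - (multiset.containsKey(null) ? 1 : 0);
            out.writeInt(nonNullEntries);
            for (Map.Entry<String, Integer> entry : multiset.entrySet()) {
                if (entry.getKey() != null) {
                    out.writeUTF(entry.getKey());   // the element writer never receives null
                    out.writeInt(entry.getValue());
                }
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static Map<String, Integer> deserialize(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            Map<String, Integer> multiset = new HashMap<>();
            int nullCount = in.readInt();
            if (nullCount > 0) {
                multiset.put(null, nullCount);      // restore the null element
            }
            int nonNullEntries = in.readInt();
            for (int i = 0; i < nonNullEntries; i++) {
                String element = in.readUTF();
                int count = in.readInt();
                multiset.put(element, count);
            }
            return multiset;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Note that in this sketch a `null` element with a count of 0 is not restored on deserialization; whether that distinction matters depends on the multiset semantics chosen.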


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r141576017
  
    --- Diff: docs/dev/table/sql.md ---
    @@ -746,6 +746,7 @@ The SQL runtime is built on top of Flink's DataSet and DataStream APIs. Internal
     | `Types.PRIMITIVE_ARRAY`| `ARRAY`                     | e.g. `int[]`           |
     | `Types.OBJECT_ARRAY`   | `ARRAY`                     | e.g. `java.lang.Byte[]`|
     | `Types.MAP`            | `MAP`                       | `java.util.HashMap`    |
    +| `Types.MULTISET`       | `MULTISET`                  | `java.util.HashMap`    |
    --- End diff --
    
    done


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r138889015
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala ---
    @@ -211,6 +218,14 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
         canonize(relType)
       }
     
    +  override def createMultisetType(elementType: RelDataType, maxCardinality: Long): RelDataType = {
    +    val relType = new MultisetRelDataType(
    --- End diff --
    
    There are multiple locations where a new type has to be added, such as `FlinkRelNode`.


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r141100624
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala ---
    @@ -0,0 +1,158 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.functions.aggfunctions
    +
    +import java.lang.{Iterable => JIterable}
    +import java.util
    +import java.util.function.BiFunction
    +
    +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    +import org.apache.flink.api.java.typeutils._
    +import org.apache.flink.table.api.dataview.MapView
    +import org.apache.flink.table.dataview.MapViewTypeInfo
    +import org.apache.flink.table.functions.AggregateFunction
    +
    +
    +/** The initial accumulator for Collect aggregate function */
    +class CollectAccumulator[E](var f0:MapView[E, Integer]) {
    +  def this() {
    +    this(null)
    +  }
    +
    +  def canEqual(a: Any) = a.isInstanceOf[CollectAccumulator[E]]
    +
    +  override def equals(that: Any): Boolean =
    --- End diff --
    
    Usually POJOs don't need to implement `equals`.


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r141260762
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java ---
    @@ -0,0 +1,133 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.typeutils;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +
    +import java.util.Map;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * A {@link TypeInformation} for the Multiset types of the Java API.
    + *
    + * @param <T> The type of the elements in the Multiset.
    + */
    +@PublicEvolving
    +public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    --- End diff --
    
    done


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r140089018
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala ---
    @@ -329,6 +329,35 @@ class AggregateITCase(
       }
     
       @Test
    +  def testTumbleWindowAggregateWithCollect(): Unit = {
    +
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env, config)
    +
    +    val sqlQuery =
    +      "SELECT b, COLLECT(b)" +
    --- End diff --
    
    Updated the documentation.
    
    Table API ticket created: https://issues.apache.org/jira/browse/FLINK-7658?filter=-1


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r141256073
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java ---
    @@ -0,0 +1,133 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.typeutils;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +
    +import java.util.Map;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * A {@link TypeInformation} for the Multiset types of the Java API.
    + *
    + * @param <T> The type of the elements in the Multiset.
    + */
    +@PublicEvolving
    +public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +
    +	public MultisetTypeInfo(Class<T> elementTypeClass) {
    +		super(elementTypeClass, Integer.class);
    +	}
    +
    +	public MultisetTypeInfo(TypeInformation<T> elementTypeInfo) {
    +		super(elementTypeInfo, BasicTypeInfo.INT_TYPE_INFO);
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  MultisetTypeInfo specific properties
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Gets the type information for the elements contained in the Multiset
    +	 */
    +	public TypeInformation<T> getElementTypeInfo() {
    +		return getKeyTypeInfo();
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  TypeInformation implementation
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	public boolean isBasicType() {
    +		return false;
    +	}
    +
    +	@Override
    +	public boolean isTupleType() {
    +		return false;
    +	}
    +
    +	@Override
    +	public int getArity() {
    +		return 0;
    +	}
    +
    +	@Override
    +	public int getTotalFields() {
    +		// similar as arrays, the multiset are "opaque" to the direct field addressing logic
    +		// since the multiset's elements are not addressable, we do not expose them
    +		return 1;
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	@Override
    +	public Class<Map<T, Integer>> getTypeClass() {
    +		return (Class<Map<T, Integer>>)(Class<?>)Map.class;
    +	}
    +
    +	@Override
    +	public boolean isKeyType() {
    --- End diff --
    
    done


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r142664563
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -1410,6 +1410,26 @@ object AggregateUtil {
             case _: SqlCountAggFunction =>
               aggregates(index) = new CountAggFunction
     
    +        case collect: SqlAggFunction if collect.getKind == SqlKind.COLLECT =>
    +          aggregates(index) = sqlTypeName match {
    --- End diff --
    
    We can pass the actual `TypeInformation` of the argument type here to the constructor of the `CollectAggFunction` and don't need to distinguish the different argument types.


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r140089533
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/MultisetRelDataType.scala ---
    @@ -0,0 +1,50 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.schema
    +
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.sql.`type`.MultisetSqlType
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +
    +class MultisetRelDataType(
    +    val typeInfo: TypeInformation[_],
    +    elementType: RelDataType,
    +    isNullable: Boolean)
    +  extends MultisetSqlType(
    +    elementType,
    +    isNullable) {
    +
    +  override def toString = s"MULTISET($typeInfo)"
    --- End diff --
    
    Done


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r140176626
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala ---
    @@ -0,0 +1,121 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.functions.aggfunctions
    +
    +import java.lang.{Iterable => JIterable}
    +import java.util
    +import java.util.function.BiFunction
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.tuple.{Tuple1 => JTuple1}
    +import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TupleTypeInfo}
    +import org.apache.flink.table.functions.AggregateFunction
    +
    +import scala.collection.JavaConverters._
    +
    +/** The initial accumulator for Collect aggregate function */
    +class CollectAccumulator[E] extends JTuple1[util.Map[E, Integer]]
    +
    +abstract class CollectAggFunction[E]
    +  extends AggregateFunction[util.Map[E, Integer], CollectAccumulator[E]] {
    +
    +  @transient
    +  private lazy val addFunction = new BiFunction[Integer, Integer, Integer] {
    +    override def apply(t: Integer, u: Integer): Integer = t + u
    +  }
    +
    +  override def createAccumulator(): CollectAccumulator[E] = {
    +    val acc = new CollectAccumulator[E]()
    +    acc.f0 = new util.HashMap[E, Integer]()
    +    acc
    +  }
    +
    +  def accumulate(accumulator: CollectAccumulator[E], value: E): Unit = {
    +    if (value != null) {
    +      if (accumulator.f0.containsKey(value)) {
    +        val add = (x: Integer, y: Integer) => x + y
    +        accumulator.f0.merge(value, 1, addFunction)
    +      } else {
    +        accumulator.f0.put(value, 1)
    +      }
    +    }
    +  }
    +
    +  override def getValue(accumulator: CollectAccumulator[E]): util.Map[E, Integer] = {
    +    if (accumulator.f0.size() > 0) {
    +      new util.HashMap(accumulator.f0)
    +    } else {
    +      null.asInstanceOf[util.Map[E, Integer]]
    +    }
    +  }
    +
    +  def resetAccumulator(acc: CollectAccumulator[E]): Unit = {
    +    acc.f0.clear()
    +  }
    +
    +  override def getAccumulatorType: TypeInformation[CollectAccumulator[E]] = {
    +    new TupleTypeInfo(
    +      classOf[CollectAccumulator[E]],
    +      new GenericTypeInfo[util.Map[E, Integer]](classOf[util.Map[E, Integer]]))
    --- End diff --
    
    @fhueske Thanks. I think that's exactly what the current code does. Please take another look.


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r140025368
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/MultisetRelDataType.scala ---
    @@ -0,0 +1,50 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.schema
    +
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.sql.`type`.MultisetSqlType
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +
    +class MultisetRelDataType(
    +    val typeInfo: TypeInformation[_],
    +    elementType: RelDataType,
    +    isNullable: Boolean)
    +  extends MultisetSqlType(
    +    elementType,
    +    isNullable) {
    +
    +  override def toString = s"MULTISET($typeInfo)"
    --- End diff --
    
    This should rather be `s"MULTISET($elementType)"`. `TypeInformation` is a Flink concept, whereas `RelDataType` belongs to the Calcite context.


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r142665388
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala ---
    @@ -92,6 +92,63 @@ class SqlITCase extends StreamingWithStateTestBase {
         assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
       }
     
    +  @Test
    +  def testUnboundedGroupByCollect(): Unit = {
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    --- End diff --
    
    add `env.setStateBackend(this.getStateBackend)` to enforce serialization through the `MapView`.


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r141096324
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java ---
    @@ -0,0 +1,133 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.typeutils;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +
    +import java.util.Map;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * A {@link TypeInformation} for the Multiset types of the Java API.
    + *
    + * @param <T> The type of the elements in the Multiset.
    + */
    +@PublicEvolving
    +public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +
    +	public MultisetTypeInfo(Class<T> elementTypeClass) {
    +		super(elementTypeClass, Integer.class);
    +	}
    +
    +	public MultisetTypeInfo(TypeInformation<T> elementTypeInfo) {
    +		super(elementTypeInfo, BasicTypeInfo.INT_TYPE_INFO);
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  MultisetTypeInfo specific properties
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Gets the type information for the elements contained in the Multiset
    +	 */
    +	public TypeInformation<T> getElementTypeInfo() {
    +		return getKeyTypeInfo();
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  TypeInformation implementation
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	public boolean isBasicType() {
    +		return false;
    +	}
    +
    +	@Override
    +	public boolean isTupleType() {
    +		return false;
    +	}
    +
    +	@Override
    +	public int getArity() {
    --- End diff --
    
    implemented by `MapTypeInfo`, no need to override.


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r141093104
  
    --- Diff: docs/dev/table/sql.md ---
    @@ -746,6 +746,7 @@ The SQL runtime is built on top of Flink's DataSet and DataStream APIs. Internal
     | `Types.PRIMITIVE_ARRAY`| `ARRAY`                     | e.g. `int[]`           |
     | `Types.OBJECT_ARRAY`   | `ARRAY`                     | e.g. `java.lang.Byte[]`|
     | `Types.MAP`            | `MAP`                       | `java.util.HashMap`    |
    +| `Types.MULTISET`       | `MULTISET`                  | `java.util.HashMap`    |
    --- End diff --
    
    should we explain how the `HashMap` is used to represent the multiset, i.e., that a multiset of `String` is a `HashMap<String, Integer>`?
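
A minimal sketch of the representation asked about above, assuming plain `java.util` collections only: a multiset of `String` is held as a `HashMap<String, Integer>` that maps each element to its number of occurrences. The class and method names are hypothetical and are not part of Flink.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; illustrates the element -> count representation only.
final class MultisetAsMapSketch {

    // Adds one occurrence: merge() stores 1 for a new element and sums counts otherwise.
    static <T> void add(Map<T, Integer> multiset, T element) {
        multiset.merge(element, 1, Integer::sum);
    }

    // Returns how often the element occurs, or 0 if it is absent.
    static <T> int count(Map<T, Integer> multiset, T element) {
        return multiset.getOrDefault(element, 0);
    }

    public static void main(String[] args) {
        Map<String, Integer> multiset = new HashMap<>();
        add(multiset, "a");
        add(multiset, "b");
        add(multiset, "a");
        // The multiset {a, a, b} is stored as the two entries a -> 2 and b -> 1.
        System.out.println(count(multiset, "a") + " " + count(multiset, "b") + " " + count(multiset, "c"));
    }
}
```

Because `merge` covers both the first and the repeated occurrence of an element, no separate `containsKey` check is needed when accumulating.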


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r140083074
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala ---
    @@ -0,0 +1,121 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.functions.aggfunctions
    +
    +import java.lang.{Iterable => JIterable}
    +import java.util
    +import java.util.function.BiFunction
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.tuple.{Tuple1 => JTuple1}
    +import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TupleTypeInfo}
    +import org.apache.flink.table.functions.AggregateFunction
    +
    +import scala.collection.JavaConverters._
    +
    +/** The initial accumulator for Collect aggregate function */
    +class CollectAccumulator[E] extends JTuple1[util.Map[E, Integer]]
    +
    +abstract class CollectAggFunction[E]
    +  extends AggregateFunction[util.Map[E, Integer], CollectAccumulator[E]] {
    +
    +  @transient
    +  private lazy val addFunction = new BiFunction[Integer, Integer, Integer] {
    +    override def apply(t: Integer, u: Integer): Integer = t + u
    +  }
    +
    +  override def createAccumulator(): CollectAccumulator[E] = {
    +    val acc = new CollectAccumulator[E]()
    +    acc.f0 = new util.HashMap[E, Integer]()
    +    acc
    +  }
    +
    +  def accumulate(accumulator: CollectAccumulator[E], value: E): Unit = {
    +    if (value != null) {
    +      if (accumulator.f0.containsKey(value)) {
    +        val add = (x: Integer, y: Integer) => x + y
    --- End diff --
    
    yes, removed.


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r140159053
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala ---
    @@ -0,0 +1,121 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.functions.aggfunctions
    +
    +import java.lang.{Iterable => JIterable}
    +import java.util
    +import java.util.function.BiFunction
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.tuple.{Tuple1 => JTuple1}
    +import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TupleTypeInfo}
    +import org.apache.flink.table.functions.AggregateFunction
    +
    +import scala.collection.JavaConverters._
    +
    +/** The initial accumulator for Collect aggregate function */
    +class CollectAccumulator[E] extends JTuple1[util.Map[E, Integer]]
    --- End diff --
    
    Please take another look, I've updated to use MapView.


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r141485942
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -1414,8 +1414,29 @@ object AggregateUtil {
               aggregates(index) = udagg.getFunction
               accTypes(index) = udagg.accType
     
    -        case unSupported: SqlAggFunction =>
    -          throw new TableException(s"unsupported Function: '${unSupported.getName}'")
    +        case other: SqlAggFunction =>
    --- End diff --
    
    done


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r140026944
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala ---
    @@ -329,6 +329,35 @@ class AggregateITCase(
       }
     
       @Test
    +  def testTumbleWindowAggregateWithCollect(): Unit = {
    +
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env, config)
    +
    +    val sqlQuery =
    +      "SELECT b, COLLECT(b)" +
    --- End diff --
    
    Collect should be added to the [SQL documentation](https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/sql.html#built-in-functions) under "Built-in Function" -> "Aggregate Functions"
    
    Moreover, we should add `MULTISET` to the [supported data types](https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/sql.html#data-types).
    
    It would also be nice if you could open a JIRA to add support for COLLECT to the Table API. We try to keep both in sync and it helps if we have a list of things that need to be added.


---

[GitHub] flink issue #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInfo; add ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/4585
  
    Merging


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r141494189
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala ---
    @@ -0,0 +1,158 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.functions.aggfunctions
    +
    +import java.lang.{Iterable => JIterable}
    +import java.util
    +import java.util.function.BiFunction
    +
    +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    +import org.apache.flink.api.java.typeutils._
    +import org.apache.flink.table.api.dataview.MapView
    +import org.apache.flink.table.dataview.MapViewTypeInfo
    +import org.apache.flink.table.functions.AggregateFunction
    +
    +
    +/** The initial accumulator for Collect aggregate function */
    +class CollectAccumulator[E](var f0:MapView[E, Integer]) {
    --- End diff --
    
    done


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r141449807
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala ---
    @@ -0,0 +1,158 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.functions.aggfunctions
    +
    +import java.lang.{Iterable => JIterable}
    +import java.util
    +import java.util.function.BiFunction
    +
    +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    +import org.apache.flink.api.java.typeutils._
    +import org.apache.flink.table.api.dataview.MapView
    +import org.apache.flink.table.dataview.MapViewTypeInfo
    +import org.apache.flink.table.functions.AggregateFunction
    +
    +
    +/** The initial accumulator for Collect aggregate function */
    +class CollectAccumulator[E](var f0:MapView[E, Integer]) {
    +  def this() {
    +    this(null)
    +  }
    +
    +  def canEqual(a: Any) = a.isInstanceOf[CollectAccumulator[E]]
    +
    +  override def equals(that: Any): Boolean =
    +    that match {
    +      case that: CollectAccumulator[E] => that.canEqual(this) && this.f0 == that.f0
    +      case _ => false
    +    }
    +}
    +
    +abstract class CollectAggFunction[E]
    +  extends AggregateFunction[util.Map[E, Integer], CollectAccumulator[E]] {
    +
    +  override def createAccumulator(): CollectAccumulator[E] = {
    +    val acc = new CollectAccumulator[E](new MapView[E, Integer](
    +      getValueTypeInfo.asInstanceOf[TypeInformation[E]], BasicTypeInfo.INT_TYPE_INFO))
    +    acc
    +  }
    +
    +  def accumulate(accumulator: CollectAccumulator[E], value: E): Unit = {
    +    if (value != null) {
    +      if (accumulator.f0.contains(value)) {
    --- End diff --
    
    Good catch. done.


---

[GitHub] flink issue #4585: [FLINK-7491] add MultiSetTypeInfo; add built-in Collect A...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/4585
  
    Hi @suez1224, please read and fill out the template in the PR description. Thank you.


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r142667720
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala ---
    @@ -92,6 +92,63 @@ class SqlITCase extends StreamingWithStateTestBase {
         assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
       }
     
    +  @Test
    +  def testUnboundedGroupByCollect(): Unit = {
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.clear
    +
    +    val sqlQuery = "SELECT b, COLLECT(a) FROM MyTable GROUP BY b"
    +
    +    val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
    +    tEnv.registerTable("MyTable", t)
    +
    +    val result = tEnv.sql(sqlQuery).toRetractStream[Row]
    +    result.addSink(new StreamITCase.RetractingSink).setParallelism(1)
    +    env.execute()
    +
    +    val expected = List(
    +      "1,{1=1}",
    +      "2,{2=1, 3=1}",
    +      "3,{4=1, 5=1, 6=1}",
    +      "4,{7=1, 8=1, 9=1, 10=1}",
    +      "5,{11=1, 12=1, 13=1, 14=1, 15=1}",
    +      "6,{16=1, 17=1, 18=1, 19=1, 20=1, 21=1}")
    +    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
    +  }
    +
    +  @Test
    +  def testUnboundedGroupByCollectWithObject(): Unit = {
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    --- End diff --
    
    Add `env.setStateBackend(this.getStateBackend)` to enforce serialization through the `MapView`.


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r139237307
  
    --- Diff: flink-core/pom.xml ---
    @@ -80,6 +80,13 @@ under the License.
     			<!-- managed version -->
     		</dependency>
     
    +		<!-- For multiset -->
    +		<dependency>
    +			<groupId>org.apache.commons</groupId>
    --- End diff --
    
    Thanks. Using `java.util.Map` instead.
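
A minimal sketch of what representing a multiset with `java.util.Map` can look like, assuming each element maps to its occurrence count. The class and method names (`MapMultisetSketch`, `add`, `count`) are hypothetical and are not taken from the pull request.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the pull request: a multiset kept as element -> count.
class MapMultisetSketch {

    // Adds one occurrence of element and returns the new count.
    static <T> int add(Map<T, Integer> multiset, T element) {
        return multiset.merge(element, 1, Integer::sum);
    }

    // Returns how often element occurs; absent elements count as 0.
    static <T> int count(Map<T, Integer> multiset, T element) {
        return multiset.getOrDefault(element, 0);
    }

    public static void main(String[] args) {
        Map<String, Integer> multiset = new HashMap<>();
        add(multiset, "a");
        add(multiset, "b");
        add(multiset, "a");
        System.out.println(count(multiset, "a")); // prints 2
        System.out.println(count(multiset, "c")); // prints 0
    }
}
```

With this representation no extra collections dependency is needed, which appears to be the point of dropping the additional `pom.xml` entry.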


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r141352765
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java ---
    @@ -0,0 +1,133 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.typeutils;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +
    +import java.util.Map;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * A {@link TypeInformation} for the Multiset types of the Java API.
    + *
    + * @param <T> The type of the elements in the Multiset.
    + */
    +@PublicEvolving
    +public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +
    +	public MultisetTypeInfo(Class<T> elementTypeClass) {
    +		super(elementTypeClass, Integer.class);
    +	}
    +
    +	public MultisetTypeInfo(TypeInformation<T> elementTypeInfo) {
    +		super(elementTypeInfo, BasicTypeInfo.INT_TYPE_INFO);
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  MultisetTypeInfo specific properties
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Gets the type information for the elements contained in the Multiset
    +	 */
    +	public TypeInformation<T> getElementTypeInfo() {
    +		return getKeyTypeInfo();
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  TypeInformation implementation
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	public boolean isBasicType() {
    +		return false;
    +	}
    +
    +	@Override
    +	public boolean isTupleType() {
    +		return false;
    +	}
    +
    +	@Override
    +	public int getArity() {
    +		return 0;
    +	}
    +
    +	@Override
    +	public int getTotalFields() {
    --- End diff --
    
    Yes, `MapTypeInfo` should also return `1`. The method should return the number of nested fields as they are exposed to Flink's API. A Map as well as a MultiSet are handled by Flink's APIs as an atomic data type (you cannot reference a specific map entry as a key).
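
The counting rule described above can be sketched in isolation. This is a hypothetical miniature, not Flink's `TypeInformation` hierarchy: it assumes an atomic type reports one field, while a composite type reports the sum of its children's fields. All names in the block are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical miniature of the idea; these are not Flink's TypeInformation classes.
class FieldCountDemo {

    interface FieldCountSketch {
        int getTotalFields();
    }

    // An atomic type (for example a map or a multiset) is one opaque field.
    static final class AtomicTypeSketch implements FieldCountSketch {
        @Override
        public int getTotalFields() {
            return 1;
        }
    }

    // A composite type exposes its nested fields, so it sums over its children.
    static final class CompositeTypeSketch implements FieldCountSketch {
        private final List<FieldCountSketch> fields;

        CompositeTypeSketch(FieldCountSketch... fields) {
            this.fields = Arrays.asList(fields);
        }

        @Override
        public int getTotalFields() {
            int total = 0;
            for (FieldCountSketch field : fields) {
                total += field.getTotalFields();
            }
            return total;
        }
    }

    public static void main(String[] args) {
        FieldCountSketch multiset = new AtomicTypeSketch();
        FieldCountSketch row = new CompositeTypeSketch(new AtomicTypeSketch(), multiset);
        System.out.println(multiset.getTotalFields()); // prints 1
        System.out.println(row.getTotalFields());      // prints 2
    }
}
```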


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r141096269
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java ---
    @@ -0,0 +1,133 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.typeutils;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +
    +import java.util.Map;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * A {@link TypeInformation} for the Multiset types of the Java API.
    + *
    + * @param <T> The type of the elements in the Multiset.
    + */
    +@PublicEvolving
    +public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +
    +	public MultisetTypeInfo(Class<T> elementTypeClass) {
    +		super(elementTypeClass, Integer.class);
    +	}
    +
    +	public MultisetTypeInfo(TypeInformation<T> elementTypeInfo) {
    +		super(elementTypeInfo, BasicTypeInfo.INT_TYPE_INFO);
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  MultisetTypeInfo specific properties
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Gets the type information for the elements contained in the Multiset
    +	 */
    +	public TypeInformation<T> getElementTypeInfo() {
    +		return getKeyTypeInfo();
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  TypeInformation implementation
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	public boolean isBasicType() {
    --- End diff --
    
    Already implemented by `MapTypeInfo`; no need to override.


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r141507177
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java ---
    @@ -0,0 +1,133 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.typeutils;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +
    +import java.util.Map;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * A {@link TypeInformation} for the Multiset types of the Java API.
    + *
    + * @param <T> The type of the elements in the Multiset.
    + */
    +@PublicEvolving
    +public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
    --- End diff --
    
    I took a look at the Calcite tests for the COLLECT function: null values are ignored.


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r141255931
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java ---
    @@ -0,0 +1,133 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.typeutils;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +
    +import java.util.Map;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * A {@link TypeInformation} for the Multiset types of the Java API.
    + *
    + * @param <T> The type of the elements in the Multiset.
    + */
    +@PublicEvolving
    +public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +
    +	public MultisetTypeInfo(Class<T> elementTypeClass) {
    +		super(elementTypeClass, Integer.class);
    +	}
    +
    +	public MultisetTypeInfo(TypeInformation<T> elementTypeInfo) {
    +		super(elementTypeInfo, BasicTypeInfo.INT_TYPE_INFO);
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  MultisetTypeInfo specific properties
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Gets the type information for the elements contained in the Multiset
    +	 */
    +	public TypeInformation<T> getElementTypeInfo() {
    +		return getKeyTypeInfo();
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  TypeInformation implementation
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	public boolean isBasicType() {
    +		return false;
    +	}
    +
    +	@Override
    +	public boolean isTupleType() {
    +		return false;
    +	}
    +
    +	@Override
    +	public int getArity() {
    --- End diff --
    
    done


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r141256026
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java ---
    @@ -0,0 +1,133 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.typeutils;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +
    +import java.util.Map;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * A {@link TypeInformation} for the Multiset types of the Java API.
    + *
    + * @param <T> The type of the elements in the Multiset.
    + */
    +@PublicEvolving
    +public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +
    +	public MultisetTypeInfo(Class<T> elementTypeClass) {
    +		super(elementTypeClass, Integer.class);
    +	}
    +
    +	public MultisetTypeInfo(TypeInformation<T> elementTypeInfo) {
    +		super(elementTypeInfo, BasicTypeInfo.INT_TYPE_INFO);
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  MultisetTypeInfo specific properties
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Gets the type information for the elements contained in the Multiset
    +	 */
    +	public TypeInformation<T> getElementTypeInfo() {
    +		return getKeyTypeInfo();
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  TypeInformation implementation
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	public boolean isBasicType() {
    +		return false;
    +	}
    +
    +	@Override
    +	public boolean isTupleType() {
    +		return false;
    +	}
    +
    +	@Override
    +	public int getArity() {
    +		return 0;
    +	}
    +
    +	@Override
    +	public int getTotalFields() {
    +		// similar as arrays, the multiset are "opaque" to the direct field addressing logic
    +		// since the multiset's elements are not addressable, we do not expose them
    +		return 1;
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	@Override
    +	public Class<Map<T, Integer>> getTypeClass() {
    --- End diff --
    
    done


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r141111022
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -1414,8 +1414,29 @@ object AggregateUtil {
               aggregates(index) = udagg.getFunction
               accTypes(index) = udagg.accType
     
    -        case unSupported: SqlAggFunction =>
    -          throw new TableException(s"unsupported Function: '${unSupported.getName}'")
    +        case other: SqlAggFunction =>
    --- End diff --
    
    Change this case to `case collect: SqlAggFunction if collect.getKind == SqlKind.COLLECT =>` to have a dedicated case for this built-in function. Also move the case after `case _: SqlCountAggFunction` to keep all built-in functions together.


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r141099803
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala ---
    @@ -0,0 +1,158 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.functions.aggfunctions
    +
    +import java.lang.{Iterable => JIterable}
    +import java.util
    +import java.util.function.BiFunction
    +
    +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    +import org.apache.flink.api.java.typeutils._
    +import org.apache.flink.table.api.dataview.MapView
    +import org.apache.flink.table.dataview.MapViewTypeInfo
    +import org.apache.flink.table.functions.AggregateFunction
    +
    +
    +/** The initial accumulator for Collect aggregate function */
    +class CollectAccumulator[E](var f0:MapView[E, Integer]) {
    --- End diff --
    
    Add a space: `var f0: MapView`


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r142903359
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -1412,23 +1412,12 @@ object AggregateUtil {
     
             case collect: SqlAggFunction if collect.getKind == SqlKind.COLLECT =>
               aggregates(index) = sqlTypeName match {
    -            case TINYINT =>
    -              new ByteCollectAggFunction
    -            case SMALLINT =>
    -              new ShortCollectAggFunction
    -            case INTEGER =>
    -              new IntCollectAggFunction
    -            case BIGINT =>
    -              new LongCollectAggFunction
    -            case VARCHAR | CHAR =>
    -              new StringCollectAggFunction
    -            case FLOAT =>
    -              new FloatCollectAggFunction
    -            case DOUBLE =>
    -              new DoubleCollectAggFunction
    +            case TINYINT | SMALLINT | INTEGER | BIGINT | VARCHAR | CHAR | FLOAT | DOUBLE =>
    --- End diff --
    
    I was rather thinking of removing the `match case` block completely and setting
    
    ```
    aggregates(index) = new CollectAggFunction(FlinkTypeFactory.toTypeInfo(relDataType))
    ```


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r141507401
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala ---
    @@ -0,0 +1,158 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.functions.aggfunctions
    +
    +import java.lang.{Iterable => JIterable}
    +import java.util
    +import java.util.function.BiFunction
    +
    +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    +import org.apache.flink.api.java.typeutils._
    +import org.apache.flink.table.api.dataview.MapView
    +import org.apache.flink.table.dataview.MapViewTypeInfo
    +import org.apache.flink.table.functions.AggregateFunction
    +
    +
    +/** The initial accumulator for Collect aggregate function */
    +class CollectAccumulator[E](var f0:MapView[E, Integer]) {
    +  def this() {
    +    this(null)
    +  }
    +
    +  def canEqual(a: Any) = a.isInstanceOf[CollectAccumulator[E]]
    +
    +  override def equals(that: Any): Boolean =
    +    that match {
    +      case that: CollectAccumulator[E] => that.canEqual(this) && this.f0 == that.f0
    +      case _ => false
    +    }
    +}
    +
    +abstract class CollectAggFunction[E]
    +  extends AggregateFunction[util.Map[E, Integer], CollectAccumulator[E]] {
    +
    +  override def createAccumulator(): CollectAccumulator[E] = {
    +    val acc = new CollectAccumulator[E](new MapView[E, Integer](
    +      getValueTypeInfo.asInstanceOf[TypeInformation[E]], BasicTypeInfo.INT_TYPE_INFO))
    +    acc
    +  }
    +
    +  def accumulate(accumulator: CollectAccumulator[E], value: E): Unit = {
    +    if (value != null) {
    +      if (accumulator.f0.contains(value)) {
    +        accumulator.f0.put(value, accumulator.f0.get(value) + 1)
    +      } else {
    +        accumulator.f0.put(value, 1)
    +      }
    +    }
    +  }
    +
    +  override def getValue(accumulator: CollectAccumulator[E]): util.Map[E, Integer] = {
    +    val iterator = accumulator.f0.iterator
    +    if (iterator.hasNext) {
    +      val map = new util.HashMap[E, Integer]()
    +      while (iterator.hasNext) {
    +        val entry = iterator.next()
    +        map.put(entry.getKey, entry.getValue)
    +      }
    +      map
    +    } else {
    +      null.asInstanceOf[util.Map[E, Integer]]
    --- End diff --
    
    I checked the Calcite tests; this should return an empty multiset instead.
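
A rough sketch of the behaviour discussed here, assuming null inputs are skipped and an accumulator that saw no values yields an empty map rather than `null`. `CollectSketch` is a hypothetical stand-in backed by a plain `HashMap`; it is not the `CollectAggFunction` from the diff and does not use Flink's `MapView`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the accumulator logic; uses a plain HashMap, not Flink's MapView.
class CollectSketch<E> {
    private final Map<E, Integer> counts = new HashMap<>();

    // Counts one occurrence of value; null input is skipped.
    void accumulate(E value) {
        if (value != null) {
            counts.merge(value, 1, Integer::sum);
        }
    }

    // Returns a copy of the counts; empty (never null) when nothing was accumulated.
    Map<E, Integer> getValue() {
        return new HashMap<>(counts);
    }

    public static void main(String[] args) {
        CollectSketch<Integer> acc = new CollectSketch<>();
        acc.accumulate(null);
        System.out.println(acc.getValue().isEmpty()); // prints true
        acc.accumulate(7);
        acc.accumulate(7);
        System.out.println(acc.getValue().get(7));    // prints 2
    }
}
```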


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r142664211
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala ---
    @@ -0,0 +1,159 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.functions.aggfunctions
    +
    +import java.lang.{Iterable => JIterable}
    +import java.util
    +
    +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    +import org.apache.flink.api.java.typeutils._
    +import org.apache.flink.table.api.dataview.MapView
    +import org.apache.flink.table.dataview.MapViewTypeInfo
    +import org.apache.flink.table.functions.AggregateFunction
    +
    +import scala.collection.JavaConverters._
    +
    +/** The initial accumulator for Collect aggregate function */
    +class CollectAccumulator[E](var f0:MapView[E, Integer]) {
    +  def this() {
    +      this(null)
    +  }
    +
    +  def canEqual(a: Any) = a.isInstanceOf[CollectAccumulator[E]]
    +
    +  override def equals(that: Any): Boolean =
    +    that match {
    +      case that: CollectAccumulator[E] => that.canEqual(this) && this.f0 == that.f0
    +      case _ => false
    +    }
    +}
    +
    +abstract class CollectAggFunction[E]
    --- End diff --
    
    I don't think we need to make this class abstract. Instead, we should add a constructor that takes the `TypeInformation` of the value. Then we don't need to subclass the aggregation function, and we avoid falling back to generic value types for most non-primitive fields.
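
    Roughly what I have in mind, as a plain-Java sketch that does not depend on Flink. `CollectSketch` is a hypothetical name, and `Class<E>` merely stands in for `TypeInformation[E]`; the real function would pass the type information on to the `MapView`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, non-abstract collect function. The element type is a
// constructor argument (Class<E> stands in for TypeInformation<E>),
// so no per-type subclass is needed.
final class CollectSketch<E> {
    private final Class<E> elementType;

    CollectSketch(Class<E> elementType) {
        this.elementType = elementType;
    }

    // Replaces the abstract getValueTypeInfo of the subclass-based design.
    Class<E> getElementType() {
        return elementType;
    }

    Map<E, Integer> createAccumulator() {
        return new HashMap<>();
    }

    // Counts non-null values; null values are ignored.
    void accumulate(Map<E, Integer> acc, E value) {
        if (value != null) {
            acc.merge(value, 1, Integer::sum);
        }
    }
}
```

    The per-type match in `AggregateUtil` would then collapse to a single constructor call, e.g. `new CollectSketch<>(Integer.class)` or `new CollectSketch<>(String.class)` in terms of this sketch.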


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r141095159
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java ---
    @@ -0,0 +1,133 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.typeutils;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +
    +import java.util.Map;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * A {@link TypeInformation} for the Multiset types of the Java API.
    + *
    + * @param <T> The type of the elements in the Multiset.
    + */
    +@PublicEvolving
    +public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
    --- End diff --
    
    Please also add this type to the `org.apache.flink.table.api.Types` class for easy creation of the `TypeInformation`.


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r141105482
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala ---
    @@ -0,0 +1,158 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.functions.aggfunctions
    +
    +import java.lang.{Iterable => JIterable}
    +import java.util
    +import java.util.function.BiFunction
    +
    +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    +import org.apache.flink.api.java.typeutils._
    +import org.apache.flink.table.api.dataview.MapView
    +import org.apache.flink.table.dataview.MapViewTypeInfo
    +import org.apache.flink.table.functions.AggregateFunction
    +
    +
    +/** The initial accumulator for Collect aggregate function */
    +class CollectAccumulator[E](var f0:MapView[E, Integer]) {
    +  def this() {
    +    this(null)
    +  }
    +
    +  def canEqual(a: Any) = a.isInstanceOf[CollectAccumulator[E]]
    +
    +  override def equals(that: Any): Boolean =
    +    that match {
    +      case that: CollectAccumulator[E] => that.canEqual(this) && this.f0 == that.f0
    +      case _ => false
    +    }
    +}
    +
    +abstract class CollectAggFunction[E]
    +  extends AggregateFunction[util.Map[E, Integer], CollectAccumulator[E]] {
    +
    +  override def createAccumulator(): CollectAccumulator[E] = {
    +    val acc = new CollectAccumulator[E](new MapView[E, Integer](
    +      getValueTypeInfo.asInstanceOf[TypeInformation[E]], BasicTypeInfo.INT_TYPE_INFO))
    +    acc
    +  }
    +
    +  def accumulate(accumulator: CollectAccumulator[E], value: E): Unit = {
    +    if (value != null) {
    --- End diff --
    
    If yes, we would need to check whether `MapView` supports `null` keys. If not, we could wrap the key in a `Row` of arity 1, because `Row` supports serializing `null` fields.
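
    The wrapping idea, sketched in plain Java without Flink. `NullableKey` is a hypothetical single-field wrapper playing the role of a `Row` of arity 1, and `ConcurrentHashMap` (which rejects `null` keys) stands in for a map without null-key support; whether `MapView` actually behaves that way is exactly what would need to be checked.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical single-field wrapper, playing the role of a Row of arity 1.
final class NullableKey<E> {
    final E value; // may be null

    NullableKey(E value) {
        this.value = value;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof NullableKey
            && Objects.equals(value, ((NullableKey<?>) o).value);
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(value);
    }
}

// Counts values, including null, in a map that rejects null keys.
final class NullSafeCounter<E> {
    // ConcurrentHashMap throws NullPointerException for null keys, so the
    // wrapper (which is never null itself) is used as the key instead.
    private final Map<NullableKey<E>, Integer> counts = new ConcurrentHashMap<>();

    void accumulate(E value) {
        counts.merge(new NullableKey<>(value), 1, Integer::sum);
    }

    int count(E value) {
        return counts.getOrDefault(new NullableKey<>(value), 0);
    }
}
```

    The cost is one extra wrapper object per key, so it only makes sense if `null` values really have to be counted.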


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r141485981
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -1414,8 +1414,29 @@ object AggregateUtil {
               aggregates(index) = udagg.getFunction
               accTypes(index) = udagg.accType
     
    -        case unSupported: SqlAggFunction =>
    -          throw new TableException(s"unsupported Function: '${unSupported.getName}'")
    +        case other: SqlAggFunction =>
    +          if (other.getKind == SqlKind.COLLECT) {
    +            aggregates(index) = sqlTypeName match {
    +              case TINYINT =>
    +                new ByteCollectAggFunction
    +              case SMALLINT =>
    +                new ShortCollectAggFunction
    +              case INTEGER =>
    +                new IntCollectAggFunction
    +              case BIGINT =>
    +                new LongCollectAggFunction
    +              case VARCHAR | CHAR =>
    +                new StringCollectAggFunction
    +              case FLOAT =>
    +                new FloatCollectAggFunction
    +              case DOUBLE =>
    +                new DoubleCollectAggFunction
    +              case _ =>
    +                new ObjectCollectAggFunction
    +            }
    +          } else {
    --- End diff --
    
    done


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r141097263
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java ---
    @@ -0,0 +1,133 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.typeutils;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +
    +import java.util.Map;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * A {@link TypeInformation} for the Multiset types of the Java API.
    + *
    + * @param <T> The type of the elements in the Multiset.
    + */
    +@PublicEvolving
    +public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +
    +	public MultisetTypeInfo(Class<T> elementTypeClass) {
    +		super(elementTypeClass, Integer.class);
    +	}
    +
    +	public MultisetTypeInfo(TypeInformation<T> elementTypeInfo) {
    +		super(elementTypeInfo, BasicTypeInfo.INT_TYPE_INFO);
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  MultisetTypeInfo specific properties
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Gets the type information for the elements contained in the Multiset
    +	 */
    +	public TypeInformation<T> getElementTypeInfo() {
    +		return getKeyTypeInfo();
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  TypeInformation implementation
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	public boolean isBasicType() {
    +		return false;
    +	}
    +
    +	@Override
    +	public boolean isTupleType() {
    +		return false;
    +	}
    +
    +	@Override
    +	public int getArity() {
    +		return 0;
    +	}
    +
    +	@Override
    +	public int getTotalFields() {
    +		// similar as arrays, the multiset are "opaque" to the direct field addressing logic
    +		// since the multiset's elements are not addressable, we do not expose them
    +		return 1;
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	@Override
    +	public Class<Map<T, Integer>> getTypeClass() {
    --- End diff --
    
    This is already implemented by `MapTypeInfo`, so there is no need to override it.


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r141576005
  
    --- Diff: docs/dev/table/sql.md ---
    @@ -2107,6 +2108,17 @@ VAR_SAMP(value)
             <p>Returns the sample variance (square of the sample standard deviation) of the numeric field across all input values.</p>
           </td>
         </tr>
    +
    +    <tr>
    +      <td>
    +          {% highlight text %}
    +          COLLECT(value)
    +          {% endhighlight %}
    +      </td>
    +      <td>
    +          <p>Returns a multiset of the <i>value</i>s.</p>
    --- End diff --
    
    done


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r141507197
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java ---
    @@ -0,0 +1,133 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.typeutils;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +
    +import java.util.Map;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * A {@link TypeInformation} for the Multiset types of the Java API.
    + *
    + * @param <T> The type of the elements in the Multiset.
    + */
    +@PublicEvolving
    +public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
    +
    +	private static final long serialVersionUID = 1L;
    +
    +
    +	public MultisetTypeInfo(Class<T> elementTypeClass) {
    +		super(elementTypeClass, Integer.class);
    +	}
    +
    +	public MultisetTypeInfo(TypeInformation<T> elementTypeInfo) {
    +		super(elementTypeInfo, BasicTypeInfo.INT_TYPE_INFO);
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  MultisetTypeInfo specific properties
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Gets the type information for the elements contained in the Multiset
    +	 */
    +	public TypeInformation<T> getElementTypeInfo() {
    +		return getKeyTypeInfo();
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  TypeInformation implementation
    +	// ------------------------------------------------------------------------
    +
    +	@Override
    +	public boolean isBasicType() {
    --- End diff --
    
    done


---

[GitHub] flink pull request #4585: [FLINK-7491] [Table API & SQL] add MultiSetTypeInf...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4585


---