You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Rex Fenley <Re...@remind101.com> on 2020/10/26 01:09:52 UTC

Failing to create Accumulator over multiple columns

Hello,

I'm trying to create an Accumulator that takes in 2 columns, where each
"value" is therefore a tuple, and results in a tuple of 2 arrays yet no
matter what I try I receive an error trace like the one below.

(Oddly, using almost the same pattern with 1 input column (a Long for
"value")  and 1 Array of Longs result/output works perfectly fine, so I'm
not sure why suddenly using a Tuple should make much of any difference.)

please help.

Error trace:

org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Given parameters do not match any signature.


*Actual: (java.lang.Long, java.lang.Boolean) Expected: (scala.Tuple2) *at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: org.apache.flink.table.api.ValidationException: Given parameters
do not match any signature.
Actual: (java.lang.Long, java.lang.Boolean)
Expected: (scala.Tuple2)
at
org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.validateArguments(PlannerTypeInferenceUtilImpl.java:112)
at
org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.runTypeInference(PlannerTypeInferenceUtilImpl.java:71)
at
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.runLegacyTypeInference(ResolveCallByArgumentsRule.java:218)
at
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.lambda$visit$2(ResolveCallByArgumentsRule.java:134)
at java.base/java.util.Optional.orElseGet(Optional.java:369)
at
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:134)
at
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:89)
at
org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
at
org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
at
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:124)
at
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:89)
at
org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
at
org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
at
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:83)
at
java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:271)
at
java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
at
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
at
java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
at
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at
java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
at
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:84)
at
org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$1(ExpressionResolver.java:211)
at java.base/java.util.function.Function.lambda$andThen$1(Function.java:88)
at
org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:178)
at
org.apache.flink.table.operations.utils.OperationTreeBuilder.aggregate(OperationTreeBuilder.java:237)
at
org.apache.flink.table.operations.utils.OperationTreeBuilder.aggregate(OperationTreeBuilder.java:528)
at
org.apache.flink.table.api.internal.TableImpl$AggregatedTableImpl.select(TableImpl.java:685)
at remind.Job$.main(Job.scala:181)
at remind.Job.main(Job.scala)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)

This is my code:

case class MembershipsIDsAcc(
var subscriberIDs: mutable.Set[Long],
var ownerIDs: mutable.Set[Long]
)
class MembershipsIDsAgg extends AggregateFunction[Row, MembershipsIDsAcc] {

override def createAccumulator(): MembershipsIDsAcc =
MembershipsIDsAcc(mutable.Set(), mutable.Set())

def accumulate(acc: MembershipsIDsAcc, value: (Long, Boolean)): Unit = {
acc.subscriberIDs.add(value._1)
if (value._2) {
acc.ownerIDs.add(value._1)
} else {
acc.ownerIDs.remove(value._1)
}
}

def retract(acc: MembershipsIDsAcc, value: (Long, Boolean)): Unit = {
acc.subscriberIDs.remove(value._1)
acc.ownerIDs.remove(value._1)
}

def resetAccumulator(acc: MembershipsIDsAcc): Unit = {
acc.subscriberIDs = mutable.Set()
acc.ownerIDs = mutable.Set()
}

override def getValue(acc: MembershipsIDsAcc): Row = {
Row.of(acc.subscriberIDs.toArray, acc.ownerIDs.toArray)
}

override def getResultType: TypeInformation[Row] = {
new RowTypeInfo(
createTypeInformation[Array[Long]],
createTypeInformation[Array[Long]]
)
}
}

// Usage
...
val membershipsByGroupId =
membershipsNotDeletedTable
.groupBy($"group_id")
.aggregate(
membershipsIDsAgg(
$"user_id",
$"owner"
) as ("subscriber_ids", "owner_ids")
)
.select($"group_id", $"subscriber_ids", $"owner_ids")
...

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>

Re: Failing to create Accumulator over multiple columns

Posted by Rex Fenley <Re...@remind101.com>.
Thanks! Looks like that worked!

Fwiw, the error message is very confusing. Is there any way this can be
improved?

Thanks :)

On Sun, Oct 25, 2020 at 6:42 PM Xingbo Huang <hx...@gmail.com> wrote:

> Hi,
> I think you can directly declare `def accumulate(acc: MembershipsIDsAcc,
> value1: Long, value2: Boolean)`
>
> Best,
> Xingbo
>
> Rex Fenley <Re...@remind101.com> 于2020年10月26日周一 上午9:28写道:
>
>> If I switch accumulate to the following:
>> def accumulate(acc: MembershipsIDsAcc, value:
>> org.apache.flink.api.java.tuple.Tuple2[java.lang.Long, java.lang.Boolean]):
>> Unit = {...}
>>
>>
>> I instead receive:
>>
>> Tuple needs to be parameterized by using generics.
>>
>> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:847)
>>
>> org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:781)
>>
>> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:735)
>>
>> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:731)
>> ...
>>
>> On Sun, Oct 25, 2020 at 6:24 PM Rex Fenley <Re...@remind101.com> wrote:
>>
>>> Imports:
>>>
>>> import java.util.Date
>>> import org.apache.flink.api.scala._
>>> import org.apache.flink.api.common.typeinfo.TypeInformation
>>> import org.apache.flink.api.java.typeutils.RowTypeInfo
>>> import org.apache.flink.table.api.{
>>> DataTypes,
>>> EnvironmentSettings,
>>> TableEnvironment,
>>> TableSchema
>>> }
>>> import org.apache.flink.table.api.bridge.scala._
>>> import org.apache.flink.table.data.RowData
>>> import org.apache.flink.streaming.api.scala.{
>>> DataStream,
>>> StreamExecutionEnvironment
>>> }
>>> import org.apache.flink.types.Row
>>> import org.apache.flink.table.api._
>>> import
>>> org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
>>> import
>>> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkFactoryBase
>>> import
>>> org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7UpsertTableSink
>>> import
>>> org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7UpsertTableSinkFactory
>>> import org.apache.flink.table.descriptors.Elasticsearch
>>> import org.apache.flink.table.descriptors._
>>> import org.apache.flink.table.factories.SerializationFormatFactory
>>> import org.apache.flink.formats.json
>>> import org.apache.flink.table.functions.AggregateFunction
>>> import org.apache.flink.types.ListValue
>>> import scala.collection.mutable
>>>
>>> On Sun, Oct 25, 2020 at 6:09 PM Rex Fenley <Re...@remind101.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I'm trying to create an Accumulator that takes in 2 columns, where each
>>>> "value" is therefore a tuple, and results in a tuple of 2 arrays yet no
>>>> matter what I try I receive an error trace like the one below.
>>>>
>>>> (Oddly, using almost the same pattern with 1 input column (a Long for
>>>> "value")  and 1 Array of Longs result/output works perfectly fine, so I'm
>>>> not sure why suddenly using a Tuple should make much of any difference.)
>>>>
>>>> please help.
>>>>
>>>> Error trace:
>>>>
>>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>>> method caused an error: Given parameters do not match any signature.
>>>>
>>>>
>>>> *Actual: (java.lang.Long, java.lang.Boolean) Expected: (scala.Tuple2) *at
>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
>>>> at
>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>>>> at
>>>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>>>> at
>>>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
>>>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
>>>> at
>>>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
>>>> at
>>>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
>>>> at
>>>> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
>>>> Caused by: org.apache.flink.table.api.ValidationException: Given
>>>> parameters do not match any signature.
>>>> Actual: (java.lang.Long, java.lang.Boolean)
>>>> Expected: (scala.Tuple2)
>>>> at
>>>> org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.validateArguments(PlannerTypeInferenceUtilImpl.java:112)
>>>> at
>>>> org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.runTypeInference(PlannerTypeInferenceUtilImpl.java:71)
>>>> at
>>>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.runLegacyTypeInference(ResolveCallByArgumentsRule.java:218)
>>>> at
>>>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.lambda$visit$2(ResolveCallByArgumentsRule.java:134)
>>>> at java.base/java.util.Optional.orElseGet(Optional.java:369)
>>>> at
>>>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:134)
>>>> at
>>>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:89)
>>>> at
>>>> org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
>>>> at
>>>> org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
>>>> at
>>>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:124)
>>>> at
>>>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:89)
>>>> at
>>>> org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
>>>> at
>>>> org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
>>>> at
>>>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:83)
>>>> at
>>>> java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:271)
>>>> at
>>>> java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
>>>> at
>>>> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
>>>> at
>>>> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
>>>> at
>>>> java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
>>>> at
>>>> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>>> at
>>>> java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
>>>> at
>>>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:84)
>>>> at
>>>> org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$1(ExpressionResolver.java:211)
>>>> at
>>>> java.base/java.util.function.Function.lambda$andThen$1(Function.java:88)
>>>> at
>>>> org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:178)
>>>> at
>>>> org.apache.flink.table.operations.utils.OperationTreeBuilder.aggregate(OperationTreeBuilder.java:237)
>>>> at
>>>> org.apache.flink.table.operations.utils.OperationTreeBuilder.aggregate(OperationTreeBuilder.java:528)
>>>> at
>>>> org.apache.flink.table.api.internal.TableImpl$AggregatedTableImpl.select(TableImpl.java:685)
>>>> at remind.Job$.main(Job.scala:181)
>>>> at remind.Job.main(Job.scala)
>>>> at
>>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>>>> Method)
>>>> at
>>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>> at
>>>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>>> at
>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>>>>
>>>> This is my code:
>>>>
>>>> case class MembershipsIDsAcc(
>>>> var subscriberIDs: mutable.Set[Long],
>>>> var ownerIDs: mutable.Set[Long]
>>>> )
>>>> class MembershipsIDsAgg extends AggregateFunction[Row,
>>>> MembershipsIDsAcc] {
>>>>
>>>> override def createAccumulator(): MembershipsIDsAcc =
>>>> MembershipsIDsAcc(mutable.Set(), mutable.Set())
>>>>
>>>> def accumulate(acc: MembershipsIDsAcc, value: (Long, Boolean)): Unit =
>>>> {
>>>> acc.subscriberIDs.add(value._1)
>>>> if (value._2) {
>>>> acc.ownerIDs.add(value._1)
>>>> } else {
>>>> acc.ownerIDs.remove(value._1)
>>>> }
>>>> }
>>>>
>>>> def retract(acc: MembershipsIDsAcc, value: (Long, Boolean)): Unit = {
>>>> acc.subscriberIDs.remove(value._1)
>>>> acc.ownerIDs.remove(value._1)
>>>> }
>>>>
>>>> def resetAccumulator(acc: MembershipsIDsAcc): Unit = {
>>>> acc.subscriberIDs = mutable.Set()
>>>> acc.ownerIDs = mutable.Set()
>>>> }
>>>>
>>>> override def getValue(acc: MembershipsIDsAcc): Row = {
>>>> Row.of(acc.subscriberIDs.toArray, acc.ownerIDs.toArray)
>>>> }
>>>>
>>>> override def getResultType: TypeInformation[Row] = {
>>>> new RowTypeInfo(
>>>> createTypeInformation[Array[Long]],
>>>> createTypeInformation[Array[Long]]
>>>> )
>>>> }
>>>> }
>>>>
>>>> // Usage
>>>> ...
>>>> val membershipsByGroupId =
>>>> membershipsNotDeletedTable
>>>> .groupBy($"group_id")
>>>> .aggregate(
>>>> membershipsIDsAgg(
>>>> $"user_id",
>>>> $"owner"
>>>> ) as ("subscriber_ids", "owner_ids")
>>>> )
>>>> .select($"group_id", $"subscriber_ids", $"owner_ids")
>>>> ...
>>>>
>>>> --
>>>>
>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>
>>>>
>>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>>> <https://www.facebook.com/remindhq>
>>>>
>>>
>>>
>>> --
>>>
>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>
>>>
>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>> <https://www.facebook.com/remindhq>
>>>
>>
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>> <https://www.facebook.com/remindhq>
>>
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>

Re: Failing to create Accumulator over multiple columns

Posted by Xingbo Huang <hx...@gmail.com>.
Hi,
I think you can directly declare `def accumulate(acc: MembershipsIDsAcc,
value1: Long, value2: Boolean)`

Best,
Xingbo

Rex Fenley <Re...@remind101.com> 于2020年10月26日周一 上午9:28写道:

> If I switch accumulate to the following:
> def accumulate(acc: MembershipsIDsAcc, value:
> org.apache.flink.api.java.tuple.Tuple2[java.lang.Long, java.lang.Boolean]):
> Unit = {...}
>
>
> I instead receive:
>
> Tuple needs to be parameterized by using generics.
>
> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:847)
>
> org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:781)
>
> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:735)
>
> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:731)
> ...
>
> On Sun, Oct 25, 2020 at 6:24 PM Rex Fenley <Re...@remind101.com> wrote:
>
>> Imports:
>>
>> import java.util.Date
>> import org.apache.flink.api.scala._
>> import org.apache.flink.api.common.typeinfo.TypeInformation
>> import org.apache.flink.api.java.typeutils.RowTypeInfo
>> import org.apache.flink.table.api.{
>> DataTypes,
>> EnvironmentSettings,
>> TableEnvironment,
>> TableSchema
>> }
>> import org.apache.flink.table.api.bridge.scala._
>> import org.apache.flink.table.data.RowData
>> import org.apache.flink.streaming.api.scala.{
>> DataStream,
>> StreamExecutionEnvironment
>> }
>> import org.apache.flink.types.Row
>> import org.apache.flink.table.api._
>> import
>> org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
>> import
>> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkFactoryBase
>> import
>> org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7UpsertTableSink
>> import
>> org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7UpsertTableSinkFactory
>> import org.apache.flink.table.descriptors.Elasticsearch
>> import org.apache.flink.table.descriptors._
>> import org.apache.flink.table.factories.SerializationFormatFactory
>> import org.apache.flink.formats.json
>> import org.apache.flink.table.functions.AggregateFunction
>> import org.apache.flink.types.ListValue
>> import scala.collection.mutable
>>
>> On Sun, Oct 25, 2020 at 6:09 PM Rex Fenley <Re...@remind101.com> wrote:
>>
>>> Hello,
>>>
>>> I'm trying to create an Accumulator that takes in 2 columns, where each
>>> "value" is therefore a tuple, and results in a tuple of 2 arrays yet no
>>> matter what I try I receive an error trace like the one below.
>>>
>>> (Oddly, using almost the same pattern with 1 input column (a Long for
>>> "value")  and 1 Array of Longs result/output works perfectly fine, so I'm
>>> not sure why suddenly using a Tuple should make much of any difference.)
>>>
>>> please help.
>>>
>>> Error trace:
>>>
>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>> method caused an error: Given parameters do not match any signature.
>>>
>>>
>>> *Actual: (java.lang.Long, java.lang.Boolean) Expected: (scala.Tuple2) *at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
>>> at
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>>> at
>>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>>> at
>>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
>>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
>>> at
>>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
>>> at
>>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
>>> at
>>> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
>>> Caused by: org.apache.flink.table.api.ValidationException: Given
>>> parameters do not match any signature.
>>> Actual: (java.lang.Long, java.lang.Boolean)
>>> Expected: (scala.Tuple2)
>>> at
>>> org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.validateArguments(PlannerTypeInferenceUtilImpl.java:112)
>>> at
>>> org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.runTypeInference(PlannerTypeInferenceUtilImpl.java:71)
>>> at
>>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.runLegacyTypeInference(ResolveCallByArgumentsRule.java:218)
>>> at
>>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.lambda$visit$2(ResolveCallByArgumentsRule.java:134)
>>> at java.base/java.util.Optional.orElseGet(Optional.java:369)
>>> at
>>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:134)
>>> at
>>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:89)
>>> at
>>> org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
>>> at
>>> org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
>>> at
>>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:124)
>>> at
>>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:89)
>>> at
>>> org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
>>> at
>>> org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
>>> at
>>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:83)
>>> at
>>> java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:271)
>>> at
>>> java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
>>> at
>>> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
>>> at
>>> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
>>> at
>>> java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
>>> at
>>> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>> at
>>> java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
>>> at
>>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:84)
>>> at
>>> org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$1(ExpressionResolver.java:211)
>>> at
>>> java.base/java.util.function.Function.lambda$andThen$1(Function.java:88)
>>> at
>>> org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:178)
>>> at
>>> org.apache.flink.table.operations.utils.OperationTreeBuilder.aggregate(OperationTreeBuilder.java:237)
>>> at
>>> org.apache.flink.table.operations.utils.OperationTreeBuilder.aggregate(OperationTreeBuilder.java:528)
>>> at
>>> org.apache.flink.table.api.internal.TableImpl$AggregatedTableImpl.select(TableImpl.java:685)
>>> at remind.Job$.main(Job.scala:181)
>>> at remind.Job.main(Job.scala)
>>> at
>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>>> Method)
>>> at
>>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at
>>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>> at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>>>
>>> This is my code:
>>>
>>> case class MembershipsIDsAcc(
>>> var subscriberIDs: mutable.Set[Long],
>>> var ownerIDs: mutable.Set[Long]
>>> )
>>> class MembershipsIDsAgg extends AggregateFunction[Row,
>>> MembershipsIDsAcc] {
>>>
>>> override def createAccumulator(): MembershipsIDsAcc =
>>> MembershipsIDsAcc(mutable.Set(), mutable.Set())
>>>
>>> def accumulate(acc: MembershipsIDsAcc, value: (Long, Boolean)): Unit = {
>>> acc.subscriberIDs.add(value._1)
>>> if (value._2) {
>>> acc.ownerIDs.add(value._1)
>>> } else {
>>> acc.ownerIDs.remove(value._1)
>>> }
>>> }
>>>
>>> def retract(acc: MembershipsIDsAcc, value: (Long, Boolean)): Unit = {
>>> acc.subscriberIDs.remove(value._1)
>>> acc.ownerIDs.remove(value._1)
>>> }
>>>
>>> def resetAccumulator(acc: MembershipsIDsAcc): Unit = {
>>> acc.subscriberIDs = mutable.Set()
>>> acc.ownerIDs = mutable.Set()
>>> }
>>>
>>> override def getValue(acc: MembershipsIDsAcc): Row = {
>>> Row.of(acc.subscriberIDs.toArray, acc.ownerIDs.toArray)
>>> }
>>>
>>> override def getResultType: TypeInformation[Row] = {
>>> new RowTypeInfo(
>>> createTypeInformation[Array[Long]],
>>> createTypeInformation[Array[Long]]
>>> )
>>> }
>>> }
>>>
>>> // Usage
>>> ...
>>> val membershipsByGroupId =
>>> membershipsNotDeletedTable
>>> .groupBy($"group_id")
>>> .aggregate(
>>> membershipsIDsAgg(
>>> $"user_id",
>>> $"owner"
>>> ) as ("subscriber_ids", "owner_ids")
>>> )
>>> .select($"group_id", $"subscriber_ids", $"owner_ids")
>>> ...
>>>
>>> --
>>>
>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>
>>>
>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>> <https://www.facebook.com/remindhq>
>>>
>>
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>> <https://www.facebook.com/remindhq>
>>
>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>

Re: Failing to create Accumulator over multiple columns

Posted by Rex Fenley <Re...@remind101.com>.
If I switch accumulate to the following:
def accumulate(acc: MembershipsIDsAcc, value:
org.apache.flink.api.java.tuple.Tuple2[java.lang.Long, java.lang.Boolean]):
Unit = {...}


I instead receive:

Tuple needs to be parameterized by using generics.
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:847)
org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:781)
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:735)
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:731)
...

On Sun, Oct 25, 2020 at 6:24 PM Rex Fenley <Re...@remind101.com> wrote:

> Imports:
>
> import java.util.Date
> import org.apache.flink.api.scala._
> import org.apache.flink.api.common.typeinfo.TypeInformation
> import org.apache.flink.api.java.typeutils.RowTypeInfo
> import org.apache.flink.table.api.{
> DataTypes,
> EnvironmentSettings,
> TableEnvironment,
> TableSchema
> }
> import org.apache.flink.table.api.bridge.scala._
> import org.apache.flink.table.data.RowData
> import org.apache.flink.streaming.api.scala.{
> DataStream,
> StreamExecutionEnvironment
> }
> import org.apache.flink.types.Row
> import org.apache.flink.table.api._
> import
> org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
> import
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkFactoryBase
> import
> org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7UpsertTableSink
> import
> org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7UpsertTableSinkFactory
> import org.apache.flink.table.descriptors.Elasticsearch
> import org.apache.flink.table.descriptors._
> import org.apache.flink.table.factories.SerializationFormatFactory
> import org.apache.flink.formats.json
> import org.apache.flink.table.functions.AggregateFunction
> import org.apache.flink.types.ListValue
> import scala.collection.mutable
>
> On Sun, Oct 25, 2020 at 6:09 PM Rex Fenley <Re...@remind101.com> wrote:
>
>> Hello,
>>
>> I'm trying to create an Accumulator that takes in 2 columns, where each
>> "value" is therefore a tuple, and results in a tuple of 2 arrays yet no
>> matter what I try I receive an error trace like the one below.
>>
>> (Oddly, using almost the same pattern with 1 input column (a Long for
>> "value")  and 1 Array of Longs result/output works perfectly fine, so I'm
>> not sure why suddenly using a Tuple should make much of any difference.)
>>
>> please help.
>>
>> Error trace:
>>
>> org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error: Given parameters do not match any signature.
>>
>>
>> *Actual: (java.lang.Long, java.lang.Boolean) Expected: (scala.Tuple2) *at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>> at
>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>> at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
>> at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
>> at
>> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
>> Caused by: org.apache.flink.table.api.ValidationException: Given
>> parameters do not match any signature.
>> Actual: (java.lang.Long, java.lang.Boolean)
>> Expected: (scala.Tuple2)
>> at
>> org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.validateArguments(PlannerTypeInferenceUtilImpl.java:112)
>> at
>> org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.runTypeInference(PlannerTypeInferenceUtilImpl.java:71)
>> at
>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.runLegacyTypeInference(ResolveCallByArgumentsRule.java:218)
>> at
>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.lambda$visit$2(ResolveCallByArgumentsRule.java:134)
>> at java.base/java.util.Optional.orElseGet(Optional.java:369)
>> at
>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:134)
>> at
>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:89)
>> at
>> org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
>> at
>> org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
>> at
>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:124)
>> at
>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:89)
>> at
>> org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
>> at
>> org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
>> at
>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:83)
>> at
>> java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:271)
>> at
>> java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
>> at
>> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
>> at
>> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
>> at
>> java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
>> at
>> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>> at
>> java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
>> at
>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:84)
>> at
>> org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$1(ExpressionResolver.java:211)
>> at
>> java.base/java.util.function.Function.lambda$andThen$1(Function.java:88)
>> at
>> org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:178)
>> at
>> org.apache.flink.table.operations.utils.OperationTreeBuilder.aggregate(OperationTreeBuilder.java:237)
>> at
>> org.apache.flink.table.operations.utils.OperationTreeBuilder.aggregate(OperationTreeBuilder.java:528)
>> at
>> org.apache.flink.table.api.internal.TableImpl$AggregatedTableImpl.select(TableImpl.java:685)
>> at remind.Job$.main(Job.scala:181)
>> at remind.Job.main(Job.scala)
>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>> Method)
>> at
>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>>
>> This is my code:
>>
>> case class MembershipsIDsAcc(
>> var subscriberIDs: mutable.Set[Long],
>> var ownerIDs: mutable.Set[Long]
>> )
>> class MembershipsIDsAgg extends AggregateFunction[Row,
>> MembershipsIDsAcc] {
>>
>> override def createAccumulator(): MembershipsIDsAcc =
>> MembershipsIDsAcc(mutable.Set(), mutable.Set())
>>
>> def accumulate(acc: MembershipsIDsAcc, value: (Long, Boolean)): Unit = {
>> acc.subscriberIDs.add(value._1)
>> if (value._2) {
>> acc.ownerIDs.add(value._1)
>> } else {
>> acc.ownerIDs.remove(value._1)
>> }
>> }
>>
>> def retract(acc: MembershipsIDsAcc, value: (Long, Boolean)): Unit = {
>> acc.subscriberIDs.remove(value._1)
>> acc.ownerIDs.remove(value._1)
>> }
>>
>> def resetAccumulator(acc: MembershipsIDsAcc): Unit = {
>> acc.subscriberIDs = mutable.Set()
>> acc.ownerIDs = mutable.Set()
>> }
>>
>> override def getValue(acc: MembershipsIDsAcc): Row = {
>> Row.of(acc.subscriberIDs.toArray, acc.ownerIDs.toArray)
>> }
>>
>> override def getResultType: TypeInformation[Row] = {
>> new RowTypeInfo(
>> createTypeInformation[Array[Long]],
>> createTypeInformation[Array[Long]]
>> )
>> }
>> }
>>
>> // Usage
>> ...
>> val membershipsByGroupId =
>> membershipsNotDeletedTable
>> .groupBy($"group_id")
>> .aggregate(
>> membershipsIDsAgg(
>> $"user_id",
>> $"owner"
>> ) as ("subscriber_ids", "owner_ids")
>> )
>> .select($"group_id", $"subscriber_ids", $"owner_ids")
>> ...
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>> <https://www.facebook.com/remindhq>
>>
>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>

Re: Failing to create Accumulator over multiple columns

Posted by Rex Fenley <Re...@remind101.com>.
Imports:

import java.util.Date
import org.apache.flink.api.scala._
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.table.api.{
DataTypes,
EnvironmentSettings,
TableEnvironment,
TableSchema
}
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.data.RowData
import org.apache.flink.streaming.api.scala.{
DataStream,
StreamExecutionEnvironment
}
import org.apache.flink.types.Row
import org.apache.flink.table.api._
import
org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkFactoryBase
import
org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7UpsertTableSink
import
org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7UpsertTableSinkFactory
import org.apache.flink.table.descriptors.Elasticsearch
import org.apache.flink.table.descriptors._
import org.apache.flink.table.factories.SerializationFormatFactory
import org.apache.flink.formats.json
import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.types.ListValue
import scala.collection.mutable

On Sun, Oct 25, 2020 at 6:09 PM Rex Fenley <Re...@remind101.com> wrote:

> Hello,
>
> I'm trying to create an Accumulator that takes in 2 columns, where each
> "value" is therefore a tuple, and results in a tuple of 2 arrays yet no
> matter what I try I receive an error trace like the one below.
>
> (Oddly, using almost the same pattern with 1 input column (a Long for
> "value")  and 1 Array of Longs result/output works perfectly fine, so I'm
> not sure why suddenly using a Tuple should make much of any difference.)
>
> please help.
>
> Error trace:
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: Given parameters do not match any signature.
>
>
> *Actual: (java.lang.Long, java.lang.Boolean) Expected: (scala.Tuple2) *at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> at
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> Caused by: org.apache.flink.table.api.ValidationException: Given
> parameters do not match any signature.
> Actual: (java.lang.Long, java.lang.Boolean)
> Expected: (scala.Tuple2)
> at
> org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.validateArguments(PlannerTypeInferenceUtilImpl.java:112)
> at
> org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl.runTypeInference(PlannerTypeInferenceUtilImpl.java:71)
> at
> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.runLegacyTypeInference(ResolveCallByArgumentsRule.java:218)
> at
> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.lambda$visit$2(ResolveCallByArgumentsRule.java:134)
> at java.base/java.util.Optional.orElseGet(Optional.java:369)
> at
> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:134)
> at
> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:89)
> at
> org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
> at
> org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
> at
> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:124)
> at
> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:89)
> at
> org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
> at
> org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
> at
> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:83)
> at
> java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:271)
> at
> java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
> at
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
> at
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
> at
> java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
> at
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at
> java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
> at
> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:84)
> at
> org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$1(ExpressionResolver.java:211)
> at java.base/java.util.function.Function.lambda$andThen$1(Function.java:88)
> at
> org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:178)
> at
> org.apache.flink.table.operations.utils.OperationTreeBuilder.aggregate(OperationTreeBuilder.java:237)
> at
> org.apache.flink.table.operations.utils.OperationTreeBuilder.aggregate(OperationTreeBuilder.java:528)
> at
> org.apache.flink.table.api.internal.TableImpl$AggregatedTableImpl.select(TableImpl.java:685)
> at remind.Job$.main(Job.scala:181)
> at remind.Job.main(Job.scala)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>
> This is my code:
>
> case class MembershipsIDsAcc(
> var subscriberIDs: mutable.Set[Long],
> var ownerIDs: mutable.Set[Long]
> )
> class MembershipsIDsAgg extends AggregateFunction[Row, MembershipsIDsAcc]
> {
>
> override def createAccumulator(): MembershipsIDsAcc =
> MembershipsIDsAcc(mutable.Set(), mutable.Set())
>
> def accumulate(acc: MembershipsIDsAcc, value: (Long, Boolean)): Unit = {
> acc.subscriberIDs.add(value._1)
> if (value._2) {
> acc.ownerIDs.add(value._1)
> } else {
> acc.ownerIDs.remove(value._1)
> }
> }
>
> def retract(acc: MembershipsIDsAcc, value: (Long, Boolean)): Unit = {
> acc.subscriberIDs.remove(value._1)
> acc.ownerIDs.remove(value._1)
> }
>
> def resetAccumulator(acc: MembershipsIDsAcc): Unit = {
> acc.subscriberIDs = mutable.Set()
> acc.ownerIDs = mutable.Set()
> }
>
> override def getValue(acc: MembershipsIDsAcc): Row = {
> Row.of(acc.subscriberIDs.toArray, acc.ownerIDs.toArray)
> }
>
> override def getResultType: TypeInformation[Row] = {
> new RowTypeInfo(
> createTypeInformation[Array[Long]],
> createTypeInformation[Array[Long]]
> )
> }
> }
>
> // Usage
> ...
> val membershipsByGroupId =
> membershipsNotDeletedTable
> .groupBy($"group_id")
> .aggregate(
> membershipsIDsAgg(
> $"user_id",
> $"owner"
> ) as ("subscriber_ids", "owner_ids")
> )
> .select($"group_id", $"subscriber_ids", $"owner_ids")
> ...
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>