Posted to user@flink.apache.org by Vijay Balakrishnan <bv...@gmail.com> on 2019/05/01 20:40:39 UTC

Type Erasure - Usage of class Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1, Tuple2, etc.)

Hi,
I asked this question earlier under the topic "Flink - Type Erasure Exception
trying to use Tuple6 instead of Tuple".

I am having trouble using the generic Tuple type as a key instead of a
specific Tuple1, Tuple2, etc. The job fails with:

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException:
Usage of class Tuple as a type is not allowed. Use a concrete subclass
(e.g. Tuple1, Tuple2, etc.) instead.

DataStream<Map<String, Object>> kinesisStream = ...;
KeyedStream<Map<String, Object>, Tuple> monitoringTupleKeyedStream =
        kinesisStream.keyBy(new MapTupleKeySelector(groupBySet)); // <===== complains about Tuple type for monitoringTupleKeyedStream
.....

public static class MapTupleKeySelector implements KeySelector<Map<String, Object>, Tuple> {
    private final Set<String> groupBySet;

    public MapTupleKeySelector(Set<String> groupBySet) {
        this.groupBySet = groupBySet;
    }

    @Override
    public Tuple getKey(Map<String, Object> inputMap) throws Exception {
        int groupBySetSize = groupBySet.size();
        // Creates a Tuple1, Tuple2, ... at runtime, but the declared key type is still the abstract Tuple
        Tuple tuple = Tuple.newInstance(groupBySetSize);
        //Tuple1 tuple = new Tuple1();
        int count = 0;
        for (String groupBy : groupBySet) {
            // Look up the value of each group-by field in the incoming record
            Object groupByValue = inputMap.get(groupBy);
            tuple.setField(groupByValue, count++);
        }
        return tuple;
    }
}
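For context on the exception above: roughly speaking, Flink determines the key type when `keyBy` is called, by reading the generic signature of the KeySelector rather than the objects `getKey` returns at runtime. The plain-Java sketch below mimics that lookup with hypothetical stand-ins (`Selector` for KeySelector, `AbstractKey` for the abstract Tuple, `Key2` for Tuple2) so that it compiles without Flink. Reflection only sees the declared `AbstractKey`, just as Flink only sees `Tuple`, so the arity of whatever `Tuple.newInstance` creates cannot be known at that point.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Map;

// Hypothetical stand-in for Flink's KeySelector<IN, KEY>
interface Selector<IN, KEY> {
    KEY getKey(IN value);
}

// Hypothetical stand-ins for Flink's abstract Tuple and its concrete Tuple2
abstract class AbstractKey {}
class Key2 extends AbstractKey {}

// Mirrors MapTupleKeySelector: declares the abstract type, returns a concrete one
class MapKeySelector implements Selector<Map<String, Object>, AbstractKey> {
    @Override
    public AbstractKey getKey(Map<String, Object> value) {
        return new Key2();
    }
}

class KeyTypeInspector {
    // Reads the KEY type argument from the class's generic interface declaration,
    // which is all that is available before any record has been processed
    static Type declaredKeyType(Class<?> selectorClass) {
        for (Type t : selectorClass.getGenericInterfaces()) {
            if (t instanceof ParameterizedType
                    && ((ParameterizedType) t).getRawType() == Selector.class) {
                return ((ParameterizedType) t).getActualTypeArguments()[1];
            }
        }
        throw new IllegalArgumentException("not a Selector: " + selectorClass);
    }
}
```

The declared key type comes back as `AbstractKey`, even though every call to `getKey` returns a `Key2`.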

Abhishek replied in that thread as follows (I am posting in that thread as well
as creating this new one):
However, if you are trying to build a generic framework where different
streams have different fields, you can follow the Map approach. For that
approach, you need to write an extra mapper class which converts all the
fields in the stream into a Map-based stream.

Can I get an example of how to create this extra Mapper class?
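A minimal sketch of such a mapper, assuming a hypothetical typed record `MonitoringEvent` with `region`, `service` and `latencyMs` fields: it copies every field into a `Map<String, Object>`, so that one generic, Map-based key selector can serve streams with different fields. In a Flink job the class would implement `MapFunction<MonitoringEvent, Map<String, Object>>` and be applied with `stream.map(new EventToMapMapper())`; the interface is left out here so the sketch compiles with plain Java.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical typed record, standing in for whatever a given stream carries
class MonitoringEvent {
    final String region;
    final String service;
    final long latencyMs;

    MonitoringEvent(String region, String service, long latencyMs) {
        this.region = region;
        this.service = service;
        this.latencyMs = latencyMs;
    }
}

// Plain-Java shape of a MapFunction<MonitoringEvent, Map<String, Object>>
class EventToMapMapper {
    public Map<String, Object> map(MonitoringEvent event) {
        // LinkedHashMap keeps the insertion order of the fields
        Map<String, Object> out = new LinkedHashMap<>();
        out.put("region", event.region);
        out.put("service", event.service);
        out.put("latencyMs", event.latencyMs); // autoboxed to Long
        return out;
    }
}
```

Each stream type would get its own small mapper like this; after that, the same Map-based KeySelector can group any of them by field name.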

Currently, I deserialize the incoming byte[] into a DataStream<Map<String, Object>>
kinesisStream by implementing KinesisDeserializationSchema<Map<String, Object>>.

TIA,

Re: Type Erasure - Usage of class Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1, Tuple2, etc.)

Posted by Vijay Balakrishnan <bv...@gmail.com>.
Hi Chesnay,
Sorry for the confusion. I solved the problem by following another
person's recommendation on the other post: use a wrapper POJO.
I wrapped the Tuple in a MonitoringTuple class, which solved my problem
with the varying number of fields in the Tuple.

public class MonitoringTuple {
    private Tuple tuple;
    // ... (rest of the class not shown)
}
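If the part of MonitoringTuple not shown here does not already define them, a key wrapper also needs value-based `equals` and `hashCode`: Flink routes each record to a parallel instance using the key's `hashCode()`, so with the identity-based defaults from `Object`, records with identical group-by values could end up on different instances and would not share keyed state. Below is a hypothetical `GroupKey` sketch that swaps the Flink `Tuple` field for a `List<String>` so it compiles with plain Java; with the original `Tuple` field, both methods would delegate to `tuple.equals(...)` and `tuple.hashCode()`, which the concrete `Tuple1`, `Tuple2`, ... classes implement. The no-argument constructor and getter/setter pair follow Flink's POJO rules (in a real job the class must also be public).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical key wrapper; a List<String> stands in for the Flink Tuple field
class GroupKey {
    private List<String> values;

    // No-arg constructor plus getter/setter, as Flink's POJO rules require
    public GroupKey() {
        this.values = new ArrayList<>();
    }

    public GroupKey(List<String> values) {
        this.values = new ArrayList<>(values);
    }

    public List<String> getValues() { return values; }

    public void setValues(List<String> values) { this.values = values; }

    // Value-based equality: two keys are equal when their group-by values are equal
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof GroupKey)) return false;
        return Objects.equals(values, ((GroupKey) o).values);
    }

    // Must agree with equals, because the key's hashCode decides where a record goes
    @Override
    public int hashCode() {
        return Objects.hashCode(values);
    }

    @Override
    public String toString() {
        return "GroupKey" + values;
    }
}
```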
Then, I used it like this:

KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream =
        kinesisStream.keyBy(new MapTupleKeySelector(groupBySet));


The MapTupleKeySelector is defined below:

    public static class MapTupleKeySelector implements KeySelector<Map<String, Object>, MonitoringTuple> {

        private final Set<String> groupBySet;

        public MapTupleKeySelector(Set<String> groupBySet) {
            this.groupBySet = groupBySet;
        }

        @Override
        public MonitoringTuple getKey(Map<String, Object> inputMap) {
            int groupBySetSize = groupBySet.size();
            Tuple tuple = Tuple.newInstance(groupBySetSize);
            int count = 0;
            for (String groupBy : groupBySet) {
                count = setTupleField(inputMap, tuple, count, groupBy);
            }
            return new MonitoringTuple(tuple);
        }
    }

    // Stores the group-by value as a String at position `count` and returns the next position
    public static int setTupleField(Map<String, Object> inputMap, Tuple tuple, int count, String groupBy) {
        Object groupByValueObj = inputMap.get(groupBy);
        String tupleValue = Utils.convertToString(groupByValueObj);
        tuple.setField(tupleValue, count++);
        return count;
    }

} // end of the enclosing class
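The getKey/setTupleField loop above boils down to collecting one String per group-by field. A minimal plain-Java sketch, assuming a `List<String>` in place of the Tuple and `String.valueOf` in place of the unshown `Utils.convertToString` (both are substitutions, not the poster's code; `GroupByKeys` and `extractKey` are hypothetical names):

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

class GroupByKeys {
    // Builds the composite key: one String per group-by field, in the
    // iteration order of groupByFields (mirrors getKey + setTupleField)
    static List<String> extractKey(Map<String, Object> inputMap, Collection<String> groupByFields) {
        List<String> key = new ArrayList<>(groupByFields.size());
        for (String groupBy : groupByFields) {
            Object value = inputMap.get(groupBy);
            // Assumption: String.valueOf stands in for Utils.convertToString;
            // a missing field becomes the string "null"
            key.add(String.valueOf(value));
        }
        return key;
    }
}
```

Each value's position in the key follows the iteration order of the collection passed in, so passing a `List` or a `LinkedHashSet` makes that order explicit. Because `List` already has value-based `equals` and `hashCode`, the result can be compared safely as a key.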





TIA,

On Thu, May 2, 2019 at 4:54 AM Chesnay Schepler <ch...@apache.org> wrote:

> I'm not sure what you're asking.
>
> If you have a DeserializationSchema that converts the data into a Map,
> then as I understand it you're done. What do you believe is missing?
>
> If, for a given job, the number/types of fields are fixed, you could look
> into using Row.

Re: Type Erasure - Usage of class Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1, Tuple2, etc.)

Posted by Chesnay Schepler <ch...@apache.org>.
I'm not sure what you're asking.

If you have a DeserializationSchema that converts the data into a Map,
then as I understand it you're done. What do you believe is missing?

If, for a given job, the number/types of fields are fixed, you could look
into using Row.
