Posted to user@flink.apache.org by Boris Lublinsky <bo...@lightbend.com> on 2018/01/11 00:20:34 UTC

Java types

I am trying to convert Scala code (which works fine) to Java.
The Scala code is:
// Create Kafka consumers
// Data
val dataConsumer = new FlinkKafkaConsumer010[Array[Byte]](
  DATA_TOPIC,
  new ByteArraySchema,
  dataKafkaProps
)

// Model
val modelConsumer = new FlinkKafkaConsumer010[Array[Byte]](
  MODELS_TOPIC,
  new ByteArraySchema,
  modelKafkaProps
)

// Create input data streams
val modelsStream = env.addSource(modelConsumer)
val dataStream = env.addSource(dataConsumer)

// Read data from streams
val models = modelsStream.map(ModelToServe.fromByteArray(_))
  .flatMap(BadDataHandler[ModelToServe])
  .keyBy(_.dataType)
val data = dataStream.map(DataRecord.fromByteArray(_))
  .flatMap(BadDataHandler[WineRecord])
  .keyBy(_.dataType)
Now I am trying to rewrite it in Java and am fighting with the requirement to provide types where they should be obvious:

// Create Kafka consumers
// Data
FlinkKafkaConsumer010<byte[]> dataConsumer = new FlinkKafkaConsumer010<>(
        ModelServingConfiguration.DATA_TOPIC,
        new ByteArraySchema(),
        dataKafkaProps);

// Model
FlinkKafkaConsumer010<byte[]> modelConsumer = new FlinkKafkaConsumer010<>(
        ModelServingConfiguration.MODELS_TOPIC,
        new ByteArraySchema(),
        modelKafkaProps);

// Create input data streams
DataStream<byte[]> modelsStream = env.addSource(modelConsumer, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
DataStream<byte[]> dataStream = env.addSource(dataConsumer, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
// Read data from streams
DataStream<Tuple2<String,ModelToServe>> models = modelsStream
     .flatMap(new ModelConverter(), new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(ModelToServe.class)));

Am I missing something similar to import org.apache.flink.api.scala._ in Java?

And if this is the only way, does this seem right?

Boris Lublinsky
FDP Architect
boris.lublinsky@lightbend.com
https://www.lightbend.com/
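
On the question above about a Java counterpart to the Scala import: the Scala API derives type information at compile time through the implicits that import brings into scope, and Java has no equivalent mechanism, so the Java API relies on reflection or on explicitly supplied type information. Flink's Java API also accepts an anonymous-subclass form, TypeInformation.of(new TypeHint<Tuple2<String, ModelToServe>>(){}), which may read a little shorter than building a TupleTypeInfo by hand. The sketch below is not Flink code. It reproduces the underlying idiom with the standard library only, using a hypothetical TypeToken class, to show how an anonymous subclass keeps a full generic type that a plain .class literal cannot express.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;

public class TypeTokenDemo {

    // Hypothetical helper (an assumption for illustration, not a Flink class):
    // it records the type argument T from the generic superclass signature.
    public abstract static class TypeToken<T> {
        private final Type type;

        protected TypeToken() {
            // For an anonymous subclass "new TypeToken<X>() {}", the generic
            // superclass is TypeToken<X>, which the class file stores in full.
            Type superclass = getClass().getGenericSuperclass();
            if (!(superclass instanceof ParameterizedType)) {
                throw new IllegalStateException("TypeToken needs a type argument");
            }
            this.type = ((ParameterizedType) superclass).getActualTypeArguments()[0];
        }

        public Type getType() {
            return type;
        }
    }

    public static void main(String[] args) {
        // The trailing {} creates the anonymous subclass that carries the type.
        Type captured = new TypeToken<Map<String, List<byte[]>>>() {}.getType();
        System.out.println(captured.getTypeName());
    }
}
```

The trade-off is the same as with an explicit TupleTypeInfo: the type still has to be written out once, but it is written as an ordinary Java type and checked by the compiler.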


Re: Java types

Posted by Timo Walther <tw...@apache.org>.
Could you send us the definition of the class or, even better, a small 
code example on GitHub to reproduce your error?

If you are implementing a Flink job in Java you should not have any 
org.apache.flink...scala.... import in your class file.

Regards,
Timo



Hi Timo,

> You don't need to specify the type in .flatMap() explicitly. It will be 
> automatically extracted using the generic signature of DataDataConverter.
It does not. That is the reason why I had to add it there.

> Regarding your error. Make sure that you don't mix up the API classes. 
> If you want to use the Java API you should not use 
> "org.apache.flink.streaming.api.scala.DataStream" but the Java one.
I rewrote the class in Java. That's why I am so confused.





Re: Java types

Posted by Timo Walther <tw...@apache.org>.
Hi Boris,

Each API is designed to be language-specific, so they might not always be the 
same. Scala has better type extraction features and lets you write code 
very concisely. Java sometimes requires more code to achieve the same.

You don't need to specify the type in .flatMap() explicitly. It will be 
automatically extracted using the generic signature of DataDataConverter.
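
Whether that extraction can succeed depends on how the function is declared. The following is a minimal sketch using only the standard library, not Flink's actual type extractor; BytesToString and outputTypeOf are hypothetical names chosen for illustration. It shows that a named class implementing a generic interface with concrete type arguments keeps them in its generic signature, while a lambda does not, which is one situation where type information has to be supplied explicitly.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

public class GenericSignatureDemo {

    // Hypothetical stand-in for a converter class such as DataDataConverter:
    // the type arguments are fixed in the class declaration itself.
    public static class BytesToString implements Function<byte[], String> {
        @Override
        public String apply(byte[] bytes) {
            return new String(bytes);
        }
    }

    // Returns the declared output type of a Function implementation, or null
    // when its class carries no parameterized Function interface.
    public static Type outputTypeOf(Function<?, ?> fn) {
        for (Type iface : fn.getClass().getGenericInterfaces()) {
            if (iface instanceof ParameterizedType) {
                ParameterizedType pt = (ParameterizedType) iface;
                if (pt.getRawType() == Function.class) {
                    // Function<T, R>: index 1 is the result type R.
                    return pt.getActualTypeArguments()[1];
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Function<byte[], String> lambda = bytes -> new String(bytes);
        // Named class: the generic signature is available via reflection.
        System.out.println(outputTypeOf(new BytesToString()));
        // Lambda: the generated class implements the raw interface only.
        System.out.println(outputTypeOf(lambda));
    }
}
```

A class that is itself generic, for example one declared as implementing Function<byte[], T>, falls in between: reflection finds only the type variable T, not the concrete type used at the call site.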

Regarding your error: make sure that you don't mix up the API classes. 
If you want to use the Java API, you should not use 
"org.apache.flink.streaming.api.scala.DataStream" but the Java one 
("org.apache.flink.streaming.api.datastream.DataStream").

Regards,
Timo





Re: Java types

Posted by Boris Lublinsky <bo...@lightbend.com>.
More questions.
In Scala, my DataProcessor is defined as:
class DataProcessorKeyed extends CoProcessFunction[WineRecord, ModelToServe, Double] with CheckpointedFunction {
And it is used as follows:
val models = modelsStream.map(ModelToServe.fromByteArray(_))
  .flatMap(BadDataHandler[ModelToServe])
  .keyBy(_.dataType)
val data = dataStream.map(DataRecord.fromByteArray(_))
  .flatMap(BadDataHandler[WineRecord])
  .keyBy(_.dataType)

// Merge streams
data
  .connect(models)
  .process(DataProcessorKeyed())
When I do the same thing in Java:
public class DataProcessorKeyed extends CoProcessFunction<Winerecord.WineRecord, ModelToServe, Double> implements CheckpointedFunction {
which I am using as follows:
// Read data from streams
DataStream<Tuple2<String, ModelToServe>> models = modelsStream
        .flatMap(new ModelDataConverter(), new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(ModelToServe.class)))
        .keyBy(0);
DataStream<Tuple2<String, Winerecord.WineRecord>> data = dataStream
        .flatMap(new DataDataConverter(), new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(Winerecord.WineRecord.class)))
        .keyBy(0);

// Merge streams
data
        .connect(models)
        .process(new DataProcessorKeyed());
I am getting an error:

Error:(68, 17) java: no suitable method found for keyBy(int)
    method org.apache.flink.streaming.api.scala.DataStream.keyBy(scala.collection.Seq<java.lang.Object>) is not applicable
      (argument mismatch; int cannot be converted to scala.collection.Seq<java.lang.Object>)
    method org.apache.flink.streaming.api.scala.DataStream.<K>keyBy(scala.Function1<org.apache.flink.api.java.tuple.Tuple2<java.lang.String,com.lightbend.model.ModelToServe>,K>,org.apache.flink.api.common.typeinfo.TypeInformation<K>) is not applicable
      (cannot infer type-variable(s) K
        (actual and formal argument lists differ in length))
So it seems to assume key/value pairs for the coprocessor.

Why is there such a difference between the APIs?

Boris Lublinsky
FDP Architect
boris.lublinsky@lightbend.com
https://www.lightbend.com/
