Posted to user@flink.apache.org by James Yu <cy...@gmail.com> on 2018/03/23 00:06:51 UTC

Example PopularPlacesFromKafka fails to run

Hi,

I can't get the PopularPlacesFromKafka example to run: it fails with the
following exception. What might cause this "Invalid record" error?

When running within IntelliJ IDEA:
07:52:23.960 [Source: Custom Source -> Map (7/8)] INFO  org.apache.flink.runtime.taskmanager.Task  - Source: Custom Source -> Map (7/8) (930e95aac65cbda39d9f1eaa41891253) switched from RUNNING to FAILED.
java.lang.RuntimeException: Invalid record: 4010,2013003778,2013003775,START,2013-01-01 00:13:00,1970-01-01 00:00:00,-74.00074,40.7359,-73.98559,40.739063,1
    at com.dataartisans.flinktraining.exercises.datastream_java.datatypes.TaxiRide.fromString(TaxiRide.java:119) ~[flink-training-exercises-0.15.1.jar:na]
    at com.dataartisans.flinktraining.exercises.datastream_java.utils.TaxiRideSchema.deserialize(TaxiRideSchema.java:37) ~[flink-training-exercises-0.15.1.jar:na]
    at com.dataartisans.flinktraining.exercises.datastream_java.utils.TaxiRideSchema.deserialize(TaxiRideSchema.java:28) ~[flink-training-exercises-0.15.1.jar:na]
    at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:42) ~[flink-training-exercises-0.15.1.jar:na]
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:139) ~[flink-training-exercises-0.15.1.jar:na]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:652) ~[flink-training-exercises-0.15.1.jar:na]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) ~[flink-runtime_2.11-1.4.2.jar:1.4.2]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60]

When deployed to and run on a local cluster:
2018-03-23 07:27:23.130 [Source: Custom Source -> Map (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  - Source: Custom Source -> Map (1/1) (db21c7604b94968097d4be7b8558ac08) switched from RUNNING to FAILED.
java.lang.RuntimeException: Invalid record: 2264,2013002216,2013002213,START,2013-01-01 00:09:00,1970-01-01 00:00:00,-74.00402,40.742107,-73.98032,40.73522,1
    at com.dataartisans.flinktraining.exercises.datastream_java.datatypes.TaxiRide.fromString(TaxiRide.java:119) ~[blob_p-bc9bbda0acf1e77543b16e023cbf2466bbe42a5d-ac2bf6479faa39e6b27477bb80f66178:na]
    at com.dataartisans.flinktraining.exercises.datastream_java.utils.TaxiRideSchema.deserialize(TaxiRideSchema.java:37) ~[blob_p-bc9bbda0acf1e77543b16e023cbf2466bbe42a5d-ac2bf6479faa39e6b27477bb80f66178:na]
    at com.dataartisans.flinktraining.exercises.datastream_java.utils.TaxiRideSchema.deserialize(TaxiRideSchema.java:28) ~[blob_p-bc9bbda0acf1e77543b16e023cbf2466bbe42a5d-ac2bf6479faa39e6b27477bb80f66178:na]
    at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:42) ~[blob_p-bc9bbda0acf1e77543b16e023cbf2466bbe42a5d-ac2bf6479faa39e6b27477bb80f66178:na]
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:139) ~[blob_p-bc9bbda0acf1e77543b16e023cbf2466bbe42a5d-ac2bf6479faa39e6b27477bb80f66178:na]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:652) ~[blob_p-bc9bbda0acf1e77543b16e023cbf2466bbe42a5d-ac2bf6479faa39e6b27477bb80f66178:na]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) ~[flink-dist_2.11-1.4.2.jar:1.4.2]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60]

I copied PopularPlacesFromKafka.java from
https://raw.githubusercontent.com/dataArtisans/flink-training-exercises/master/src/main/java/com/dataartisans/flinktraining/exercises/datastream_java/connectors/PopularPlacesFromKafka.java
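Splitting the logged record on commas shows where it goes wrong: the START/END marker sits in the fourth column (index 3), after two numbers that the fix later in this thread reads as the taxi ID and the driver ID. The TaxiRide.toString() posted later in this thread writes the marker as the second column (index 1), so a parser expecting that layout presumably finds a number where it wants START or END and throws. A minimal check of the column positions; `RecordCheck` and `eventTypeIndex` are hypothetical names, not part of the training project:

```java
/** Hypothetical helper for inspecting where the START/END marker sits in a ride record. */
final class RecordCheck {

    /** Returns the index of the first "START" or "END" token, or -1 if there is none. */
    static int eventTypeIndex(String line) {
        String[] tokens = line.split(",");
        for (int i = 0; i < tokens.length; i++) {
            if (tokens[i].equals("START") || tokens[i].equals("END")) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Record copied from the IntelliJ IDEA log above.
        String record = "4010,2013003778,2013003775,START,2013-01-01 00:13:00,"
                + "1970-01-01 00:00:00,-74.00074,40.7359,-73.98559,40.739063,1";
        String[] tokens = record.split(",");
        System.out.println("tokens: " + tokens.length);                   // prints "tokens: 11"
        System.out.println("tokens[1]: " + tokens[1]);                    // prints "tokens[1]: 2013003778"
        System.out.println("START/END index: " + eventTypeIndex(record)); // prints "START/END index: 3"
    }
}
```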


This is a UTF-8 formatted mail
-----------------------------------------------
James C.-C.Yu
+886988713275

Re: Example PopularPlacesFromKafka fails to run

Posted by Fabian Hueske <fh...@gmail.com>.
Thanks for the feedback!

Fabian

2018-03-23 8:02 GMT+01:00 James Yu <cy...@gmail.com>:


Re: Example PopularPlacesFromKafka fails to run

Posted by James Yu <cy...@gmail.com>.
Hi,

When I moved on to the timePrediction exercise
(http://training.data-artisans.com/exercises/timePrediction.html), I realized
that the format of nycTaxiRides.gz is fine.
The problem is in TaxiRide.toString(): it serializes the columns in the wrong
order, so the records written to Kafka have the wrong format.
I therefore changed TaxiRide.toString() to the following:

  public String toString() {
    StringBuilder sb = new StringBuilder();
    sb.append(rideId).append(",");
    sb.append(isStart ? "START" : "END").append(",");
    sb.append(startTime.toString(timeFormatter)).append(",");
    sb.append(endTime.toString(timeFormatter)).append(",");
    sb.append(startLon).append(",");
    sb.append(startLat).append(",");
    sb.append(endLon).append(",");
    sb.append(endLat).append(",");
    sb.append(passengerCnt).append(",");
    sb.append(taxiId).append(",");
    sb.append(driverId);

    return sb.toString();
  }
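The general rule behind this fix is that whatever writes records into Kafka and whatever reads them back must use the same column order, and a quick round-trip check catches a mismatch early. A minimal sketch, assuming the column order of the toString() above; `Ride` is a hypothetical stand-in for TaxiRide that uses java.time instead of Joda-Time, and like that toString() it always writes startTime before endTime:

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

/**
 * Hypothetical, simplified stand-in for TaxiRide. toString() and fromString() share one order:
 * rideId,START|END,startTime,endTime,startLon,startLat,endLon,endLat,passengerCnt,taxiId,driverId
 */
final class Ride {
    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    long rideId;
    boolean isStart;
    LocalDateTime startTime;
    LocalDateTime endTime;
    float startLon, startLat, endLon, endLat;
    short passengerCnt;
    long taxiId;
    long driverId;

    @Override
    public String toString() {
        return String.join(",",
                Long.toString(rideId),
                isStart ? "START" : "END",
                startTime.format(FMT),
                endTime.format(FMT),
                Float.toString(startLon),
                Float.toString(startLat),
                Float.toString(endLon),
                Float.toString(endLat),
                Short.toString(passengerCnt),
                Long.toString(taxiId),
                Long.toString(driverId));
    }

    /** Empty coordinate fields become 0.0f, as in the parsing code posted in this thread. */
    private static float coord(String s) {
        return s.isEmpty() ? 0.0f : Float.parseFloat(s);
    }

    /** Parses a record in exactly the column order that toString() writes. */
    static Ride fromString(String line) {
        String[] t = line.split(",");
        if (t.length != 11) {
            throw new RuntimeException("Invalid record: " + line);
        }
        Ride r = new Ride();
        try {
            r.rideId = Long.parseLong(t[0]);
            switch (t[1]) {
                case "START": r.isStart = true; break;
                case "END": r.isStart = false; break;
                default: throw new RuntimeException("Invalid record: " + line);
            }
            r.startTime = LocalDateTime.parse(t[2], FMT);
            r.endTime = LocalDateTime.parse(t[3], FMT);
            r.startLon = coord(t[4]);
            r.startLat = coord(t[5]);
            r.endLon = coord(t[6]);
            r.endLat = coord(t[7]);
            r.passengerCnt = Short.parseShort(t[8]);
            r.taxiId = Long.parseLong(t[9]);
            r.driverId = Long.parseLong(t[10]);
        } catch (NumberFormatException | DateTimeParseException e) {
            throw new RuntimeException("Invalid record: " + line, e);
        }
        return r;
    }

    public static void main(String[] args) {
        // Values from the logged record, rearranged into the toString() column order.
        String line = "4010,START,2013-01-01 00:13:00,1970-01-01 00:00:00,"
                + "-74.00074,40.7359,-73.98559,40.739063,1,2013003778,2013003775";
        Ride ride = fromString(line);
        // Writing and re-reading must give back the same record.
        System.out.println(fromString(ride.toString()).toString().equals(ride.toString())); // prints "true"
    }
}
```

A check like the one in main could run as a unit test, so that a change to the column order on one side fails before any records reach Kafka.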




2018-03-23 9:59 GMT+08:00 James Yu <cy...@gmail.com>:


Re: Example PopularPlacesFromKafka fails to run

Posted by James Yu <cy...@gmail.com>.
I just figured out that the data format in nycTaxiRides.gz doesn't match the
way TaxiRide.java interprets the lines fed into it.
I then checked the training exercises on GitHub and found that TaxiRide.java
(https://github.com/dataArtisans/flink-training-exercises/tree/master/src/main/java/com/dataartisans/flinktraining/exercises/datastream_java/datatypes)
was updated recently (about 11 days ago).
After making some changes to TaxiRide.java, the example works like a charm.

I downloaded nycTaxiRides.gz with this command:
wget http://training.data-artisans.com/trainingData/nycTaxiRides.gz
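To check which column layout the downloaded file actually uses, it is enough to print its first few lines. A minimal sketch using only the JDK's java.util.zip.GZIPInputStream; `GzPeek` and `head` are hypothetical names, and the default path assumes wget saved the file into the current directory:

```java
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;

/** Hypothetical helper: prints the first lines of a gzip-compressed text file. */
final class GzPeek {

    /** Reads at most maxLines lines from a gzip-compressed stream, then closes it. */
    static List<String> head(InputStream gzipped, int maxLines) throws IOException {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(new GZIPInputStream(gzipped), StandardCharsets.UTF_8))) {
            String line;
            while (lines.size() < maxLines && (line = reader.readLine()) != null) {
                lines.add(line);
            }
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        String path = args.length > 0 ? args[0] : "nycTaxiRides.gz";
        try (InputStream in = new FileInputStream(path)) {
            for (String line : head(in, 3)) {
                System.out.println(line);
            }
        }
    }
}
```

Run it with the path to the file as the only argument, or with no argument to read nycTaxiRides.gz from the working directory.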

Here are the changes I made to TaxiRide.java locally (basically just the
indices into the tokens array):
try {
    // Column layout of the records read from Kafka:
    // rideId, taxiId, driverId, START|END, two timestamps,
    // startLon, startLat, endLon, endLat, passengerCnt
    ride.rideId = Long.parseLong(tokens[0]);

    switch (tokens[3]) {
        case "START":
            ride.isStart = true;
            ride.startTime = DateTime.parse(tokens[4], timeFormatter);
            ride.endTime = DateTime.parse(tokens[5], timeFormatter);
            break;
        case "END":
            // For END events the first timestamp is the end time.
            ride.isStart = false;
            ride.endTime = DateTime.parse(tokens[4], timeFormatter);
            ride.startTime = DateTime.parse(tokens[5], timeFormatter);
            break;
        default:
            throw new RuntimeException("Invalid record: " + line);
    }

    // Empty coordinate fields default to 0.0f.
    ride.startLon = tokens[6].length() > 0 ? Float.parseFloat(tokens[6]) : 0.0f;
    ride.startLat = tokens[7].length() > 0 ? Float.parseFloat(tokens[7]) : 0.0f;
    ride.endLon = tokens[8].length() > 0 ? Float.parseFloat(tokens[8]) : 0.0f;
    ride.endLat = tokens[9].length() > 0 ? Float.parseFloat(tokens[9]) : 0.0f;
    ride.passengerCnt = Short.parseShort(tokens[10]);
    ride.taxiId = Long.parseLong(tokens[1]);
    ride.driverId = Long.parseLong(tokens[2]);

} catch (NumberFormatException nfe) {
    throw new RuntimeException("Invalid record: " + line, nfe);
}
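The change above amounts to remapping token indices for records laid out as rideId, taxiId, driverId, START/END, two timestamps, four coordinates and a passenger count. Naming the indices once keeps such a remapping in one place. A minimal, self-contained sketch of the same mapping; `LoggedLayout`, `Parsed` and the index constants are hypothetical, and java.time stands in for the Joda-Time DateTime used above:

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

/** Hypothetical parser for the column layout seen in the logged records. */
final class LoggedLayout {
    // Token indices, matching the code above.
    static final int RIDE_ID = 0, TAXI_ID = 1, DRIVER_ID = 2, EVENT = 3, TIME_1 = 4, TIME_2 = 5;
    static final int START_LON = 6, START_LAT = 7, END_LON = 8, END_LAT = 9, PASSENGER_CNT = 10;
    static final int NUM_COLUMNS = 11;

    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Minimal result holder, not the training project's TaxiRide. */
    static final class Parsed {
        long rideId, taxiId, driverId;
        boolean isStart;
        LocalDateTime startTime, endTime;
        float startLon, startLat, endLon, endLat;
        short passengerCnt;
    }

    private static float coord(String s) {
        return s.isEmpty() ? 0.0f : Float.parseFloat(s); // empty field -> 0.0f
    }

    static Parsed parse(String line) {
        String[] t = line.split(",");
        if (t.length != NUM_COLUMNS) {
            throw new RuntimeException("Invalid record: " + line);
        }
        Parsed p = new Parsed();
        try {
            p.rideId = Long.parseLong(t[RIDE_ID]);
            p.taxiId = Long.parseLong(t[TAXI_ID]);
            p.driverId = Long.parseLong(t[DRIVER_ID]);
            LocalDateTime first = LocalDateTime.parse(t[TIME_1], FMT);
            LocalDateTime second = LocalDateTime.parse(t[TIME_2], FMT);
            switch (t[EVENT]) {
                case "START":
                    p.isStart = true;
                    p.startTime = first;
                    p.endTime = second;
                    break;
                case "END":
                    // As in the code above, END records list the end time first.
                    p.isStart = false;
                    p.endTime = first;
                    p.startTime = second;
                    break;
                default:
                    throw new RuntimeException("Invalid record: " + line);
            }
            p.startLon = coord(t[START_LON]);
            p.startLat = coord(t[START_LAT]);
            p.endLon = coord(t[END_LON]);
            p.endLat = coord(t[END_LAT]);
            p.passengerCnt = Short.parseShort(t[PASSENGER_CNT]);
        } catch (NumberFormatException | DateTimeParseException e) {
            throw new RuntimeException("Invalid record: " + line, e);
        }
        return p;
    }

    public static void main(String[] args) {
        Parsed p = parse("4010,2013003778,2013003775,START,2013-01-01 00:13:00,"
                + "1970-01-01 00:00:00,-74.00074,40.7359,-73.98559,40.739063,1");
        System.out.println(p.taxiId + " " + p.driverId + " " + p.startTime); // prints "2013003778 2013003775 2013-01-01T00:13"
    }
}
```

With the indices declared once, a future layout change touches one block of constants instead of every tokens[...] expression.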



2018-03-23 8:06 GMT+08:00 James Yu <cy...@gmail.com>:
