Posted to dev@flink.apache.org by Amit Jain <aj...@gmail.com> on 2018/01/03 10:41:41 UTC

Invalid lambda deserialization

Hi,

I'm writing a job to merge old data with changelogs using the DataSet API,
where I'm reading the changelog using TextInputFormat and the old data using
HadoopInputFormat.

I can see that the job manager has successfully deployed the program flow to
the worker nodes. However, the workers immediately go into a failed state
because of:
*Caused by: java.lang.IllegalArgumentException: Invalid lambda
deserialization*

Complete stack trace:
java.lang.RuntimeException: The initialization of the DataSource's outputs caused an error: Could not read the user code wrapper: unexpected exception type
	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:94)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not read the user code wrapper: unexpected exception type
	at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:290)
	at org.apache.flink.runtime.operators.BatchTask.instantiateUserCode(BatchTask.java:1432)
	at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.setup(ChainedMapDriver.java:39)
	at org.apache.flink.runtime.operators.chaining.ChainedDriver.setup(ChainedDriver.java:90)
	at org.apache.flink.runtime.operators.BatchTask.initOutputs(BatchTask.java:1299)
	at org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:287)
	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:91)
	... 2 more
Caused by: java.io.IOException: unexpected exception type
	at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1682)
	at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1254)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2078)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:433)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
	at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
	at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
	... 8 more
Caused by: java.lang.reflect.InvocationTargetException
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1248)
	... 18 more
Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
	at org.myorg.quickstart.MergeTableSecond.$deserializeLambda$(MergeTableSecond.java:41)
	... 28 more


Running Environment
Flink: 1.3.2
Java: openjdk version "1.8.0_151"

Please help us resolve this issue.


--
Thanks,
Amit

Re: Invalid lambda deserialization

Posted by Till Rohrmann <tr...@apache.org>.
Hi Amit,

Could this be related to [1]? How do you build your job?

[1] https://bugs.eclipse.org/bugs/show_bug.cgi?id=439889
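
If that bug is indeed the cause, a common workaround is to replace the
serialized lambdas with anonymous classes, which are serialized as ordinary
objects and never touch $deserializeLambda$. A minimal sketch in plain Java
(no Flink dependencies; the interface and names are only illustrative
stand-ins for Flink's function interfaces):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

public class AnonClassWorkaround {

    // Serializable functional interface standing in for a Flink
    // MapFunction/JoinFunction (illustrative only).
    interface SerFn<T, R> extends Function<T, R>, Serializable {}

    // An anonymous class is serialized as a regular object instance, so
    // deserialization does not depend on the compiler-generated
    // $deserializeLambda$ method of the enclosing class.
    static SerFn<Integer, Integer> increment() {
        return new SerFn<Integer, Integer>() {
            @Override
            public Integer apply(Integer x) {
                return x + 1;
            }
        };
    }

    // Serialize and deserialize the function, then apply it.
    static int roundTrip(int input) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(increment());
        }
        try (ObjectInputStream ois = new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray()))) {
            @SuppressWarnings("unchecked")
            SerFn<Integer, Integer> fn = (SerFn<Integer, Integer>) ois.readObject();
            return fn.apply(input);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip(41));
    }
}
```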

Cheers,
Till

On Wed, Jan 3, 2018 at 2:55 PM, Timo Walther <tw...@apache.org> wrote:

> Hi Amit,
>
> which of the two lambdas caused the error? I guess it was the mapper after
> the parquet input, right? In both cases this should not happen. Maybe you
> can open an issue with a small reproducible code example?
>
> Thanks.
>
> Regards,
> Timo
>
>
> On 1/3/18 at 12:15 PM, Amit Jain wrote:
>
>> Hi Timo,
>>
>> Thanks a lot! Quick re-look over the code helped me to detect used
>> lambdas. I was using lambdas in two cases which are following.
>>
>> DataSet<GenericRecord> newMainDataSet = mainDataSet
>>     .fullOuterJoin(latestDeltaDataSet, JoinHint.REPARTITION_HASH_SECOND)
>>     .where(keySelector).equalTo(keySelector)
>>     .with((first, second) -> first != null && second != null ? second
>>         : (first != null ? first : second))
>>     .filter(filterFunction)
>>     .returns(GenericRecord.class);
>>
>> DataSet<GenericRecord> mainDataSet =
>>     mergeTableSecond.readParquet(mainPath, avroSchema, env)
>>         .withParameters(parameters)
>>         .map(t -> t.f1)
>>         .returns(GenericRecord.class);
>>
>> On Wed, Jan 3, 2018 at 4:18 PM, Timo Walther <tw...@apache.org> wrote:
>>
>> Hi Amit,
>>>
>>> are you using lambdas as parameters of a Flink Function or in a member
>>> variable? If yes, can you share an lambda example that fails?
>>>
>>> Regards,
>>> Timo
>>>
>>>
>>> On 1/3/18 at 11:41 AM, Amit Jain wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm writing a job to merge old data with changelogs using DataSet API
>>>> where I'm reading changelog using TextInputFormat and old data using
>>>> HadoopInputFormat.
>>>>
>>>> I can see, job manager has successfully deployed the program flow to
>>>> worker nodes. However, workers are immediately going to failed state
>>>> because of
>>>> *Caused by: java.lang.IllegalArgumentException: Invalid lambda
>>>> deserialization*
>>>>
>>>> [stack trace snipped]
>>>>
>>>> Running Environment
>>>> Flink: 1.3.2
>>>> Java: openjdk version "1.8.0_151"
>>>>
>>>> Please help us resolve this issue.
>>>>
>>>> --
>>>> Thanks,
>>>> Amit
>

Re: Invalid lambda deserialization

Posted by Timo Walther <tw...@apache.org>.
Hi Amit,

which of the two lambdas caused the error? I guess it was the mapper 
after the parquet input, right? In both cases this should not happen. 
Maybe you can open an issue with a small reproducible code example?

Thanks.

Regards,
Timo


On 1/3/18 at 12:15 PM, Amit Jain wrote:
> Hi Timo,
>
> Thanks a lot! Quick re-look over the code helped me to detect used
> lambdas. I was using lambdas in two cases which are following.
>
> DataSet<GenericRecord> newMainDataSet = mainDataSet
>     .fullOuterJoin(latestDeltaDataSet, JoinHint.REPARTITION_HASH_SECOND)
>     .where(keySelector).equalTo(keySelector)
>     .with((first, second) -> first != null && second != null ? second
>         : (first != null ? first : second))
>     .filter(filterFunction)
>     .returns(GenericRecord.class);
>
> DataSet<GenericRecord> mainDataSet =
>     mergeTableSecond.readParquet(mainPath, avroSchema, env)
>         .withParameters(parameters)
>         .map(t -> t.f1)
>         .returns(GenericRecord.class);
>
>
>
> On Wed, Jan 3, 2018 at 4:18 PM, Timo Walther <tw...@apache.org> wrote:
>
>> Hi Amit,
>>
>> are you using lambdas as parameters of a Flink Function or in a member
>> variable? If yes, can you share an lambda example that fails?
>>
>> Regards,
>> Timo
>>
>>
>> On 1/3/18 at 11:41 AM, Amit Jain wrote:
>>
>>> Hi,
>>>
>>> I'm writing a job to merge old data with changelogs using DataSet API
>>> where I'm reading changelog using TextInputFormat and old data using
>>> HadoopInputFormat.
>>>
>>> I can see, job manager has successfully deployed the program flow to
>>> worker nodes. However, workers are immediately going to failed state
>>> because of
>>> *Caused by: java.lang.IllegalArgumentException: Invalid lambda
>>> deserialization*
>>>
>>> [stack trace snipped]
>>>
>>> Running Environment
>>> Flink: 1.3.2
>>> Java: openjdk version "1.8.0_151"
>>>
>>> Please help us resolve this issue.
>>>
>>> --
>>> Thanks,
>>> Amit


Re: Invalid lambda deserialization

Posted by Amit Jain <aj...@gmail.com>.
Hi Timo,

Thanks a lot! A quick re-look over the code helped me find the lambdas in
use. I was using lambdas in the following two cases:

DataSet<GenericRecord> newMainDataSet = mainDataSet
    .fullOuterJoin(latestDeltaDataSet, JoinHint.REPARTITION_HASH_SECOND)
    .where(keySelector).equalTo(keySelector)
    .with((first, second) -> first != null && second != null ? second
        : (first != null ? first : second))
    .filter(filterFunction)
    .returns(GenericRecord.class);

DataSet<GenericRecord> mainDataSet =
    mergeTableSecond.readParquet(mainPath, avroSchema, env)
        .withParameters(parameters)
        .map(t -> t.f1)
        .returns(GenericRecord.class);



On Wed, Jan 3, 2018 at 4:18 PM, Timo Walther <tw...@apache.org> wrote:

> Hi Amit,
>
> are you using lambdas as parameters of a Flink Function or in a member
> variable? If yes, can you share an lambda example that fails?
>
> Regards,
> Timo
>
>
> On 1/3/18 at 11:41 AM, Amit Jain wrote:
>
>> Hi,
>>
>> I'm writing a job to merge old data with changelogs using DataSet API
>> where I'm reading changelog using TextInputFormat and old data using
>> HadoopInputFormat.
>>
>> I can see, job manager has successfully deployed the program flow to
>> worker nodes. However, workers are immediately going to failed state
>> because of
>> *Caused by: java.lang.IllegalArgumentException: Invalid lambda
>> deserialization*
>>
>> [stack trace snipped]
>>
>> Running Environment
>> Flink: 1.3.2
>> Java: openjdk version "1.8.0_151"
>>
>> Please help us resolve this issue.
>>
>> --
>> Thanks,
>> Amit
>

Re: Invalid lambda deserialization

Posted by Timo Walther <tw...@apache.org>.
Hi Amit,

are you using lambdas as parameters of a Flink Function or in a member
variable? If yes, can you share a lambda example that fails?
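
For background: a lambda is only Java-serializable if its target type is
Serializable; the compiler then emits a hidden $deserializeLambda$ method in
the capturing class, and java.lang.invoke.SerializedLambda.readResolve()
calls it back on deserialization -- exactly the frames in your trace. A
minimal round-trip sketch in plain Java (no Flink involved; names are
illustrative):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

public class LambdaSerDemo {

    // The intersection with Serializable makes the lambda serializable;
    // the compiler then generates $deserializeLambda$ in this class.
    interface SerFn<T, R> extends Function<T, R>, Serializable {}

    static int roundTrip(int input) throws Exception {
        SerFn<Integer, Integer> fn = x -> x + 1;

        // Writing the lambda actually writes a SerializedLambda proxy.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(fn);
        }

        // Reading it back invokes LambdaSerDemo.$deserializeLambda$ via
        // SerializedLambda.readResolve(). If the class file on the reading
        // side was produced differently (e.g. by another compiler), this
        // is where "Invalid lambda deserialization" is thrown.
        try (ObjectInputStream ois = new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray()))) {
            @SuppressWarnings("unchecked")
            SerFn<Integer, Integer> back = (SerFn<Integer, Integer>) ois.readObject();
            return back.apply(input);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip(41));
    }
}
```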

Regards,
Timo


On 1/3/18 at 11:41 AM, Amit Jain wrote:
> Hi,
>
> I'm writing a job to merge old data with changelogs using DataSet API where
> I'm reading changelog using TextInputFormat and old data using
> HadoopInputFormat.
>
> I can see, job manager has successfully deployed the program flow to worker
> nodes. However, workers are immediately going to failed state because of
> *Caused by: java.lang.IllegalArgumentException: Invalid lambda
> deserialization*
>
> [stack trace snipped]
>
> Running Environment
> Flink: 1.3.2
> Java: openjdk version "1.8.0_151"
>
> Please help us resolve this issue.
>
> --
> Thanks,
> Amit