Posted to commits@beam.apache.org by "Eugene Kirpichov (JIRA)" <ji...@apache.org> on 2018/07/09 17:50:00 UTC

[jira] [Created] (BEAM-4745) SDF tests broken by innocent change due to Dataflow worker dependencies

Eugene Kirpichov created BEAM-4745:
--------------------------------------

             Summary: SDF tests broken by innocent change due to Dataflow worker dependencies
                 Key: BEAM-4745
                 URL: https://issues.apache.org/jira/browse/BEAM-4745
             Project: Beam
          Issue Type: Bug
          Components: runner-dataflow
            Reporter: Eugene Kirpichov
            Assignee: Eugene Kirpichov


https://github.com/apache/beam/pull/5894 broke SDF in the Dataflow streaming runner: using SDFs fails with the error below.

The reason is that the Dataflow worker has a staged copy of several Beam artifacts, including runners-core-construction, and that copy comes before user code on the classpath. So the submitted pipeline contains a SplittableParDo serialized from master, but the worker deserializes it against a stale class file.
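
For context, this is standard Java serialization behavior: when a Serializable class does not declare a serialVersionUID, the JVM derives one from the class's structure, so two structurally different versions of SplittableParDo$PairWithRestrictionFn end up with different UIDs and deserialization fails with the InvalidClassException in the trace below (the two differing UIDs in the trace suggest no explicit serialVersionUID is declared). A minimal sketch of the mechanism (DemoFn is a hypothetical stand-in, not a Beam class):

    import java.io.ObjectStreamClass;
    import java.io.Serializable;

    public class SerialVersionDemo {
      // Hypothetical class that, like the failing DoFn, does not declare serialVersionUID.
      static class DemoFn implements Serializable {
        int shard; // any structural change (fields, methods) changes the computed UID
      }

      public static void main(String[] args) {
        // The JVM computes a default serialVersionUID from the class structure.
        long uid = ObjectStreamClass.lookup(DemoFn.class).getSerialVersionUID();
        System.out.println("computed serialVersionUID = " + uid);
        // If the submitting JVM and the worker load structurally different versions of
        // the class, their computed UIDs differ and ObjectInputStream throws
        // java.io.InvalidClassException ("local class incompatible"), as in the trace below.
      }
    }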

This needs to be fixed on the Dataflow side. Filing this JIRA just to track the externally facing issue.

Meanwhile, to stop the bleeding, I'm going to revert the change. The change itself is correct, but it's better to have SDFs not invoke setup/teardown than to have them not work at all.

CC: [~iemejia]

java.lang.RuntimeException: com.google.cloud.dataflow.worker.repackaged.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: unable to deserialize Serialized DoFnInfo
        com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:192)
        com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:163)
        com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:63)
        com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:50)
        com.google.cloud.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:87)
        com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.create(IntrinsicMapTaskExecutorFactory.java:123)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1143)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:136)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Caused by: com.google.cloud.dataflow.worker.repackaged.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: unable to deserialize Serialized DoFnInfo
        com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2214)
        com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache.get(LocalCache.java:4053)
        com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4899)
        com.google.cloud.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:90)
        com.google.cloud.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:74)
        com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.createParDoOperation(IntrinsicMapTaskExecutorFactory.java:262)
        com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.access$000(IntrinsicMapTaskExecutorFactory.java:84)
        com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:181)
        com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:163)
        com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:63)
        com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:50)
        com.google.cloud.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:87)
        com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.create(IntrinsicMapTaskExecutorFactory.java:123)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1143)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:136)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: unable to deserialize Serialized DoFnInfo
        org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
        com.google.cloud.dataflow.worker.UserParDoFnFactory$UserDoFnExtractor.getDoFnInfo(UserParDoFnFactory.java:61)
        com.google.cloud.dataflow.worker.UserParDoFnFactory.lambda$create$0(UserParDoFnFactory.java:92)
        com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4904)
        com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3628)
        com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2336)
        com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2295)
        com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2208)
        com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache.get(LocalCache.java:4053)
        com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4899)
        com.google.cloud.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:90)
        com.google.cloud.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:74)
        com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.createParDoOperation(IntrinsicMapTaskExecutorFactory.java:262)
        com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.access$000(IntrinsicMapTaskExecutorFactory.java:84)
        com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:181)
        com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:163)
        com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:63)
        com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:50)
        com.google.cloud.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:87)
        com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.create(IntrinsicMapTaskExecutorFactory.java:123)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1143)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:136)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Caused by: java.io.InvalidClassException: org.apache.beam.runners.core.construction.SplittableParDo$PairWithRestrictionFn; local class incompatible: stream classdesc serialVersionUID = -2216501394657530686, local class serialVersionUID = -6277163835950193211
        java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616)
        java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1630)
        java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
        java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
        java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
        java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
        java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
        java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
        org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:71)
        com.google.cloud.dataflow.worker.UserParDoFnFactory$UserDoFnExtractor.getDoFnInfo(UserParDoFnFactory.java:61)
        com.google.cloud.dataflow.worker.UserParDoFnFactory.lambda$create$0(UserParDoFnFactory.java:92)
        com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4904)
        com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3628)
        com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2336)
        com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2295)
        com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2208)
        com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache.get(LocalCache.java:4053)
        com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4899)
        com.google.cloud.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:90)
        com.google.cloud.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:74)
        com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.createParDoOperation(IntrinsicMapTaskExecutorFactory.java:262)
        com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.access$000(IntrinsicMapTaskExecutorFactory.java:84)
        com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:181)
        com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:163)
        com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:63)
        com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:50)
        com.google.cloud.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:87)
        com.google.cloud.dataflow.worker.IntrinsicMapTaskExecutorFactory.create(IntrinsicMapTaskExecutorFactory.java:123)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1143)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:136)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)


