Posted to user@flink.apache.org by Kyle Hamlin <ha...@gmail.com> on 2018/01/03 19:31:55 UTC

BucketingSink doesn't work anymore moving from 1.3.2 to 1.4.0

Hello,

After moving to Flink 1.4.0 I'm getting the following error. I can't find
anything online that addresses it. Is it a Hadoop dependency issue? Here
are my project dependencies:

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala" % flinkVersion % Provided,
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided,
  "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion,
  "org.apache.flink" %% "flink-connector-filesystem" % flinkVersion,
  "org.apache.flink" % "flink-metrics-core" % flinkVersion,
  "org.apache.flink" % "flink-metrics-graphite" % flinkVersion,
  "org.apache.kafka" %% "kafka" % "0.10.0.1",
  "org.apache.avro" % "avro" % "1.7.7",
  "org.apache.parquet" % "parquet-hadoop" % "1.8.1",
  "org.apache.parquet" % "parquet-avro" % "1.8.1",
  "io.confluent" % "kafka-avro-serializer" % "3.2.0",
  "org.apache.hadoop" % "hadoop-common" % "3.0.0"
)

*Stacktrace:*
Cluster configuration: Standalone cluster with JobManager at localhost/
127.0.0.1:6123
Using address localhost:6123 to connect to JobManager.
JobManager web interface address http://localhost:8082
Starting execution of program
Submitting job with JobID: b6ed965410dad61f96f8dec73b614a9f. Waiting for
job completion.
Connected to JobManager at
Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-1321297259]
with leader session id 00000000-0000-0000-0000-000000000000.
01/03/2018 14:20:52 Job execution switched to status RUNNING.
01/03/2018 14:20:52 Source: Kafka -> Sink: S3(1/1) switched to SCHEDULED
01/03/2018 14:20:52 Source: Kafka -> Sink: S3(1/1) switched to DEPLOYING
01/03/2018 14:20:53 Source: Kafka -> Sink: S3(1/1) switched to RUNNING
01/03/2018 14:20:53 Source: Kafka -> Sink: S3(1/1) switched to FAILED
java.lang.RuntimeException: Error while creating FileSystem when
initializing the state of the BucketingSink.
at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Cannot instantiate file system for URI:
hdfs://localhost:12345/
at
org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
at
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1154)
at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
... 9 more
Caused by: java.lang.ClassCastException:
org.apache.hadoop.ipc.ProtobufRpcEngine cannot be cast to
org.apache.hadoop.ipc.RpcEngine
at org.apache.hadoop.ipc.RPC.getProtocolEngine(RPC.java:207)
at org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:579)
at
org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:418)
at
org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:314)
at
org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:678)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:619)
at
org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149)
at
org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:159)
... 13 more

Re: BucketingSink doesn't work anymore moving from 1.3.2 to 1.4.0

Posted by Kyle Hamlin <ha...@gmail.com>.
+Aljoscha Krettek <al...@apache.org> I set up my project using the
template you suggested and I'm able to bucket and write files locally. I
also want to test writing to S3, but I don't know how to configure the `sbt
run` command to tell the FlinkMiniCluster to use the
flink-s3-fs-hadoop-1.4.0.jar and a flink-conf.yaml.

When I try running my jar via the `flink run` command I get the same
"java.lang.RuntimeException: Error while creating FileSystem when
initializing the state of the BucketingSink" error. How do I overcome this
issue while still using the `flink run` command, so that I'm able to use the
flink-conf.yaml and flink-s3-fs-hadoop-1.4.0.jar?


On Fri, Jan 5, 2018 at 7:50 PM Kyle Hamlin <ha...@gmail.com> wrote:

> Also, I'm not using HDFS; I'm trying to sink to S3.

Re: BucketingSink doesn't work anymore moving from 1.3.2 to 1.4.0

Posted by Kyle Hamlin <ha...@gmail.com>.
Also, I'm not using HDFS; I'm trying to sink to S3.

On Fri, Jan 5, 2018 at 6:18 PM Kyle Hamlin <ha...@gmail.com> wrote:

> I have the hadoop-common.jar in my build.sbt because I was having issues
> compiling my jar after moving from 1.3.2 to 1.4.0:
> org.apache.hadoop.fs.{FileSystem, Path} were no longer in Flink, and I use
> them in my custom bucketer and in my writer to write Avro out to Parquet.
>
> I tried adding classloader.resolve-order: parent-first to my
> flink-conf.yaml but that didn't seem to work. I grepped my jar for "hadoop"
> and found the following:
>
> org/apache/hadoop/*
> org/apache/parquet/hadoop/*
>
> After designating the hadoop-common.jar dependency as "provided", only
> org/apache/parquet/hadoop/* files show up. Additionally, the "RpcEngine" and
> "ProtobufRpcEngine" error doesn't show up anymore, just the following:
>
> java.lang.RuntimeException: Error while creating FileSystem when
> initializing the state of the BucketingSink.
> at
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
> at
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
> at
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Cannot instantiate file system for URI:
> hdfs://localhost:12345/
> at
> org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
> at
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
> at
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1154)
> at
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
> at
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
> ... 9 more
>
> Moving the hadoop-common.jar to Flink's lib/ directory also doesn't appear
> to help.

Re: BucketingSink doesn't work anymore moving from 1.3.2 to 1.4.0

Posted by Kyle Hamlin <ha...@gmail.com>.
I have the hadoop-common.jar in my build.sbt because I was having issues
compiling my jar after moving from 1.3.2 to 1.4.0:
org.apache.hadoop.fs.{FileSystem, Path} were no longer in Flink, and I use
them in my custom bucketer and in my writer to write Avro out to Parquet.

I tried adding classloader.resolve-order: parent-first to my
flink-conf.yaml but that didn't seem to work. I grepped my jar for "hadoop"
and found the following:

org/apache/hadoop/*
org/apache/parquet/hadoop/*

After designating the hadoop-common.jar dependency as "provided", only
org/apache/parquet/hadoop/* files show up. Additionally, the "RpcEngine" and
"ProtobufRpcEngine" error doesn't show up anymore, just the following:

java.lang.RuntimeException: Error while creating FileSystem when
initializing the state of the BucketingSink.
at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Cannot instantiate file system for URI:
hdfs://localhost:12345/
at
org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
at
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1154)
at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
... 9 more

Moving the hadoop-common.jar to Flink's lib/ directory also doesn't appear
to help.
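
The jar check described above (searching the assembled jar for "hadoop") could also be scripted. The following is only a sketch: the helper names hadoopPrefix, hadoopPrefixes and entriesOf are hypothetical, and the jar path in the usage note is a placeholder, not something taken from this thread.

```scala
import java.util.zip.ZipFile
import scala.collection.JavaConverters._

// Hypothetical helper: returns the path up to and including the first
// path segment named exactly "hadoop", if the entry has one.
def hadoopPrefix(entryName: String): Option[String] = {
  val segments = entryName.split('/')
  val idx = segments.indexOf("hadoop")
  if (idx < 0) None else Some(segments.take(idx + 1).mkString("/"))
}

// Hypothetical helper: the distinct "hadoop" package roots among the entries.
def hadoopPrefixes(entryNames: Seq[String]): Set[String] =
  entryNames.flatMap(name => hadoopPrefix(name)).toSet

// Hypothetical helper: reads all entry names of a jar, closing the file afterwards.
def entriesOf(jarPath: String): Seq[String] = {
  val zip = new ZipFile(jarPath)
  try zip.entries().asScala.map(_.getName).toList
  finally zip.close()
}
```

For example, hadoopPrefixes(entriesOf("path/to/your-assembly.jar")) would list the package roots; if org/apache/hadoop still appears, Hadoop classes are being packaged into the user jar.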

On Thu, Jan 4, 2018 at 10:48 AM Aljoscha Krettek <al...@apache.org>
wrote:

> I think this might be happening because some Hadoop dependencies are in
> the user jar while the rest are only available from the Hadoop deps that come
> bundled with Flink. For example, I noticed that you have hadoop-common as a
> dependency, which probably ends up in your jar.
>
>
> On 4. Jan 2018, at 11:40, Stephan Ewen <se...@apache.org> wrote:
>
> Hi!
>
> This looks indeed like a class-loading issue - it looks like "RpcEngine"
> and "ProtobufRpcEngine" are loaded via different classloaders.
>
> Can you try the following:
>
>   - In your flink-conf.yaml, set classloader.resolve-order: parent-first
>
> If that fixes the issue, then we can look at a way to make this seamless...

Re: BucketingSink doesn't work anymore moving from 1.3.2 to 1.4.0

Posted by Aljoscha Krettek <al...@apache.org>.
I think this might be happening because some Hadoop dependencies are in the user jar while the rest are only available from the Hadoop deps that come bundled with Flink. For example, I noticed that you have hadoop-common as a dependency, which probably ends up in your jar.
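
One way to keep hadoop-common out of the user jar while still compiling against org.apache.hadoop.fs.{FileSystem, Path} is to mark it as provided in build.sbt. This is a minimal sketch, not a confirmed fix: the hadoopVersion value is an assumption and should be replaced with whatever Hadoop version the Flink distribution in use actually bundles.

```scala
// build.sbt (sketch)

// Assumption: this value is illustrative only. Use the Hadoop version that
// ships with your Flink distribution.
val hadoopVersion = "2.8.3"

libraryDependencies ++= Seq(
  // Provided: on the compile classpath, but excluded from the assembled jar,
  // so the Hadoop classes are loaded only from Flink's own classpath.
  "org.apache.hadoop" % "hadoop-common" % hadoopVersion % Provided
)
```

With this scoping the Hadoop classes should come from a single place at runtime, which is the situation the explanation above is aiming for.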

> On 4. Jan 2018, at 11:40, Stephan Ewen <se...@apache.org> wrote:
> 
> Hi!
> 
> This looks indeed like a class-loading issue - it looks like "RpcEngine" and "ProtobufRpcEngine" are loaded via different classloaders.
> 
> Can you try the following:
> 
>   - In your flink-conf.yaml, set classloader.resolve-order: parent-first
> 
> If that fixes the issue, then we can look at a way to make this seamless...
> 
> On Wed, Jan 3, 2018 at 8:31 PM, Kyle Hamlin <ha...@gmail.com> wrote:
> Hello,
> 
> After moving to Flink 1.4.0 I'm getting the following error. I can't find anything online that addresses it. Is it a Hadoop dependency issue? Here are my project dependencies: 
> 
> libraryDependencies ++= Seq(
>   "org.apache.flink" %% "flink-scala" % flinkVersion % Provided,
>   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided,
>   "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion,
>   "org.apache.flink" %% "flink-connector-filesystem" % flinkVersion,
>   "org.apache.flink" % "flink-metrics-core" % flinkVersion,
>   "org.apache.flink" % "flink-metrics-graphite" % flinkVersion,
>   "org.apache.kafka" %% "kafka" % "0.10.0.1",
>   "org.apache.avro" % "avro" % "1.7.7",
>   "org.apache.parquet" % "parquet-hadoop" % "1.8.1",
>   "org.apache.parquet" % "parquet-avro" % "1.8.1",
>   "io.confluent" % "kafka-avro-serializer" % "3.2.0",
>   "org.apache.hadoop" % "hadoop-common" % "3.0.0"
> )
> Stacktrace:
> Cluster configuration: Standalone cluster with JobManager at localhost/127.0.0.1:6123
> Using address localhost:6123 to connect to JobManager.
> JobManager web interface address http://localhost:8082
> Starting execution of program
> Submitting job with JobID: b6ed965410dad61f96f8dec73b614a9f. Waiting for job completion.
> Connected to JobManager at Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-1321297259] with leader session id 00000000-0000-0000-0000-000000000000.
> 01/03/2018 14:20:52	Job execution switched to status RUNNING.
> 01/03/2018 14:20:52	Source: Kafka -> Sink: S3(1/1) switched to SCHEDULED
> 01/03/2018 14:20:52	Source: Kafka -> Sink: S3(1/1) switched to DEPLOYING
> 01/03/2018 14:20:53	Source: Kafka -> Sink: S3(1/1) switched to RUNNING
> 01/03/2018 14:20:53	Source: Kafka -> Sink: S3(1/1) switched to FAILED
> java.lang.RuntimeException: Error while creating FileSystem when initializing the state of the BucketingSink.
> 	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
> 	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
> 	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
> 	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Cannot instantiate file system for URI: hdfs://localhost:12345/
> 	at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
> 	at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
> 	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1154)
> 	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
> 	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
> 	... 9 more
> Caused by: java.lang.ClassCastException: org.apache.hadoop.ipc.ProtobufRpcEngine cannot be cast to org.apache.hadoop.ipc.RpcEngine
> 	at org.apache.hadoop.ipc.RPC.getProtocolEngine(RPC.java:207)
> 	at org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:579)
> 	at org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:418)
> 	at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:314)
> 	at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
> 	at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:678)
> 	at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:619)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149)
> 	at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:159)
> 	... 13 more
> 


Re: BucketingSink doesn't work anymore moving from 1.3.2 to 1.4.0

Posted by Stephan Ewen <se...@apache.org>.
@Kyle:

Please also check whether you have any Hadoop classes in your user jar. There
should be none; Hadoop should only be on the Flink classpath.
Fixing the project Maven setup (making sure the Hadoop and Flink core
dependencies are marked as provided) should work.

To do that, you can for example use the latest quickstart template from
Flink 1.4.
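
For the sbt build quoted in this thread, the same idea might look roughly like
the fragment below. This is only a sketch, not a verified fix: the advice above
refers to a Maven setup, so carrying it over to sbt is an assumption, and
flinkVersion = "1.4.0" is assumed from the subject line. The hadoop-common
version is kept as in the original post; whether it matches the Hadoop version
on the Flink classpath is worth checking separately.

```scala
// build.sbt (sketch): keep Hadoop and Flink core classes out of the user jar.
// Assumption: flinkVersion was defined like this in the original build.
val flinkVersion = "1.4.0"

libraryDependencies ++= Seq(
  // Flink core dependencies stay Provided, as in the original post.
  "org.apache.flink" %% "flink-scala" % flinkVersion % Provided,
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided,
  // Connectors are not part of the Flink distribution, so they stay bundled.
  "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion,
  "org.apache.flink" %% "flink-connector-filesystem" % flinkVersion,
  // Previously a compile-scope dependency. Marked Provided so that Hadoop
  // classes are loaded only from the Flink classpath at runtime.
  "org.apache.hadoop" % "hadoop-common" % "3.0.0" % Provided
)
```

The remaining dependencies from the original list are omitted here for brevity.
Other libraries may still pull in Hadoop classes transitively, so it can help
to list the contents of the assembled jar and look for org/apache/hadoop
entries.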


Re: BucketingSink doesn't work anymore moving from 1.3.2 to 1.4.0

Posted by Stephan Ewen <se...@apache.org>.
Hi!

This does indeed look like a class-loading issue: it seems that "RpcEngine"
and "ProtobufRpcEngine" are loaded via different classloaders.

Can you try the following:

  - In your flink-conf.yaml, set classloader.resolve-order: parent-first

If that fixes the issue, then we can look at a way to make this seamless...
