You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Avi Levi <av...@theneura.com> on 2021/03/04 15:45:49 UTC

reading file from s3

Hi ,
I am pretty new. I am keep on struggling to read a file from s3 but getting
this weird exception :
Caused by: java.lang.NumberFormatException: For input string: "64M" (if
anyone can link me to a working github example that will be awesome) . what
am i doing wrong?

This is how my code looks like this :

import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.column.page.PageReadStore
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.parquet.io.ColumnIOFactory

class ParquetSourceFunction extends SourceFunction[String]{
  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    val inputPath = "s3a://foo/year=2000/month=02/"
    val conf = new Configuration()
    conf.set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")
    val hadoopFile = HadoopInputFile.fromPath(new Path(inputPath), conf)
    val readFooter = ParquetFileReader.open(hadoopFile)
    val metadata = readFooter.getFileMetaData
    val schema = metadata.getSchema
    val parquetFileReader = new ParquetFileReader(conf, metadata, new
Path(inputPath), readFooter.getRowGroups, schema.getColumns)
    var pages: PageReadStore = null

    try {
      while ({ pages = parquetFileReader.readNextRowGroup; pages != null }) {
        val rows = pages.getRowCount
        val columnIO = new ColumnIOFactory().getColumnIO(schema)
        val recordReader = columnIO.getRecordReader(pages, new
GroupRecordConverter(schema))
        (0L until rows).foreach { _ =>
          val group = recordReader.read()
          val myString = group.getString("field_name", 0)
          ctx.collect(myString)
        }
      }
    }
  }

  override def cancel(): Unit = ???
}

object Job {
  def main(args: Array[String]): Unit = {
    // set up the execution environment
    lazy val env = StreamExecutionEnvironment.getExecutionEnvironment


    lazy val stream = env.addSource(new ParquetSourceFunction)
    stream.print()
    env.execute()
  }
}

sbt dependencies :


ThisBuild / scalaVersion := "2.12.1"

val flinkVersion = "1.12.1"
val awsSdkVersion = "1.7.4"
val hadoopVersion = "2.7.3"

val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-clients" % flinkVersion,// % "provided",
  "org.apache.flink" %% "flink-scala" % flinkVersion,// % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion, // % "provided")
  "org.apache.flink" %% "flink-parquet" % flinkVersion,
  "org.apache.flink" %% "flink-hadoop-compatibility" % flinkVersion)

val s3Dependencies = Seq(
  ("com.amazonaws" % "aws-java-sdk" % awsSdkVersion),
  ("org.apache.hadoop" % "hadoop-aws" % hadoopVersion)
)

val serializationDependencies = Seq(
  ("org.apache.avro" % "avro" % "1.7.7"),
  ("org.apache.avro" % "avro-mapred" % "1.7.7").classifier("hadoop2"),
  ("org.apache.parquet" % "parquet-avro" % "1.8.1"))

lazy val root = (project in file(".")).
  settings(
    libraryDependencies ++= flinkDependencies,
    libraryDependencies ++= s3Dependencies,
    libraryDependencies ++= serializationDependencies,
    libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "3.3.0" ,
    libraryDependencies += "org.apache.parquet" % "parquet-hadoop" % "1.11.1",
    libraryDependencies += "org.apache.flink" %%
"flink-table-planner-blink" % "1.12.1" //% "provided"
  )

Re: reading file from s3

Posted by Avi Levi <av...@theneura.com>.
Thanks Tamir,
I was having some issues connecting from my IDE (solved) but this is really
helpful.


On Sat, Mar 6, 2021, 23:04 Tamir Sagi <Ta...@niceactimize.com> wrote:

> I had a typo in my previous answer, the env name was missing an 'S'
>
> ENABLE_BUILT_IN_PLUGIN --> ENABLE_BUILT_IN_PLUGIN*S*
> once again, the value is *the plugin jar name*
> : flink-s3-fs-hadoop-<flink-version>.jar
> The complete list can be found here
> <https://mvnrepository.com/artifact/org.apache.flink/flink-s3-fs-hadoop>
>
> You can Build your own Flink image and set an Environment variable in it
> or once you run the container.
> If you execute it locally(not in a container) in a standalone cluster,
> make sure this env is defined in system level.
>
> Tamir.
> ------------------------------
> *From:* Tamir Sagi <Ta...@niceactimize.com>
> *Sent:* Saturday, March 6, 2021 7:33 PM
> *To:* Avi Levi <av...@theneura.com>; Chesnay Schepler <ch...@apache.org>
> *Cc:* user@flink.apache.org <us...@flink.apache.org>
> *Subject:* [SUSPECTED FRAUD]Re: reading file from s3
>
> Hey Avi,
>
> Do you use 'Hadoop S3 plugin' to read from S3?
>
> If yes, what is its version?
>
> If not try to read from S3 as follow (ref
> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/filesystems/s3.html#amazon-s3>
> )
>
>    1. set an environment variable to use hadoop plugin (it's part of
>    Flink image):
>    key = ENABLE_BUILT_IN_PLUGIN
>    value = flink-s3-fs-hadoop-<flink version>.jar
>    (i.e flink-s3-fs-hadoop-1.11.1.jar,  for Flink 1.11.1)
>    2. read the file from S3:
>    *DataSource<String> lines = env.readTextFile("s3://<location>");*
>
> Tamir
> ------------------------------
> *From:* Avi Levi <av...@theneura.com>
> *Sent:* Saturday, March 6, 2021 6:59 AM
> *To:* Chesnay Schepler <ch...@apache.org>
> *Cc:* user@flink.apache.org <us...@flink.apache.org>
> *Subject:* Re: reading file from s3
>
>
> *EXTERNAL EMAIL*
>
>
> Does anyone by any chance have a working example (of course without the
> credentials etc') that can be shared on github ?simply reading/writing a
> file from/to s3.
> I keep on struggling with this one and getting weird exceptions
> Thanks
>
> On Thu, Mar 4, 2021 at 7:30 PM Avi Levi <av...@theneura.com> wrote:
>
> Sure, This is the full exception stacktrace:
>
> Exception in thread "main"
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> at
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117)
> at
> java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
> at
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at
> java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2137)
> at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:238)
> at
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
> at
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
> at
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at
> java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2137)
> at
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046)
> at akka.dispatch.OnComplete.internal(Future.scala:264)
> at akka.dispatch.OnComplete.internal(Future.scala:261)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
> at
> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
> at
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
> at
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
> at
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
> at
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573)
> at
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
> at
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
> at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:531)
> at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
> at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
> at
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
> at
> akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)
> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
> at
> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed
> by NoRestartBackoffTimeStrategy
> at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
> at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:665)
> at
> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
> at
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
> at jdk.internal.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:564)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:306)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at akka.actor.Actor.aroundReceive(Actor.scala:517)
> at akka.actor.Actor.aroundReceive$(Actor.scala:515)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> ... 4 more
> Caused by: java.lang.NumberFormatException: For input string: "64M"
> at
> java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:68)
> at java.base/java.lang.Long.parseLong(Long.java:707)
> at java.base/java.lang.Long.parseLong(Long.java:832)
> at org.apache.hadoop.conf.Configuration.getLong(Configuration.java:1563)
> at
> org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:248)
> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3414)
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:158)
> at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3474)
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3442)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:524)
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
> at
> org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(HadoopInputFile.java:38)
> at com.neura.ParquetSourceFunction.run(Job.scala:45)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)
>
> On Thu, Mar 4, 2021 at 6:02 PM Chesnay Schepler <ch...@apache.org>
> wrote:
>
> Can you show us the full exception stacktrace? Intuitively I would think
> your cluster configuration contains an invalid value for some memory
> configuration option.
>
> On 3/4/2021 4:45 PM, Avi Levi wrote:
>
> Hi ,
> I am pretty new. I am keep on struggling to read a file from s3 but
> getting this weird exception :
> Caused by: java.lang.NumberFormatException: For input string: "64M" (if
> anyone can link me to a working github example that will be awesome) . what
> am i doing wrong?
>
> This is how my code looks like this :
>
> import org.apache.flink.api.scala.createTypeInformationimport org.apache.flink.streaming.api.functions.source.SourceFunctionimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.hadoop.conf.Configurationimport org.apache.hadoop.fs.Pathimport org.apache.parquet.column.page.PageReadStoreimport org.apache.parquet.example.data.simple.convert.GroupRecordConverterimport org.apache.parquet.hadoop.ParquetFileReaderimport org.apache.parquet.hadoop.util.HadoopInputFileimport org.apache.parquet.io.ColumnIOFactory
> class ParquetSourceFunction extends SourceFunction[String]{
>   override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
>     val inputPath = "s3a://foo/year=2000/month=02/"    val conf = new Configuration()
>     conf.set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")
>     val hadoopFile = HadoopInputFile.fromPath(new Path(inputPath), conf)
>     val readFooter = ParquetFileReader.open(hadoopFile)
>     val metadata = readFooter.getFileMetaData
>     val schema = metadata.getSchema
>     val parquetFileReader = new ParquetFileReader(conf, metadata, new Path(inputPath), readFooter.getRowGroups, schema.getColumns)    var pages: PageReadStore = null    try {
>       while ({ pages = parquetFileReader.readNextRowGroup; pages != null }) {
>         val rows = pages.getRowCount
>         val columnIO = new ColumnIOFactory().getColumnIO(schema)
>         val recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema))
>         (0L until rows).foreach { _ =>
>           val group = recordReader.read()
>           val myString = group.getString("field_name", 0)
>           ctx.collect(myString)
>         }
>       }
>     }
>   }
>
>   override def cancel(): Unit = ???}
> object Job {
>   def main(args: Array[String]): Unit = {
>     // set up the execution environment    lazy val env = StreamExecutionEnvironment.getExecutionEnvironment    lazy val stream = env.addSource(new ParquetSourceFunction)
>     stream.print()
>     env.execute()
>   }
> }
>
> sbt dependencies :
>
> ThisBuild / scalaVersion := "2.12.1"val flinkVersion = "1.12.1"val awsSdkVersion = "1.7.4"val hadoopVersion = "2.7.3"val flinkDependencies = Seq(
>   "org.apache.flink" %% "flink-clients" % flinkVersion,// % "provided",  "org.apache.flink" %% "flink-scala" % flinkVersion,// % "provided",  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion, // % "provided")  "org.apache.flink" %% "flink-parquet" % flinkVersion,  "org.apache.flink" %% "flink-hadoop-compatibility" % flinkVersion)
> val s3Dependencies = Seq(
>   ("com.amazonaws" % "aws-java-sdk" % awsSdkVersion),  ("org.apache.hadoop" % "hadoop-aws" % hadoopVersion)
> )
> val serializationDependencies = Seq(
>   ("org.apache.avro" % "avro" % "1.7.7"),  ("org.apache.avro" % "avro-mapred" % "1.7.7").classifier("hadoop2"),  ("org.apache.parquet" % "parquet-avro" % "1.8.1"))
> lazy val root = (project in file(".")).
>   settings(
>     libraryDependencies ++= flinkDependencies,    libraryDependencies ++= s3Dependencies,    libraryDependencies ++= serializationDependencies,    libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "3.3.0" ,    libraryDependencies += "org.apache.parquet" % "parquet-hadoop" % "1.11.1",    libraryDependencies += "org.apache.flink" %% "flink-table-planner-blink" % "1.12.1" //% "provided"  )
>
>
>
> Confidentiality: This communication and any attachments are intended for
> the above-named persons only and may be confidential and/or legally
> privileged. Any opinions expressed in this communication are not
> necessarily those of NICE Actimize. If this communication has come to you
> in error you must take no action based on it, nor must you copy or show it
> to anyone; please delete/destroy and inform the sender by e-mail
> immediately.
> Monitoring: NICE Actimize may monitor incoming and outgoing e-mails.
> Viruses: Although we have taken steps toward ensuring that this e-mail and
> attachments are free from any virus, we advise that in keeping with good
> computing practice the recipient should ensure they are actually virus free.
>
>
> Confidentiality: This communication and any attachments are intended for
> the above-named persons only and may be confidential and/or legally
> privileged. Any opinions expressed in this communication are not
> necessarily those of NICE Actimize. If this communication has come to you
> in error you must take no action based on it, nor must you copy or show it
> to anyone; please delete/destroy and inform the sender by e-mail
> immediately.
> Monitoring: NICE Actimize may monitor incoming and outgoing e-mails.
> Viruses: Although we have taken steps toward ensuring that this e-mail and
> attachments are free from any virus, we advise that in keeping with good
> computing practice the recipient should ensure they are actually virus free.
>

Re: reading file from s3

Posted by Tamir Sagi <Ta...@niceactimize.com>.
I had a typo in my previous answer, the env name was missing an 'S'

ENABLE_BUILT_IN_PLUGIN --> ENABLE_BUILT_IN_PLUGINS
once again, the value is the plugin jar name : flink-s3-fs-hadoop-<flink-version>.jar
The complete list can be found here<https://mvnrepository.com/artifact/org.apache.flink/flink-s3-fs-hadoop>

You can Build your own Flink image and set an Environment variable in it or once you run the container.
If you execute it locally(not in a container) in a standalone cluster, make sure this env is defined in system level.

Tamir.
[https://my-email-signature.link/signature.gif?u=1088647&e=139753658&v=61d0f9764de1d96603b59533d7797a450ed41f983a06e5e81998ef6e88d9aff3]
________________________________
From: Tamir Sagi <Ta...@niceactimize.com>
Sent: Saturday, March 6, 2021 7:33 PM
To: Avi Levi <av...@theneura.com>; Chesnay Schepler <ch...@apache.org>
Cc: user@flink.apache.org <us...@flink.apache.org>
Subject: [SUSPECTED FRAUD]Re: reading file from s3

Hey Avi,

Do you use 'Hadoop S3 plugin' to read from S3?

If yes, what is its version?

If not try to read from S3 as follow (ref<https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/filesystems/s3.html#amazon-s3>)

  1.  set an environment variable to use hadoop plugin (it's part of Flink image):
key = ENABLE_BUILT_IN_PLUGIN
value = flink-s3-fs-hadoop-<flink version>.jar (i.e flink-s3-fs-hadoop-1.11.1.jar,  for Flink 1.11.1)
  2.  read the file from S3:
DataSource<String> lines = env.readTextFile("s3://<location>");

Tamir
[https://my-email-signature.link/signature.gif?u=1088647&e=139745102&v=e1c175d1aad586ec34f211146023d1e58b49bba775226af52da8148eaa4c27fd]
________________________________
From: Avi Levi <av...@theneura.com>
Sent: Saturday, March 6, 2021 6:59 AM
To: Chesnay Schepler <ch...@apache.org>
Cc: user@flink.apache.org <us...@flink.apache.org>
Subject: Re: reading file from s3


EXTERNAL EMAIL


Does anyone by any chance have a working example (of course without the credentials etc') that can be shared on github ?simply reading/writing a file from/to s3.
I keep on struggling with this one and getting weird exceptions
Thanks

On Thu, Mar 4, 2021 at 7:30 PM Avi Levi <av...@theneura.com>> wrote:
Sure, This is the full exception stacktrace:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117)
at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2137)
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:238)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2137)
at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046)
at akka.dispatch.OnComplete.internal(Future.scala:264)
at akka.dispatch.OnComplete.internal(Future.scala:261)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:531)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:665)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
at jdk.internal.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:564)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:306)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
... 4 more
Caused by: java.lang.NumberFormatException: For input string: "64M"
at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:68)
at java.base/java.lang.Long.parseLong(Long.java:707)
at java.base/java.lang.Long.parseLong(Long.java:832)
at org.apache.hadoop.conf.Configuration.getLong(Configuration.java:1563)
at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:248)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3414)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:158)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3474)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3442)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:524)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
at org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(HadoopInputFile.java:38)
at com.neura.ParquetSourceFunction.run(Job.scala:45)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)

On Thu, Mar 4, 2021 at 6:02 PM Chesnay Schepler <ch...@apache.org>> wrote:
Can you show us the full exception stacktrace? Intuitively I would think your cluster configuration contains an invalid value for some memory configuration option.

On 3/4/2021 4:45 PM, Avi Levi wrote:
Hi ,
I am pretty new. I am keep on struggling to read a file from s3 but getting this weird exception :
Caused by: java.lang.NumberFormatException: For input string: "64M" (if anyone can link me to a working github example that will be awesome) . what am i doing wrong?

This is how my code looks like this :

import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.column.page.PageReadStore
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.parquet.io.ColumnIOFactory

class ParquetSourceFunction extends SourceFunction[String]{
  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    val inputPath = "s3a://foo/year=2000/month=02/"
    val conf = new Configuration()
    conf.set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")
    val hadoopFile = HadoopInputFile.fromPath(new Path(inputPath), conf)
    val readFooter = ParquetFileReader.open(hadoopFile)
    val metadata = readFooter.getFileMetaData
    val schema = metadata.getSchema
    val parquetFileReader = new ParquetFileReader(conf, metadata, new Path(inputPath), readFooter.getRowGroups, schema.getColumns)
    var pages: PageReadStore = null

    try {
      while ({ pages = parquetFileReader.readNextRowGroup; pages != null }) {
        val rows = pages.getRowCount
        val columnIO = new ColumnIOFactory().getColumnIO(schema)
        val recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema))
        (0L until rows).foreach { _ =>
          val group = recordReader.read()
          val myString = group.getString("field_name", 0)
          ctx.collect(myString)
        }
      }
    }
  }

  override def cancel(): Unit = ???
}

object Job {
  def main(args: Array[String]): Unit = {
    // set up the execution environment
    lazy val env = StreamExecutionEnvironment.getExecutionEnvironment


    lazy val stream = env.addSource(new ParquetSourceFunction)
    stream.print()
    env.execute()
  }
}


sbt dependencies :


ThisBuild / scalaVersion := "2.12.1"

val flinkVersion = "1.12.1"
val awsSdkVersion = "1.7.4"
val hadoopVersion = "2.7.3"

val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-clients" % flinkVersion,// % "provided",
  "org.apache.flink" %% "flink-scala" % flinkVersion,// % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion, // % "provided")
  "org.apache.flink" %% "flink-parquet" % flinkVersion,
  "org.apache.flink" %% "flink-hadoop-compatibility" % flinkVersion)

val s3Dependencies = Seq(
  ("com.amazonaws" % "aws-java-sdk" % awsSdkVersion),
  ("org.apache.hadoop" % "hadoop-aws" % hadoopVersion)
)

val serializationDependencies = Seq(
  ("org.apache.avro" % "avro" % "1.7.7"),
  ("org.apache.avro" % "avro-mapred" % "1.7.7").classifier("hadoop2"),
  ("org.apache.parquet" % "parquet-avro" % "1.8.1"))

lazy val root = (project in file(".")).
  settings(
    libraryDependencies ++= flinkDependencies,
    libraryDependencies ++= s3Dependencies,
    libraryDependencies ++= serializationDependencies,
    libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "3.3.0" ,
    libraryDependencies += "org.apache.parquet" % "parquet-hadoop" % "1.11.1",
    libraryDependencies += "org.apache.flink" %% "flink-table-planner-blink" % "1.12.1" //% "provided"
  )


Confidentiality: This communication and any attachments are intended for the above-named persons only and may be confidential and/or legally privileged. Any opinions expressed in this communication are not necessarily those of NICE Actimize. If this communication has come to you in error you must take no action based on it, nor must you copy or show it to anyone; please delete/destroy and inform the sender by e-mail immediately.
Monitoring: NICE Actimize may monitor incoming and outgoing e-mails.
Viruses: Although we have taken steps toward ensuring that this e-mail and attachments are free from any virus, we advise that in keeping with good computing practice the recipient should ensure they are actually virus free.

Confidentiality: This communication and any attachments are intended for the above-named persons only and may be confidential and/or legally privileged. Any opinions expressed in this communication are not necessarily those of NICE Actimize. If this communication has come to you in error you must take no action based on it, nor must you copy or show it to anyone; please delete/destroy and inform the sender by e-mail immediately.
Monitoring: NICE Actimize may monitor incoming and outgoing e-mails.
Viruses: Although we have taken steps toward ensuring that this e-mail and attachments are free from any virus, we advise that in keeping with good computing practice the recipient should ensure they are actually virus free.

Re: reading file from s3

Posted by Tamir Sagi <Ta...@niceactimize.com>.
Hey Avi,

Do you use 'Hadoop S3 plugin' to read from S3?

If yes, what is its version?

If not try to read from S3 as follow (ref<https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/filesystems/s3.html#amazon-s3>)

  1.  set an environment variable to use hadoop plugin (it's part of Flink image):
key = ENABLE_BUILT_IN_PLUGIN
value = flink-s3-fs-hadoop-<flink version>.jar (i.e flink-s3-fs-hadoop-1.11.1.jar,  for Flink 1.11.1)
  2.  read the file from S3:
DataSource<String> lines = env.readTextFile("s3://<location>");

Tamir
[https://my-email-signature.link/signature.gif?u=1088647&e=139745102&v=e1c175d1aad586ec34f211146023d1e58b49bba775226af52da8148eaa4c27fd]
________________________________
From: Avi Levi <av...@theneura.com>
Sent: Saturday, March 6, 2021 6:59 AM
To: Chesnay Schepler <ch...@apache.org>
Cc: user@flink.apache.org <us...@flink.apache.org>
Subject: Re: reading file from s3


EXTERNAL EMAIL


Does anyone by any chance have a working example (of course without the credentials etc') that can be shared on github ?simply reading/writing a file from/to s3.
I keep on struggling with this one and getting weird exceptions
Thanks

On Thu, Mar 4, 2021 at 7:30 PM Avi Levi <av...@theneura.com>> wrote:
Sure, This is the full exception stacktrace:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117)
at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2137)
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:238)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2137)
at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046)
at akka.dispatch.OnComplete.internal(Future.scala:264)
at akka.dispatch.OnComplete.internal(Future.scala:261)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:531)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:665)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
at jdk.internal.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:564)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:306)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
... 4 more
Caused by: java.lang.NumberFormatException: For input string: "64M"
at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:68)
at java.base/java.lang.Long.parseLong(Long.java:707)
at java.base/java.lang.Long.parseLong(Long.java:832)
at org.apache.hadoop.conf.Configuration.getLong(Configuration.java:1563)
at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:248)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3414)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:158)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3474)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3442)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:524)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
at org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(HadoopInputFile.java:38)
at com.neura.ParquetSourceFunction.run(Job.scala:45)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)

On Thu, Mar 4, 2021 at 6:02 PM Chesnay Schepler <ch...@apache.org>> wrote:
Can you show us the full exception stacktrace? Intuitively I would think your cluster configuration contains an invalid value for some memory configuration option.

On 3/4/2021 4:45 PM, Avi Levi wrote:
Hi ,
I am pretty new. I am keep on struggling to read a file from s3 but getting this weird exception :
Caused by: java.lang.NumberFormatException: For input string: "64M" (if anyone can link me to a working github example that will be awesome) . what am i doing wrong?

This is how my code looks like this :

import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.column.page.PageReadStore
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.parquet.io.ColumnIOFactory

class ParquetSourceFunction extends SourceFunction[String]{
  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    val inputPath = "s3a://foo/year=2000/month=02/"
    val conf = new Configuration()
    conf.set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")
    val hadoopFile = HadoopInputFile.fromPath(new Path(inputPath), conf)
    val readFooter = ParquetFileReader.open(hadoopFile)
    val metadata = readFooter.getFileMetaData
    val schema = metadata.getSchema
    val parquetFileReader = new ParquetFileReader(conf, metadata, new Path(inputPath), readFooter.getRowGroups, schema.getColumns)
    var pages: PageReadStore = null

    try {
      while ({ pages = parquetFileReader.readNextRowGroup; pages != null }) {
        val rows = pages.getRowCount
        val columnIO = new ColumnIOFactory().getColumnIO(schema)
        val recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema))
        (0L until rows).foreach { _ =>
          val group = recordReader.read()
          val myString = group.getString("field_name", 0)
          ctx.collect(myString)
        }
      }
    }
  }

  override def cancel(): Unit = ???
}

object Job {
  def main(args: Array[String]): Unit = {
    // set up the execution environment
    lazy val env = StreamExecutionEnvironment.getExecutionEnvironment


    lazy val stream = env.addSource(new ParquetSourceFunction)
    stream.print()
    env.execute()
  }
}


sbt dependencies :


ThisBuild / scalaVersion := "2.12.1"

val flinkVersion = "1.12.1"
val awsSdkVersion = "1.7.4"
val hadoopVersion = "2.7.3"

val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-clients" % flinkVersion,// % "provided",
  "org.apache.flink" %% "flink-scala" % flinkVersion,// % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion, // % "provided")
  "org.apache.flink" %% "flink-parquet" % flinkVersion,
  "org.apache.flink" %% "flink-hadoop-compatibility" % flinkVersion)

val s3Dependencies = Seq(
  ("com.amazonaws" % "aws-java-sdk" % awsSdkVersion),
  ("org.apache.hadoop" % "hadoop-aws" % hadoopVersion)
)

val serializationDependencies = Seq(
  ("org.apache.avro" % "avro" % "1.7.7"),
  ("org.apache.avro" % "avro-mapred" % "1.7.7").classifier("hadoop2"),
  ("org.apache.parquet" % "parquet-avro" % "1.8.1"))

lazy val root = (project in file(".")).
  settings(
    libraryDependencies ++= flinkDependencies,
    libraryDependencies ++= s3Dependencies,
    libraryDependencies ++= serializationDependencies,
    libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "3.3.0" ,
    libraryDependencies += "org.apache.parquet" % "parquet-hadoop" % "1.11.1",
    libraryDependencies += "org.apache.flink" %% "flink-table-planner-blink" % "1.12.1" //% "provided"
  )


Confidentiality: This communication and any attachments are intended for the above-named persons only and may be confidential and/or legally privileged. Any opinions expressed in this communication are not necessarily those of NICE Actimize. If this communication has come to you in error you must take no action based on it, nor must you copy or show it to anyone; please delete/destroy and inform the sender by e-mail immediately.
Monitoring: NICE Actimize may monitor incoming and outgoing e-mails.
Viruses: Although we have taken steps toward ensuring that this e-mail and attachments are free from any virus, we advise that in keeping with good computing practice the recipient should ensure they are actually virus free.

Re: reading file from s3

Posted by Avi Levi <av...@theneura.com>.
Does anyone by any chance have a working example (of course without the
credentials etc') that can be shared on github ?simply reading/writing a
file from/to s3.
I keep on struggling with this one and getting weird exceptions
Thanks

On Thu, Mar 4, 2021 at 7:30 PM Avi Levi <av...@theneura.com> wrote:

> Sure, This is the full exception stacktrace:
>
> Exception in thread "main"
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> at
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117)
> at
> java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
> at
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at
> java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2137)
> at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:238)
> at
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
> at
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
> at
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at
> java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2137)
> at
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046)
> at akka.dispatch.OnComplete.internal(Future.scala:264)
> at akka.dispatch.OnComplete.internal(Future.scala:261)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
> at
> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
> at
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
> at
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
> at
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
> at
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573)
> at
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
> at
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
> at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:531)
> at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
> at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
> at
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
> at
> akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)
> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
> at
> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed
> by NoRestartBackoffTimeStrategy
> at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
> at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:665)
> at
> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
> at
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
> at jdk.internal.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:564)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:306)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at akka.actor.Actor.aroundReceive(Actor.scala:517)
> at akka.actor.Actor.aroundReceive$(Actor.scala:515)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> ... 4 more
> Caused by: java.lang.NumberFormatException: For input string: "64M"
> at
> java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:68)
> at java.base/java.lang.Long.parseLong(Long.java:707)
> at java.base/java.lang.Long.parseLong(Long.java:832)
> at org.apache.hadoop.conf.Configuration.getLong(Configuration.java:1563)
> at
> org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:248)
> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3414)
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:158)
> at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3474)
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3442)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:524)
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
> at
> org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(HadoopInputFile.java:38)
> at com.neura.ParquetSourceFunction.run(Job.scala:45)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)
>
> On Thu, Mar 4, 2021 at 6:02 PM Chesnay Schepler <ch...@apache.org>
> wrote:
>
>> Can you show us the full exception stacktrace? Intuitively I would think
>> your cluster configuration contains an invalid value for some memory
>> configuration option.
>>
>> On 3/4/2021 4:45 PM, Avi Levi wrote:
>>
>> Hi ,
>> I am pretty new. I am keep on struggling to read a file from s3 but
>> getting this weird exception :
>> Caused by: java.lang.NumberFormatException: For input string: "64M" (if
>> anyone can link me to a working github example that will be awesome) . what
>> am i doing wrong?
>>
>> This is how my code looks like this :
>>
>> import org.apache.flink.api.scala.createTypeInformationimport org.apache.flink.streaming.api.functions.source.SourceFunctionimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.hadoop.conf.Configurationimport org.apache.hadoop.fs.Pathimport org.apache.parquet.column.page.PageReadStoreimport org.apache.parquet.example.data.simple.convert.GroupRecordConverterimport org.apache.parquet.hadoop.ParquetFileReaderimport org.apache.parquet.hadoop.util.HadoopInputFileimport org.apache.parquet.io.ColumnIOFactory
>> class ParquetSourceFunction extends SourceFunction[String]{
>>   override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
>>     val inputPath = "s3a://foo/year=2000/month=02/"    val conf = new Configuration()
>>     conf.set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")
>>     val hadoopFile = HadoopInputFile.fromPath(new Path(inputPath), conf)
>>     val readFooter = ParquetFileReader.open(hadoopFile)
>>     val metadata = readFooter.getFileMetaData
>>     val schema = metadata.getSchema
>>     val parquetFileReader = new ParquetFileReader(conf, metadata, new Path(inputPath), readFooter.getRowGroups, schema.getColumns)    var pages: PageReadStore = null    try {
>>       while ({ pages = parquetFileReader.readNextRowGroup; pages != null }) {
>>         val rows = pages.getRowCount
>>         val columnIO = new ColumnIOFactory().getColumnIO(schema)
>>         val recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema))
>>         (0L until rows).foreach { _ =>
>>           val group = recordReader.read()
>>           val myString = group.getString("field_name", 0)
>>           ctx.collect(myString)
>>         }
>>       }
>>     }
>>   }
>>
>>   override def cancel(): Unit = ???}
>> object Job {
>>   def main(args: Array[String]): Unit = {
>>     // set up the execution environment    lazy val env = StreamExecutionEnvironment.getExecutionEnvironment    lazy val stream = env.addSource(new ParquetSourceFunction)
>>     stream.print()
>>     env.execute()
>>   }
>> }
>>
>> sbt dependencies :
>>
>> ThisBuild / scalaVersion := "2.12.1"val flinkVersion = "1.12.1"val awsSdkVersion = "1.7.4"val hadoopVersion = "2.7.3"val flinkDependencies = Seq(
>>   "org.apache.flink" %% "flink-clients" % flinkVersion,// % "provided",  "org.apache.flink" %% "flink-scala" % flinkVersion,// % "provided",  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion, // % "provided")  "org.apache.flink" %% "flink-parquet" % flinkVersion,  "org.apache.flink" %% "flink-hadoop-compatibility" % flinkVersion)
>> val s3Dependencies = Seq(
>>   ("com.amazonaws" % "aws-java-sdk" % awsSdkVersion),  ("org.apache.hadoop" % "hadoop-aws" % hadoopVersion)
>> )
>> val serializationDependencies = Seq(
>>   ("org.apache.avro" % "avro" % "1.7.7"),  ("org.apache.avro" % "avro-mapred" % "1.7.7").classifier("hadoop2"),  ("org.apache.parquet" % "parquet-avro" % "1.8.1"))
>> lazy val root = (project in file(".")).
>>   settings(
>>     libraryDependencies ++= flinkDependencies,    libraryDependencies ++= s3Dependencies,    libraryDependencies ++= serializationDependencies,    libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "3.3.0" ,    libraryDependencies += "org.apache.parquet" % "parquet-hadoop" % "1.11.1",    libraryDependencies += "org.apache.flink" %% "flink-table-planner-blink" % "1.12.1" //% "provided"  )
>>
>>
>>

Re: reading file from s3

Posted by Avi Levi <av...@theneura.com>.
Sure, This is the full exception stacktrace:

Exception in thread "main"
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at
org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117)
at
java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
at
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at
java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2137)
at
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:238)
at
java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at
java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at
java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2137)
at
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046)
at akka.dispatch.OnComplete.internal(Future.scala:264)
at akka.dispatch.OnComplete.internal(Future.scala:261)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
at
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
at
scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
at
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573)
at
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
at
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:531)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at
akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
at
akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by
NoRestartBackoffTimeStrategy
at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
at
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:665)
at
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
at
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
at jdk.internal.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:564)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:306)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
... 4 more
Caused by: java.lang.NumberFormatException: For input string: "64M"
at
java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:68)
at java.base/java.lang.Long.parseLong(Long.java:707)
at java.base/java.lang.Long.parseLong(Long.java:832)
at org.apache.hadoop.conf.Configuration.getLong(Configuration.java:1563)
at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:248)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3414)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:158)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3474)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3442)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:524)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
at
org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(HadoopInputFile.java:38)
at com.neura.ParquetSourceFunction.run(Job.scala:45)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)

On Thu, Mar 4, 2021 at 6:02 PM Chesnay Schepler <ch...@apache.org> wrote:

> Can you show us the full exception stacktrace? Intuitively I would think
> your cluster configuration contains an invalid value for some memory
> configuration option.
>
> On 3/4/2021 4:45 PM, Avi Levi wrote:
>
> Hi ,
> I am pretty new. I am keep on struggling to read a file from s3 but
> getting this weird exception :
> Caused by: java.lang.NumberFormatException: For input string: "64M" (if
> anyone can link me to a working github example that will be awesome) . what
> am i doing wrong?
>
> This is how my code looks like this :
>
> import org.apache.flink.api.scala.createTypeInformationimport org.apache.flink.streaming.api.functions.source.SourceFunctionimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.hadoop.conf.Configurationimport org.apache.hadoop.fs.Pathimport org.apache.parquet.column.page.PageReadStoreimport org.apache.parquet.example.data.simple.convert.GroupRecordConverterimport org.apache.parquet.hadoop.ParquetFileReaderimport org.apache.parquet.hadoop.util.HadoopInputFileimport org.apache.parquet.io.ColumnIOFactory
> class ParquetSourceFunction extends SourceFunction[String]{
>   override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
>     val inputPath = "s3a://foo/year=2000/month=02/"    val conf = new Configuration()
>     conf.set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")
>     val hadoopFile = HadoopInputFile.fromPath(new Path(inputPath), conf)
>     val readFooter = ParquetFileReader.open(hadoopFile)
>     val metadata = readFooter.getFileMetaData
>     val schema = metadata.getSchema
>     val parquetFileReader = new ParquetFileReader(conf, metadata, new Path(inputPath), readFooter.getRowGroups, schema.getColumns)    var pages: PageReadStore = null    try {
>       while ({ pages = parquetFileReader.readNextRowGroup; pages != null }) {
>         val rows = pages.getRowCount
>         val columnIO = new ColumnIOFactory().getColumnIO(schema)
>         val recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema))
>         (0L until rows).foreach { _ =>
>           val group = recordReader.read()
>           val myString = group.getString("field_name", 0)
>           ctx.collect(myString)
>         }
>       }
>     }
>   }
>
>   override def cancel(): Unit = ???}
> object Job {
>   def main(args: Array[String]): Unit = {
>     // set up the execution environment    lazy val env = StreamExecutionEnvironment.getExecutionEnvironment    lazy val stream = env.addSource(new ParquetSourceFunction)
>     stream.print()
>     env.execute()
>   }
> }
>
> sbt dependencies :
>
> ThisBuild / scalaVersion := "2.12.1"val flinkVersion = "1.12.1"val awsSdkVersion = "1.7.4"val hadoopVersion = "2.7.3"val flinkDependencies = Seq(
>   "org.apache.flink" %% "flink-clients" % flinkVersion,// % "provided",  "org.apache.flink" %% "flink-scala" % flinkVersion,// % "provided",  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion, // % "provided")  "org.apache.flink" %% "flink-parquet" % flinkVersion,  "org.apache.flink" %% "flink-hadoop-compatibility" % flinkVersion)
> val s3Dependencies = Seq(
>   ("com.amazonaws" % "aws-java-sdk" % awsSdkVersion),  ("org.apache.hadoop" % "hadoop-aws" % hadoopVersion)
> )
> val serializationDependencies = Seq(
>   ("org.apache.avro" % "avro" % "1.7.7"),  ("org.apache.avro" % "avro-mapred" % "1.7.7").classifier("hadoop2"),  ("org.apache.parquet" % "parquet-avro" % "1.8.1"))
> lazy val root = (project in file(".")).
>   settings(
>     libraryDependencies ++= flinkDependencies,    libraryDependencies ++= s3Dependencies,    libraryDependencies ++= serializationDependencies,    libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "3.3.0" ,    libraryDependencies += "org.apache.parquet" % "parquet-hadoop" % "1.11.1",    libraryDependencies += "org.apache.flink" %% "flink-table-planner-blink" % "1.12.1" //% "provided"  )
>
>
>

Re: reading file from s3

Posted by Chesnay Schepler <ch...@apache.org>.
Can you show us the full exception stacktrace? Intuitively I would think 
your cluster configuration contains an invalid value for some memory 
configuration option.

On 3/4/2021 4:45 PM, Avi Levi wrote:
> Hi ,
> I am pretty new. I am keep on struggling to read a file from s3 but 
> getting this weird exception :
> Caused by: java.lang.NumberFormatException: For input string: "64M" 
> (if anyone can link me to a working github example that will be 
> awesome) . what am i doing wrong?
>
> This is how my code looks like this :
> import org.apache.flink.api.scala.createTypeInformation import org.apache.flink.streaming.api.functions.source.SourceFunction
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.fs.Path
> import org.apache.parquet.column.page.PageReadStore
> import org.apache.parquet.example.data.simple.convert.GroupRecordConverter
> import org.apache.parquet.hadoop.ParquetFileReader
> import org.apache.parquet.hadoop.util.HadoopInputFile
> import org.apache.parquet.io.ColumnIOFactory
>
> class ParquetSourceFunctionextends SourceFunction[String]{
>    override def run(ctx: SourceFunction.SourceContext[String]):Unit = {
>      val inputPath ="s3a://foo/year=2000/month=02/"val conf =new Configuration()
>      conf.set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")
>      val hadoopFile = HadoopInputFile.fromPath(new Path(inputPath), conf)
>      val readFooter = ParquetFileReader.open(hadoopFile)
>      val metadata = readFooter.getFileMetaData
>      val schema = metadata.getSchema
>      val parquetFileReader =new ParquetFileReader(conf, metadata, new Path(inputPath), readFooter.getRowGroups, schema.getColumns)
> var pages: PageReadStore =null try {
>        while ({ pages = parquetFileReader.readNextRowGroup; pages !=null }) {
>          val rows = pages.getRowCount
>          val columnIO =new ColumnIOFactory().getColumnIO(schema)
>          val recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema))
>          (0L until rows).foreach { _ =>
>            val group = recordReader.read()
>            val myString = group.getString("field_name", 0)
>            ctx.collect(myString)
>          }
>        }
>      }
>    }
>
>    override def cancel():Unit =??? }
>
> object Job {
>    def main(args: Array[String]):Unit = {
>      // set up the execution environment lazy val env = StreamExecutionEnvironment.getExecutionEnvironment lazy val stream = env.addSource(new ParquetSourceFunction)
>      stream.print()
>      env.execute()
>    }
> }
> sbt dependencies :
>
> ThisBuild /scalaVersion :="2.12.1" val flinkVersion ="1.12.1" val awsSdkVersion ="1.7.4" val hadoopVersion ="2.7.3" val flinkDependencies =Seq(
>    "org.apache.flink" %%"flink-clients" % flinkVersion,// % "provided", "org.apache.flink" %%"flink-scala" % flinkVersion,// % "provided", "org.apache.flink" %%"flink-streaming-scala" % flinkVersion, // % "provided") "org.apache.flink" %%"flink-parquet" % flinkVersion, "org.apache.flink" %%"flink-hadoop-compatibility" % flinkVersion)
>
> val s3Dependencies =Seq(
>    ("com.amazonaws" %"aws-java-sdk" % awsSdkVersion), ("org.apache.hadoop" %"hadoop-aws" % hadoopVersion)
> )
>
> val serializationDependencies =Seq(
>    ("org.apache.avro" %"avro" %"1.7.7"), ("org.apache.avro" %"avro-mapred" %"1.7.7").classifier("hadoop2"), ("org.apache.parquet" %"parquet-avro" %"1.8.1"))
>
> lazy val root = (project infile(".")).
>    settings(
>      libraryDependencies ++= flinkDependencies, libraryDependencies ++= s3Dependencies, libraryDependencies ++= serializationDependencies, libraryDependencies +="org.apache.hadoop" %"hadoop-common" %"3.3.0" , libraryDependencies +="org.apache.parquet" %"parquet-hadoop" %"1.11.1", libraryDependencies +="org.apache.flink" %%"flink-table-planner-blink" %"1.12.1" //% "provided" )