Posted to user@flink.apache.org by Aaron Levin <aa...@stripe.com> on 2018/10/19 18:06:43 UTC

Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)

Hi,

I'm writing a custom `SourceFunction` which wraps an underlying
`InputFormatSourceFunction`. When I try to use this `SourceFunction` in a
stream (via `env.addSource` and a subsequent sink) I get errors related to
the `InputSplitAssigner` not being initialized for a particular vertex ID.
Full error here[1].

I believe the underlying error is related to this[0] call to `instanceof
InputFormatSourceFunction`.

*My questions*:

1. How can I wrap an `InputFormatSourceFunction` in a way that avoids this
error? Am I missing a chunk of the API covering this?
2. Is the error I'm experiencing related to that casting call? If so, would
y'all be open to a PR which adds an interface one can extend which will set
the input format in the stream graph? Or is there a preferred way of
achieving this?

Thanks!

Aaron Levin

[0] https://github.com/apache/flink/blob/release-1.6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java#L480
[1]
java.lang.RuntimeException: Could not retrieve next input split.
    at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:157)
    at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.open(InputFormatSourceFunction.java:71)
    at REDACTED
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: Requesting the next input split failed.
    at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
    at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:155)
    ... 8 more
Caused by: java.util.concurrent.ExecutionException: java.lang.Exception: No InputSplitAssigner for vertex ID cbc357ccb763df2852fee8c4fc7d55f2
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
    at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
    ... 9 more
Caused by: java.lang.Exception: No InputSplitAssigner for vertex ID cbc357ccb763df2852fee8c4fc7d55f2
    at org.apache.flink.runtime.jobmaster.JobMaster.requestNextInputSplit(JobMaster.java:575)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
...

Re: Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)

Posted by Aaron Levin <aa...@stripe.com>.
Hey,

First, I appreciate everyone's help! Thank you!

I wrote several wrappers to try to debug this, including one that is an
exact copy of `InputFormatSourceFunction`. They all failed with the same
error I described above. I'll post two of them below.
They all extended `RichParallelSourceFunction` and, as far as I could tell,
were properly initialized (though I may have missed something!).
Additionally, for the two below, if I change `extends
RichParallelSourceFunction` to `extends
InputFormatSourceFunction(...,...)`, I no longer receive the exception.
This is what led me to believe the source of the issue was casting and how
I found the line of code where the stream graph is given the input format.
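
A minimal, Flink-free sketch of why the `extends` change could matter,
assuming the graph generator registers an input format only when the
outermost user function passes a type check on `InputFormatSourceFunction`
(which is what the linked line appears to do). Every name below (`Source`,
`FormatSource`, `Wrapper`, `Subclass`, `registerFormat`) is a hypothetical
stand-in, not a Flink class:

```scala
// Hypothetical stand-ins for Flink types; none of these are real Flink classes.
trait Source
class FormatSource(val format: String) extends Source        // plays InputFormatSourceFunction
class Wrapper(val inner: FormatSource) extends Source        // plays WrappedInputFormat
class Subclass(format: String) extends FormatSource(format)  // plays `extends InputFormatSourceFunction`

object GraphSketch {
  // Mimics a generator that only inspects the runtime type of the outermost function.
  def registerFormat(source: Source): Option[String] = source match {
    case f: FormatSource => Some(f.format) // format is registered for this vertex
    case _               => None           // nothing is registered for this vertex
  }

  def main(args: Array[String]): Unit = {
    println(registerFormat(new FormatSource("s3")))              // Some(s3)
    println(registerFormat(new Subclass("s3")))                  // Some(s3)
    println(registerFormat(new Wrapper(new FormatSource("s3")))) // None
  }
}
```

Under that assumption the wrapper hides the format from the type check, so
nothing is registered for the vertex, which would be consistent with the
"No InputSplitAssigner for vertex ID" message.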

Quick explanation of the wrappers:
1. `WrappedInputFormat` does a basic wrap around
`InputFormatSourceFunction` and delegates all methods to the underlying
`InputFormatSourceFunction`
2. `ClonedInputFormatSourceFunction` is a ~exact copy of the
`InputFormatSourceFunction` source.
3. They're being used in a test which looks vaguely like:
`DataStreamUtils.collect(env.addSource(new WrappedInputFormat(new
InputFormatSourceFunction[String](source,
implicitly[TypeInformation[String]]))).javaStream).asScala.toSeq`

// Imports assumed by the two wrappers below (Flink 1.6 package layout).
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.io.{InputFormat, RichInputFormat}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.io.InputSplit
import org.apache.flink.runtime.jobgraph.tasks.{InputSplitProvider, InputSplitProviderException}
import org.apache.flink.streaming.api.functions.source.{InputFormatSourceFunction, RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext

class WrappedInputFormat[A](
  inputFormat: InputFormatSourceFunction[A]
)(
  implicit typeInfo: TypeInformation[A]
) extends RichParallelSourceFunction[A] {

  override def run(sourceContext: SourceFunction.SourceContext[A]): Unit = {
    inputFormat.run(sourceContext)
  }
  override def setRuntimeContext(t: RuntimeContext): Unit = {
    inputFormat.setRuntimeContext(t)
  }
  override def equals(obj: scala.Any) = {
    inputFormat.equals(obj)
  }
  override def hashCode() = { inputFormat.hashCode() }
  override def toString = { inputFormat.toString }
  override def getRuntimeContext(): RuntimeContext = {
    inputFormat.getRuntimeContext
  }
  override def getIterationRuntimeContext = {
    inputFormat.getIterationRuntimeContext
  }
  override def open(parameters: Configuration): Unit = {
    inputFormat.open(parameters)
  }
  override def cancel(): Unit = {
    inputFormat.cancel()
  }
  override def close(): Unit = {
    inputFormat.close()
  }
}

And the other one:

class ClonedInputFormatSourceFunction[A](
  val format: InputFormat[A, InputSplit],
  val typeInfo: TypeInformation[A]
) extends RichParallelSourceFunction[A] {

  @transient private var provider: InputSplitProvider = _
  @transient private var serializer: TypeSerializer[A] = _
  @transient private var splitIterator: Iterator[InputSplit] = _
  private var isRunning: Boolean = _

  override def open(parameters: Configuration): Unit = {
    val context = getRuntimeContext.asInstanceOf[StreamingRuntimeContext]
    if(format.isInstanceOf[RichInputFormat[_,_]]) {
      format.asInstanceOf[RichInputFormat[_,_]].setRuntimeContext(context)
    }
    format.configure(parameters)

    provider = context.getInputSplitProvider
    serializer = typeInfo.createSerializer(getRuntimeContext.getExecutionConfig)
    splitIterator = getInputSplits()
    isRunning = splitIterator.hasNext
  }

  override def run(sourceContext: SourceFunction.SourceContext[A]): Unit = {
    if(isRunning && format.isInstanceOf[RichInputFormat[_,_]]) {
      format.asInstanceOf[RichInputFormat[_,_]].openInputFormat()
    }

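    var nextElement: A = serializer.createInstance()
    try {
      while (isRunning) {
        format.open(splitIterator.next())
        // Scala has no `break` keyword; a flag ends the inner loop on a null record.
        var splitDone = false
        while (isRunning && !splitDone && !format.reachedEnd()) {
          nextElement = format.nextRecord(nextElement)
          if (nextElement != null) {
            sourceContext.collect(nextElement)
          } else {
            splitDone = true
          }
        }
        // Close the split only after it has been read, then ask for the next one.
        format.close()
        if (isRunning) {
          isRunning = splitIterator.hasNext
        }
      }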
    } finally {

      format.close()
      if (format.isInstanceOf[RichInputFormat[_,_]]) {
        format.asInstanceOf[RichInputFormat[_,_]].closeInputFormat()
      }
      isRunning = false
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }

  override def close(): Unit = {
    format.close()
    if(format.isInstanceOf[RichInputFormat[_,_]]) {
      format.asInstanceOf[RichInputFormat[_,_]].closeInputFormat()
    }
  }

  private def getInputSplits(): Iterator[InputSplit] = {
    new Iterator[InputSplit] {
      private var nextSplit: InputSplit = _
      private var exhausted: Boolean = _

      override def hasNext: Boolean = {
        if(exhausted) { return false }
        if(nextSplit != null) { return true }
        var split: InputSplit = null

        try {
          split = provider.getNextInputSplit(getRuntimeContext.getUserCodeClassLoader)
        } catch {
          case e: InputSplitProviderException =>
            throw new RuntimeException("No InputSplit Provider", e)
        }

        if(split != null) {
          nextSplit = split
          true
        } else {
          exhausted = true
          false
        }
      }

      override def next(): InputSplit = {
        if(nextSplit == null && !hasNext) {
          throw new NoSuchElementException()
        }
        val tmp: InputSplit = nextSplit
        nextSplit = null
        tmp
      }

    }
  }
}

Best,

Aaron Levin

On Wed, Oct 24, 2018 at 8:00 AM, Kien Truong <du...@gmail.com>
wrote:

> Hi,
>
> Since InputFormatSourceFunction is a subclass of
> RichParallelSourceFunction, your wrapper should also extend this class.
>
> In addition, remember to override the methods defined in
> AbstractRichFunction and proxy the calls to the underlying
> InputFormatSourceFunction, in order to initialize the underlying source
> correctly.
>
>
> Best regards,
>
> Kien
>
>

Re: Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)

Posted by Kien Truong <du...@gmail.com>.
Hi,

Since InputFormatSourceFunction is a subclass of 
RichParallelSourceFunction, your wrapper should also extend this class.

In addition, remember to override the methods defined in
AbstractRichFunction and proxy the calls to the underlying
InputFormatSourceFunction, in order to initialize the underlying source
correctly.


Best regards,

Kien



Re: Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)

Posted by Aaron Levin <aa...@stripe.com>.
Hi Aljoscha,

Thanks! I will look into this.

Best,

Aaron Levin

On Fri, Nov 9, 2018 at 5:01 AM, Aljoscha Krettek <al...@apache.org>
wrote:

> Hi,
>
> I think for this case a model that is similar to how the Streaming File
> Source works should be good. You can have a look at
> ContinuousFileMonitoringFunction and ContinuousFileReaderOperator. The
> idea is that the first emits splits that should be processed and the second
> is responsible for reading those splits. A generic version of that is what
> I'm proposing for the refactoring of our source interface [1] that also
> comes with a prototype implementation [2].
>
> I think something like this should be adaptable to your case. The split
> enumerator would at first only emit file splits downstream; after that it
> would emit Kafka partitions that should be read. The split reader would
> understand both file splits and Kafka partitions and could read from both.
> This still has some kinks to be worked out when it comes to watermarks;
> FLIP-27 is not finished.
>
> What do you think?
>
> Best,
> Aljoscha
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> [2] https://github.com/aljoscha/flink/commits/refactor-source-interface
>
>
> On 1. Nov 2018, at 16:50, Aaron Levin <aa...@stripe.com> wrote:
>
> Hey,
>
> Thanks for reaching out! I'd love to take a step back and find a better
> solution, so I'll try to be succinct about what I'm trying to accomplish:
>
> We're trying to write a SourceFunction which:
> * reads some Sequence files from S3 in a particular order (each task gets
> files in a specific order).
> * sends a watermark between each sequence file
> * when that's complete, starts reading from Kafka topics.
> * (This is similar to the bootstrap problem which Lyft has talked about
> (see: https://www.slideshare.net/FlinkForward/flink-forward-san-francisco-2018-gregory-fee-bootstrapping-state-in-apache-flink))
>
> The current solution I have involves a custom InputFormat, InputSplit, and
> SplitAssignor. It achieves most of these requirements, except I have to
> extend InputFormatSourceFunction. I have a class that looks like:
>
> class MySourceFunction(val s3Archives: CustomInputFormat, val kafka:
> KafkaBase) extends InputFormatSourceFunction(s3Archives, typestuff) {...}
>
> There is a lot I don't like about the existing solution:
> * I have to extend InputFormatSourceFunction to ensure the graph is
> initialized properly (the bug I wrote about)
> * I had to replicate most of the implementation of
> InputFormatSourceFunction so I could insert Watermarks between splits.
>
> I'd love any suggestions around improving this!
>
> Best,
>
> Aaron Levin
>
> On Thu, Nov 1, 2018 at 10:41 AM, Aljoscha Krettek <al...@apache.org>
> wrote:
>
>> Hi Aaron,
>>
>> I'd like to take a step back and understand why you're trying to wrap an
>> InputFormatSourceFunction.
>>
>> In my opinion, InputFormatSourceFunction should not be used because it
>> has some shortcomings, the most prominent among them that it does not
>> support checkpointing, i.e. in case of failure all data will (probably) be
>> read again. I'm saying probably because the interaction of
>> InputFormatSourceFunction with how InputSplits are generated (which relates
>> to that code snippet with the cast you found) could be somewhat "spooky"
>> and lead to weird results in some cases.
>>
>> The interface is a remnant of a very early version of the streaming API
>> and should probably be removed soon. I hope we can find a better solution
>> for your problem that fits better with Flink.
>>
>> Best,
>> Aljoscha
>>
>> On 1. Nov 2018, at 15:30, Aaron Levin <aa...@stripe.com> wrote:
>>
>> Hey Friends! Last ping and I'll move this over to a ticket. If anyone can
>> provide any insight or advice, that would be helpful!
>>
>> Thanks again.
>>
>> Best,
>>
>> Aaron Levin
>>
>> On Fri, Oct 26, 2018 at 9:55 AM, Aaron Levin <aa...@stripe.com>
>> wrote:
>>
>>> Hey,
>>>
>>> Not sure how convo threading works on this list, so in case the folks
>>> CC'd missed my other response, here's some more info:
>>>
>>> First, I appreciate everyone's help! Thank you!
>>>
>>> I wrote several wrappers to try and debug this, including one which is
>>> an exact copy of `InputFormatSourceFunction` which also failed. They all
>>> failed with the same error I detail above. I'll post two of them below.
>>> They all extended `RichParallelSourceFunction` and, as far as I could tell,
>>> were properly initialized (though I may have missed something!).
>>> Additionally, for the two below, if I change `extends
>>> RichParallelSourceFunction` to `extends InputFormatSourceFunction(...,...)`,
>>> I no longer receive the exception. This is what led me to believe the
>>> source of the issue was casting and how I found the line of code where the
>>> stream graph is given the input format.
>>>
>>> Quick explanation of the wrappers:
>>> 1. `WrappedInputFormat` does a basic wrap around
>>> `InputFormatSourceFunction` and delegates all methods to the underlying
>>> `InputFormatSourceFunction`
>>> 2. `ClonedInputFormatSourceFunction` is a ~exact copy of the
>>> `InputFormatSourceFunction` source.
>>> 3. They're being used in a test which looks vaguely like:
>>> `DataStreamUtils.collect(env.addSource(new WrappedInputFormat(new
>>> InputFormatSourceFunction[String](source,
>>> implicitly[TypeInformation[String]]))).javaStream).asScala.toSeq`
>>>
>>> class WrappedInputFormat[A](
>>>   inputFormat: InputFormatSourceFunction[A]
>>> )(
>>>   implicit typeInfo: TypeInformation[A]
>>> ) extends RichParallelSourceFunction[A] {
>>>
>>>   override def run(sourceContext: SourceFunction.SourceContext[A]):
>>> Unit = {
>>>     inputFormat.run(sourceContext)
>>>   }
>>>   override def setRuntimeContext(t: RuntimeContext): Unit = {
>>>     inputFormat.setRuntimeContext(t)
>>>   }
>>>   override def equals(obj: scala.Any) = {
>>>     inputFormat.equals(obj)
>>>   }
>>>   override def hashCode() = { inputFormat.hashCode() }
>>>   override def toString = { inputFormat.toString }
>>>   override def getRuntimeContext(): RuntimeContext = {
>>> inputFormat.getRuntimeContext }
>>>   override def getIterationRuntimeContext = {
>>> inputFormat.getIterationRuntimeContext }
>>>   override def open(parameters: Configuration): Unit = {
>>>     inputFormat.open(parameters)
>>>   }
>>>   override def cancel(): Unit = {
>>>     inputFormat.cancel()
>>>   }
>>>   override def close(): Unit = {
>>>     inputFormat.close()
>>>   }
>>> }
>>>
>>> And the other one:
>>>
>>> class ClonedInputFormatSourceFunction[A](val format: InputFormat[A,
>>> InputSplit], val typeInfo: TypeInformation[A]) extends
>>> RichParallelSourceFunction[A] {
>>>
>>>   @transient private var provider: InputSplitProvider = _
>>>   @transient private var serializer: TypeSerializer[A] = _
>>>   @transient private var splitIterator: Iterator[InputSplit] = _
>>>   private var isRunning: Boolean = _
>>>
>>>   override def open(parameters: Configuration): Unit = {
>>>     val context = getRuntimeContext.asInstanceOf[StreamingRuntimeContext]
>>>     if(format.isInstanceOf[RichInputFormat[_,_]]) {
>>>       format.asInstanceOf[RichInputFormat[_,_]].setRuntimeContext(context)
>>>     }
>>>     format.configure(parameters)
>>>
>>>     provider = context.getInputSplitProvider
>>>     serializer = typeInfo.createSerializer(getRuntimeContext.getExecutionConfig)
>>>     splitIterator = getInputSplits()
>>>     isRunning = splitIterator.hasNext
>>>   }
>>>
>>>   override def run(sourceContext: SourceFunction.SourceContext[A]):
>>> Unit = {
>>>     if(isRunning && format.isInstanceOf[RichInputFormat[_,_]]) {
>>>       format.asInstanceOf[RichInputFormat[_,_]].openInputFormat()
>>>     }
>>>
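>>>     var nextElement: A = serializer.createInstance()
>>>     try {
>>>       while (isRunning) {
>>>         format.open(splitIterator.next())
>>>         // Scala has no `break` keyword; a flag ends the inner loop on a null record.
>>>         var splitDone = false
>>>         while (isRunning && !splitDone && !format.reachedEnd()) {
>>>           nextElement = format.nextRecord(nextElement)
>>>           if (nextElement != null) {
>>>             sourceContext.collect(nextElement)
>>>           } else {
>>>             splitDone = true
>>>           }
>>>         }
>>>         // Close the split only after it has been read, then ask for the next one.
>>>         format.close()
>>>         if (isRunning) {
>>>           isRunning = splitIterator.hasNext
>>>         }
>>>       }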
>>>     } finally {
>>>
>>>       format.close()
>>>       if (format.isInstanceOf[RichInputFormat[_,_]]) {
>>>         format.asInstanceOf[RichInputFormat[_,_]].closeInputFormat()
>>>       }
>>>       isRunning = false
>>>     }
>>>   }
>>>
>>>   override def cancel(): Unit = {
>>>     isRunning = false
>>>   }
>>>
>>>   override def close(): Unit = {
>>>     format.close()
>>>     if(format.isInstanceOf[RichInputFormat[_,_]]) {
>>>       format.asInstanceOf[RichInputFormat[_,_]].closeInputFormat()
>>>     }
>>>   }
>>>
>>>   private def getInputSplits(): Iterator[InputSplit] = {
>>>     new Iterator[InputSplit] {
>>>       private var nextSplit: InputSplit = _
>>>       private var exhausted: Boolean = _
>>>
>>>       override def hasNext: Boolean = {
>>>         if(exhausted) { return false }
>>>         if(nextSplit != null) { return true }
>>>         var split: InputSplit = null
>>>
>>>         try {
>>>           split = provider.getNextInputSplit(getRuntimeContext.getUserCodeClassLoader)
>>>         } catch {
>>>           case e: InputSplitProviderException =>
>>>             throw new RuntimeException("No InputSplit Provider", e)
>>>         }
>>>
>>>         if(split != null) {
>>>           nextSplit = split
>>>           true
>>>         } else {
>>>           exhausted = true
>>>           false
>>>         }
>>>       }
>>>
>>>       override def next(): InputSplit = {
>>>         if(nextSplit == null && !hasNext) {
>>>           throw new NoSuchElementException()
>>>         }
>>>         val tmp: InputSplit = nextSplit
>>>         nextSplit = null
>>>         tmp
>>>       }
>>>
>>>     }
>>>   }
>>> }
>>>
>>>
>>> On Wed, Oct 24, 2018 at 3:54 AM, Dawid Wysakowicz <
>>> dwysakowicz@apache.org> wrote:
>>>
>>>> Hi Aaron,
>>>>
>>>> Could you share the code of your custom function?
>>>>
>>>> I am also adding Aljoscha and Kostas to cc, who should be more helpful
>>>> on that topic.
>>>>
>>>> Best,
>>>>
>>>> Dawid
>>>
>>
>>
>
>

Re: Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

I think for this case a model that is similar to how the Streaming File Source works should be good. You can have a look at ContinuousFileMonitoringFunction and ContinuousFileReaderOperator. The idea is that the first emits splits that should be processed and the second is responsible for reading those splits. A generic version of that is what I'm proposing for the refactoring of our source interface [1] that also comes with a prototype implementation [2].

I think something like this should be adaptable to your case. The split enumerator would at first only emit file splits downstream; after that it would emit Kafka partitions that should be read. The split reader would understand both file splits and Kafka partitions and could read from both. This still has some kinks to be worked out when it comes to watermarks; FLIP-27 is not finished.
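
To make the shape of that idea concrete, here is a small self-contained sketch of the two stages. It is not the FLIP-27 API (which is not finished) and uses no Flink classes; `Split`, `FileSplit`, `KafkaPartitionSplit`, `enumerate` and `read` are hypothetical names chosen only for illustration, and watermarks are left out entirely:

```scala
// Hypothetical split model; not the FLIP-27 API and not Flink classes.
sealed trait Split
final case class FileSplit(path: String) extends Split
final case class KafkaPartitionSplit(topic: String, partition: Int) extends Split

object TwoStageSourceSketch {
  // Stage 1: the enumerator emits all file splits first, then the Kafka partitions.
  def enumerate(files: Seq[String], topic: String, partitions: Int): Iterator[Split] =
    files.iterator.map(path => FileSplit(path)) ++
      (0 until partitions).iterator.map(p => KafkaPartitionSplit(topic, p))

  // Stage 2: a single reader understands both kinds of split.
  def read(split: Split): String = split match {
    case FileSplit(path)               => s"read file $path"
    case KafkaPartitionSplit(topic, p) => s"consume $topic-$p"
  }

  def main(args: Array[String]): Unit =
    enumerate(Seq("a.seq", "b.seq"), "events", 2).map(read).foreach(println)
}
```

The point of the split is that ordering lives in the enumerator (files before Kafka), while the reader only needs to know how to handle whichever split it is handed.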

What do you think?

Best,
Aljoscha

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
[2] https://github.com/aljoscha/flink/commits/refactor-source-interface

> On 1. Nov 2018, at 16:50, Aaron Levin <aa...@stripe.com> wrote:
> 
> Hey,
> 
> Thanks for reaching out! I'd love to take a step back and find a better solution, so I'll try to be succinct in what I'm trying to accomplish:
> 
> We're trying to write a SourceFunction which:
> * reads some Sequence files from S3 in a particular order (each task gets files in a specific order).
> * sends a watermark between each sequence file 
> * when that's complete, starts reading from Kafka topics.
> * (This is similar to the bootstrap problem which Lyft has talked about (see: https://www.slideshare.net/FlinkForward/flink-forward-san-francisco-2018-gregory-fee-bootstrapping-state-in-apache-flink))
> 
> The current solution I have involves a custom InputFormat, InputSplit, and SplitAssignor. It achieves most of these requirements, except I have to extend InputFormatSourceFunction. I have a class that looks like:
> 
> class MySourceFunction(val s3Archives: CustomInputFormat, val kafka: KafkaBase) extends InputFormatSourceFunction(s3Archives, typestuff) {...}
> 
> There is a lot I don't like about the existing solution:
> * I have to extend InputFormatSourceFunction to ensure the graph is initialized properly (the bug I wrote about)
> * I had to replicate most of the implementation of InputFormatSourceFunction so I could insert Watermarks between splits. 
> 
> I'd love any suggestions around improving this!
> 
> Best,
> 
> Aaron Levin
> 
> On Thu, Nov 1, 2018 at 10:41 AM, Aljoscha Krettek <aljoscha@apache.org> wrote:
> Hi Aaron,
> 
> I'd like to take a step back and understand why you're trying to wrap an InputFormatSourceFunction?
> 
> In my opinion, InputFormatSourceFunction should not be used because it has some shortcomings, the most prominent among them that it does not support checkpointing, i.e. in case of failure all data will (probably) be read again. I'm saying probably because the interaction of InputFormatSourceFunction with how InputSplits are generated (which relates to that code snippet with the cast you found) could be somewhat "spooky" and lead to weird results in some cases.
> 
> The interface is a remnant of a very early version of the streaming API and should probably be removed soon. I hope we can find a better solution for your problem that fits better with Flink.
> 
> Best,
> Aljoscha
> 


Re: Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)

Posted by Aaron Levin <aa...@stripe.com>.
Hey,

Thanks for reaching out! I'd love to take a step back and find a better
solution, so I'll try to be succinct in what I'm trying to accomplish:

We're trying to write a SourceFunction which:
* reads some Sequence files from S3 in a particular order (each task gets
files in a specific order).
* sends a watermark between each sequence file
* when that's complete, starts reading from Kafka topics.
* (This is similar to the bootstrap problem which Lyft has talked about
(see:
https://www.slideshare.net/FlinkForward/flink-forward-san-francisco-2018-gregory-fee-bootstrapping-state-in-apache-flink
))
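
The ordering requirement in the list above (records of one file, then a watermark, then the next file) can be modelled without Flink as follows. This is a sketch under assumptions: `SequenceFile`, `Record`, `Watermark` and `emitInOrder` are hypothetical names, each file is assumed to know the largest timestamp it covers, and a watermark is emitted after every file, including the last one before the switch to Kafka.

```scala
object WatermarkSketch {
  sealed trait Event
  final case class Record(value: String) extends Event
  final case class Watermark(timestamp: Long) extends Event

  // Hypothetical file model: its records plus the largest timestamp it covers.
  final case class SequenceFile(records: Seq[String], maxTimestamp: Long)

  // Emits each file's records in the given order, followed by one watermark per file.
  def emitInOrder(files: Seq[SequenceFile]): Seq[Event] =
    files.flatMap { file =>
      val records: Seq[Event] = file.records.map(r => Record(r))
      records :+ Watermark(file.maxTimestamp)
    }
}
```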

The current solution I have involves a custom InputFormat, InputSplit, and
SplitAssignor. It achieves most of these requirements, except I have to
extend InputFormatSourceFunction. I have a class that looks like:

class MySourceFunction(val s3Archives: CustomInputFormat, val kafka:
KafkaBase) extends InputFormatSourceFunction(s3Archives, typestuff) {...}

There is a lot I don't like about the existing solution:
* I have to extend InputFormatSourceFunction to ensure the graph is
initialized properly (the bug I wrote about)
* I had to replicate most of the implementation of
InputFormatSourceFunction so I could insert Watermarks between splits.
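
The first point can be pictured with a toy model. It assumes that the graph builder attaches the input format only when the function passed to `env.addSource` is itself an `InputFormatSourceFunction`, which is what the `instanceof` check linked earlier in the thread suggests. The types below are simplified stand-ins, not Flink's classes, and `formatName`, `WrappedSource`, `ExtendedSource` and `registeredFormat` are hypothetical names.

```scala
object InstanceOfSketch {
  // Stand-ins only: these are not Flink's real types.
  trait SourceFunction
  class InputFormatSourceFunction(val formatName: String) extends SourceFunction

  // A wrapper delegates to the inner function but is not a subclass of it.
  class WrappedSource(val inner: InputFormatSourceFunction) extends SourceFunction

  // A subclass passes the runtime type check.
  class ExtendedSource(name: String) extends InputFormatSourceFunction(name)

  // Mimics a graph builder that registers an input format only when the
  // function it is handed is itself an InputFormatSourceFunction.
  def registeredFormat(fn: SourceFunction): Option[String] = fn match {
    case iff: InputFormatSourceFunction => Some(iff.formatName)
    case _                              => None
  }
}
```

In this model the wrapper yields `None`, so nothing would be registered for its vertex, while the subclass yields `Some(...)`. That matches the observation that switching to `extends InputFormatSourceFunction(...)` makes the exception go away.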

I'd love any suggestions around improving this!

Best,

Aaron Levin


Re: Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Aaron,

I'd like to take a step back and understand why you're trying to wrap an InputFormatSourceFunction?

In my opinion, InputFormatSourceFunction should not be used because it has some shortcomings, the most prominent among them that it does not support checkpointing, i.e. in case of failure all data will (probably) be read again. I'm saying probably because the interaction of InputFormatSourceFunction with how InputSplits are generated (which relates to that code snippet with the cast you found) could be somewhat "spooky" and lead to weird results in some cases.
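
To make the checkpointing point concrete: a source that can resume after a failure has to remember which split it was reading and how far it got. The plain-Scala model below sketches that idea only. It is not how Flink stores state, and `Position` and `readFrom` are hypothetical names.

```scala
object ResumableReaderSketch {
  // Hypothetical checkpoint state: the split being read and how many
  // records of that split have already been emitted.
  final case class Position(splitIndex: Int, recordOffset: Int)

  // Reads up to `limit` records starting at `start` and returns the emitted
  // records together with the position to resume from.
  def readFrom(
      splits: Vector[Vector[String]],
      start: Position,
      limit: Int
  ): (Vector[String], Position) = {
    var pos = start
    var emitted = 0
    val out = Vector.newBuilder[String]
    while (emitted < limit && pos.splitIndex < splits.length) {
      val split = splits(pos.splitIndex)
      if (pos.recordOffset < split.length) {
        out += split(pos.recordOffset)
        pos = pos.copy(recordOffset = pos.recordOffset + 1)
        emitted += 1
      } else {
        // Current split is exhausted; move on to the next one.
        pos = Position(pos.splitIndex + 1, 0)
      }
    }
    (out.result(), pos)
  }
}
```

Restarting from a saved `Position` continues where the previous run stopped. A source without such state has to start again from the first split, which is the "all data will be read again" case.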

The interface is a remnant of a very early version of the streaming API and should probably be removed soon. I hope we can find a better solution for your problem that fits better with Flink.

Best,
Aljoscha

> On 1. Nov 2018, at 15:30, Aaron Levin <aa...@stripe.com> wrote:
> 
> Hey Friends! Last ping and I'll move this over to a ticket. If anyone can provide any insight or advice, that would be helpful!
> 
> Thanks again.
> 
> Best,
> 
> Aaron Levin
> 
> On Fri, Oct 26, 2018 at 9:55 AM, Aaron Levin <aaronlevin@stripe.com> wrote:
> Hey,
> 
> Not sure how convo threading works on this list, so in case the folks CC'd missed my other response, here's some more info:
> 
> First, I appreciate everyone's help! Thank you! 
> 
> I wrote several wrappers to try and debug this, including one which is an exact copy of `InputFormatSourceFunction` which also failed. They all failed with the same error I detail above. I'll post two of them below. They all extended `RichParallelSourceFunction` and, as far as I could tell, were properly initialized (though I may have missed something!). Additionally, for the two below, if I change `extends RichParallelSourceFunction` to `extends InputFormatSourceFunction(...,...)`, I no longer receive the exception. This is what led me to believe the source of the issue was casting and how I found the line of code where the stream graph is given the input format.
> 
> Quick explanation of the wrappers:
> 1. `WrappedInputFormat` does a basic wrap around `InputFormatSourceFunction` and delegates all methods to the underlying `InputFormatSourceFunction`
> 2. `ClonedInputFormatSourceFunction` is a ~exact copy of the `InputFormatSourceFunction` source.
> 3. They're being used in a test which looks vaguely like: `DataStreamUtils.collect(env.addSource(new WrappedInputFormat(new InputFormatSourceFunction[String](source, implicitly[TypeInformation[String]]))).javaStream).asScala.toSeq`
> 
> class WrappedInputFormat[A](
>   inputFormat: InputFormatSourceFunction[A]
> )(
>   implicit typeInfo: TypeInformation[A]
> ) extends RichParallelSourceFunction[A] {
> 
>   override def run(sourceContext: SourceFunction.SourceContext[A]): Unit = {
>     inputFormat.run(sourceContext)
>   }
>   override def setRuntimeContext(t: RuntimeContext): Unit = {
>     inputFormat.setRuntimeContext(t)
>   }
>   override def equals(obj: scala.Any) = {
>     inputFormat.equals(obj)
>   }
>   override def hashCode() = { inputFormat.hashCode() }
>   override def toString = { inputFormat.toString }
>   override def getRuntimeContext(): RuntimeContext = { inputFormat.getRuntimeContext }
>   override def getIterationRuntimeContext = { inputFormat.getIterationRuntimeContext }
>   override def open(parameters: Configuration): Unit = {
>     inputFormat.open(parameters)
>   }
>   override def cancel(): Unit = {
>     inputFormat.cancel()
>   }
>   override def close(): Unit = {
>     inputFormat.close()
>   }
> }
> 
> And the other one:
> 
> class ClonedInputFormatSourceFunction[A](val format: InputFormat[A, InputSplit], val typeInfo: TypeInformation[A]) extends RichParallelSourceFunction[A] {
> 
>   @transient private var provider: InputSplitProvider = _
>   @transient private var serializer: TypeSerializer[A] = _
>   @transient private var splitIterator: Iterator[InputSplit] = _
>   private var isRunning: Boolean = _
> 
>   override def open(parameters: Configuration): Unit = {
>     val context = getRuntimeContext.asInstanceOf[StreamingRuntimeContext]
>     if(format.isInstanceOf[RichInputFormat[_,_]]) {
>       format.asInstanceOf[RichInputFormat[_,_]].setRuntimeContext(context)
>     }
>     format.configure(parameters)
> 
>     provider = context.getInputSplitProvider
>     serializer = typeInfo.createSerializer(getRuntimeContext.getExecutionConfig)
>     splitIterator = getInputSplits()
>     isRunning = splitIterator.hasNext
>   }
> 
>   override def run(sourceContext: SourceFunction.SourceContext[A]): Unit = {
>     if(isRunning && format.isInstanceOf[RichInputFormat[_,_]]) {
>       format.asInstanceOf[RichInputFormat[_,_]].openInputFormat()
>     }
> 
>     var nextElement: A = serializer.createInstance()
>     try {
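>       while (isRunning) {
>         format.open(splitIterator.next())
>         // Scala has no `break` statement; a flag ends the inner loop
>         // when nextRecord returns null.
>         var splitDone = false
>         while (isRunning && !splitDone && !format.reachedEnd()) {
>           nextElement = format.nextRecord(nextElement)
>           if (nextElement != null) {
>             sourceContext.collect(nextElement)
>           } else {
>             splitDone = true
>           }
>         }
>         // Close the split and ask for the next one only after the inner
>         // loop has finished reading the current split.
>         format.close()
>         if (isRunning) {
>           isRunning = splitIterator.hasNext
>         }
>       }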
>     } finally {
> 
>       format.close()
>       if (format.isInstanceOf[RichInputFormat[_,_]]) {
>         format.asInstanceOf[RichInputFormat[_,_]].closeInputFormat()
>       }
>       isRunning = false
>     }
>   }
> 
>   override def cancel(): Unit = {
>     isRunning = false
>   }
> 
>   override def close(): Unit = {
>     format.close()
>     if(format.isInstanceOf[RichInputFormat[_,_]]) {
>       format.asInstanceOf[RichInputFormat[_,_]].closeInputFormat()
>     }
>   }
> 
>   private def getInputSplits(): Iterator[InputSplit] = {
>     new Iterator[InputSplit] {
>       private var nextSplit: InputSplit = _
>       private var exhausted: Boolean = _
> 
>       override def hasNext: Boolean = {
>         if(exhausted) { return false }
>         if(nextSplit != null) { return true }
>         var split: InputSplit = null
> 
>         try {
>           split = provider.getNextInputSplit(getRuntimeContext.getUserCodeClassLoader)
>         } catch {
>           case e: InputSplitProviderException =>
>             throw new RuntimeException("No InputSplit Provider", e)
>         }
> 
>         if(split != null) {
>           nextSplit = split
>           true
>         } else {
>           exhausted = true
>           false
>         }
>       }
> 
>       override def next(): InputSplit = {
>         if(nextSplit == null && !hasNext) {
>           throw new NoSuchElementException()
>         }
>         val tmp: InputSplit = nextSplit
>         nextSplit = null
>         tmp
>       }
> 
>     }
>   }
> }
> 
> 


Re: Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)

Posted by Aaron Levin <aa...@stripe.com>.
Hey Friends! Last ping and I'll move this over to a ticket. If anyone can
provide any insight or advice, that would be helpful!

Thanks again.

Best,

Aaron Levin

On Fri, Oct 26, 2018 at 9:55 AM, Aaron Levin <aa...@stripe.com> wrote:

> Hey,
>
> Not sure how convo threading works on this list, so in case the folks CC'd
> missed my other response, here's some more info:
>
> First, I appreciate everyone's help! Thank you!
>
> I wrote several wrappers to try and debug this, including one which is an
> exact copy of `InputFormatSourceFunction` which also failed. They all
> failed with the same error I detail above. I'll post two of them below.
> They all extended `RichParallelSourceFunction` and, as far as I could tell,
> were properly initialized (though I may have missed something!).
> Additionally, for the two below, if I change `extends
> RichParallelSourceFunction` to `extends InputFormatSourceFunction(...,...)`,
> I no longer receive the exception. This is what led me to believe the
> source of the issue was casting and how I found the line of code where the
> stream graph is given the input format.
>

Re: Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)

Posted by Aaron Levin <aa...@stripe.com>.
Hey,

Not sure how convo threading works on this list, so in case the folks CC'd
missed my other response, here's some more info:

First, I appreciate everyone's help! Thank you!

I wrote several wrappers to try and debug this, including one which is an
exact copy of `InputFormatSourceFunction` which also failed. They all
failed with the same error I detail above. I'll post two of them below.
They all extended `RichParallelSourceFunction` and, as far as I could tell,
were properly initialized (though I may have missed something!).
Additionally, for the two below, if I change `extends
RichParallelSourceFunction` to `extends InputFormatSourceFunction(...,...)`,
I no longer receive the exception. This is what led me to believe the
source of the issue was casting and how I found the line of code where the
stream graph is given the input format.
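
To make the suspicion above concrete, here is a minimal sketch in plain Scala. It assumes the stream graph only registers an input format when the source function itself passes a type test against one concrete class. All names (`BaseSource`, `FormatSource`, `DelegatingSource`, `SubclassedSource`, `GraphModel`) are hypothetical stand-ins, not Flink classes; the point is only that a delegating wrapper fails such a test while a subclass passes it.

```scala
// Stand-in types only; these are NOT the real Flink classes.
trait BaseSource { def run(): Unit }

// Plays the role of the one concrete class the graph builder knows about.
class FormatSource(val format: String) extends BaseSource {
  def run(): Unit = println(s"reading with $format")
}

// Delegating wrapper: holds a FormatSource but is not one itself.
class DelegatingSource(inner: FormatSource) extends BaseSource {
  def run(): Unit = inner.run()
}

// Subclass: still passes a type test against FormatSource.
class SubclassedSource(format: String) extends FormatSource(format)

object GraphModel {
  // Models a graph builder that registers a format only when the
  // outermost source function is a FormatSource.
  def registeredFormat(source: BaseSource): Option[String] = source match {
    case f: FormatSource => Some(f.format)
    case _               => None
  }
}

object Demo {
  def main(args: Array[String]): Unit = {
    println(GraphModel.registeredFormat(new FormatSource("csv")))                       // Some(csv)
    println(GraphModel.registeredFormat(new DelegatingSource(new FormatSource("csv")))) // None
    println(GraphModel.registeredFormat(new SubclassedSource("csv")))                   // Some(csv)
  }
}
```

In this model the wrapper yields `None`, which would correspond to no input format being registered for the source vertex. This is consistent with the observation that extending `InputFormatSourceFunction` makes the exception go away, though the sketch does not prove that this is what happens inside Flink.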

Quick explanation of the wrappers:
1. `WrappedInputFormat` does a basic wrap around
`InputFormatSourceFunction` and delegates all methods to the underlying
`InputFormatSourceFunction`
2. `ClonedInputFormatSourceFunction` is a ~exact copy of the
`InputFormatSourceFunction` source.
3. They're being used in a test which looks vaguely like:
`DataStreamUtils.collect(env.addSource(new WrappedInputFormat(new InputFormatSourceFunction[String](source, implicitly[TypeInformation[String]]))).javaStream).asScala.toSeq`

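// Imports needed by both classes below (package names as of Flink 1.6).
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.io.{InputFormat, RichInputFormat}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.io.InputSplit
import org.apache.flink.runtime.jobgraph.tasks.{InputSplitProvider, InputSplitProviderException}
import org.apache.flink.streaming.api.functions.source.{InputFormatSourceFunction, RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext

class WrappedInputFormat[A](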
  inputFormat: InputFormatSourceFunction[A]
)(
  implicit typeInfo: TypeInformation[A]
) extends RichParallelSourceFunction[A] {

  override def run(sourceContext: SourceFunction.SourceContext[A]): Unit = {
    inputFormat.run(sourceContext)
  }
  override def setRuntimeContext(t: RuntimeContext): Unit = {
    inputFormat.setRuntimeContext(t)
  }
  override def equals(obj: scala.Any) = {
    inputFormat.equals(obj)
  }
  override def hashCode() = { inputFormat.hashCode() }
  override def toString = { inputFormat.toString }
  override def getRuntimeContext(): RuntimeContext = {
    inputFormat.getRuntimeContext
  }
  override def getIterationRuntimeContext = {
    inputFormat.getIterationRuntimeContext
  }
  override def open(parameters: Configuration): Unit = {
    inputFormat.open(parameters)
  }
  override def cancel(): Unit = {
    inputFormat.cancel()
  }
  override def close(): Unit = {
    inputFormat.close()
  }
}

And the other one:

class ClonedInputFormatSourceFunction[A](
  val format: InputFormat[A, InputSplit],
  val typeInfo: TypeInformation[A]
) extends RichParallelSourceFunction[A] {

  @transient private var provider: InputSplitProvider = _
  @transient private var serializer: TypeSerializer[A] = _
  @transient private var splitIterator: Iterator[InputSplit] = _
  private var isRunning: Boolean = _

  override def open(parameters: Configuration): Unit = {
    val context = getRuntimeContext.asInstanceOf[StreamingRuntimeContext]
    if(format.isInstanceOf[RichInputFormat[_,_]]) {
      format.asInstanceOf[RichInputFormat[_,_]].setRuntimeContext(context)
    }
    format.configure(parameters)

    provider = context.getInputSplitProvider
    serializer = typeInfo.createSerializer(getRuntimeContext.getExecutionConfig)
    splitIterator = getInputSplits()
    isRunning = splitIterator.hasNext
  }

  override def run(sourceContext: SourceFunction.SourceContext[A]): Unit = {
    if(isRunning && format.isInstanceOf[RichInputFormat[_,_]]) {
      format.asInstanceOf[RichInputFormat[_,_]].openInputFormat()
    }

    var nextElement: A = serializer.createInstance()
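    try {
      while (isRunning) {
        format.open(splitIterator.next())
        // Read the current split until it is exhausted or the source is cancelled.
        // (Scala has no `break` keyword, so a flag ends the inner loop instead.)
        var splitDone = false
        while (isRunning && !splitDone && !format.reachedEnd()) {
          nextElement = format.nextRecord(nextElement)
          if (nextElement != null) {
            sourceContext.collect(nextElement)
          } else {
            splitDone = true
          }
        }
        // Close the finished split before asking for the next one.
        format.close()
        if (isRunning) {
          isRunning = splitIterator.hasNext
        }
      }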
    } finally {

      format.close()
      if (format.isInstanceOf[RichInputFormat[_,_]]) {
        format.asInstanceOf[RichInputFormat[_,_]].closeInputFormat()
      }
      isRunning = false
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }

  override def close(): Unit = {
    format.close()
    if(format.isInstanceOf[RichInputFormat[_,_]]) {
      format.asInstanceOf[RichInputFormat[_,_]].closeInputFormat()
    }
  }

  private def getInputSplits(): Iterator[InputSplit] = {
    new Iterator[InputSplit] {
      private var nextSplit: InputSplit = _
      private var exhausted: Boolean = _

      override def hasNext: Boolean = {
        if(exhausted) { return false }
        if(nextSplit != null) { return true }
        var split: InputSplit = null

        try {
          split = provider.getNextInputSplit(getRuntimeContext.getUserCodeClassLoader)
        } catch {
          case e: InputSplitProviderException =>
            throw new RuntimeException("No InputSplit Provider", e)
        }

        if(split != null) {
          nextSplit = split
          true
        } else {
          exhausted = true
          false
        }
      }

      override def next(): InputSplit = {
        if(nextSplit == null && !hasNext) {
          throw new NoSuchElementException()
        }
        val tmp: InputSplit = nextSplit
        nextSplit = null
        tmp
      }

    }
  }
}


On Wed, Oct 24, 2018 at 3:54 AM, Dawid Wysakowicz <dw...@apache.org>
wrote:

> Hi Aaron,
>
> Could you share the code of your custom function?
>
> I am also adding Aljoscha and Kostas to cc, who should be more helpful on
> that topic.
>
> Best,
>
> Dawid

Re: Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi Aaron,

Could you share the code of your custom function?

I am also adding Aljoscha and Kostas to cc, who should be more helpful on
that topic.

Best,

Dawid

On 19/10/2018 20:06, Aaron Levin wrote:
> Hi,
>
> I'm writing a custom `SourceFunction` which wraps an underlying
> `InputFormatSourceFunction`. When I try to use this `SourceFunction`
> in a stream (via `env.addSource` and a subsequent sink) I get errors
> related to the `InputSplitAssigner` not being initialized for a
> particular vertex ID. Full error here[1].
>
> I believe the underlying error is related to this[0] call to
> `instanceof InputFormatSourceFunction`.
>
> _My questions_:
>
> 1. how can I wrap an `InputFormatSourceFunction` in a way that avoids this
> error? Am I missing a chunk of the API covering this?
> 2. is the error I'm experiencing related to that casting call? If so,
> would y'all be open to a PR which adds an interface one can extend
> which will set the input format in the stream graph? Or is there a
> preferred way of achieving this?
>
> Thanks!
>
> Aaron Levin
>
> [0] https://github.com/apache/flink/blob/release-1.6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java#L480
> [1] 
> java.lang.RuntimeException: Could not retrieve next input split.
>     at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:157)
>     at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.open(InputFormatSourceFunction.java:71)
>     at REDACTED
>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: Requesting the next input split failed.
>     at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
>     at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:155)
>     ... 8 more
> Caused by: java.util.concurrent.ExecutionException: java.lang.Exception: No InputSplitAssigner for vertex ID cbc357ccb763df2852fee8c4fc7d55f2
>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>     at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
>     ... 9 more
> Caused by: java.lang.Exception: No InputSplitAssigner for vertex ID cbc357ccb763df2852fee8c4fc7d55f2
>     at org.apache.flink.runtime.jobmaster.JobMaster.requestNextInputSplit(JobMaster.java:575)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
> ...
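
Question 2 in the quoted message proposes an interface that a source can extend so that the stream graph picks up its input format. A rough sketch of that idea in plain Scala follows. Every name in it (`Source`, `ProvidesInputFormat`, `FormatBackedSource`, `ForwardingSource`, `InterfaceGraphModel`) is hypothetical, and none of it is an existing Flink API; it only illustrates checking for an interface instead of one concrete class.

```scala
// Stand-in types only; none of these are real Flink classes or APIs.
trait Source { def run(): Unit }

// Hypothetical interface: any source that can expose its input format.
trait ProvidesInputFormat { def inputFormat: String }

class FormatBackedSource(val inputFormat: String) extends Source with ProvidesInputFormat {
  def run(): Unit = println(s"reading with $inputFormat")
}

// A wrapper opts in by forwarding the inner source's format.
class ForwardingSource(inner: FormatBackedSource) extends Source with ProvidesInputFormat {
  def inputFormat: String = inner.inputFormat
  def run(): Unit = inner.run()
}

// A source that does not implement the interface.
class PlainSource extends Source {
  def run(): Unit = println("no input format here")
}

object InterfaceGraphModel {
  // Checks for the interface rather than for one concrete class, so
  // wrappers that implement it are recognised as well.
  def registeredFormat(source: Source): Option[String] = source match {
    case p: ProvidesInputFormat => Some(p.inputFormat)
    case _                      => None
  }
}
```

With such a check, a wrapper like `ForwardingSource` would be treated the same as the source it wraps, while sources that do not implement the interface are left alone.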