Posted to user@flink.apache.org by Mark Davis <mo...@protonmail.com> on 2019/11/23 22:07:40 UTC

DataSet API: HBase ScannerTimeoutException and double Result processing

Hello,

I am reading Results from an HBase table and processing them with the Batch API. Everything works fine until I receive a ScannerTimeoutException from HBase.
Maybe my transformations get stuck or a GC pause happens; it is hard to tell. The HBase client restarts the scan and processing continues.
There is one problem, though: every time I receive this exception I observe duplicate Result processing. The Result that was processed just before the ScannerTimeoutException was thrown is processed twice.

Is this expected behavior? Should I be prepared to handle it?
And how should I handle it? Keeping track of all processed Results is not feasible in my case.

Here is a simple job demonstrating the issue (HBase scan and RPC timeouts are set to 60 seconds).

Thank you!

Best regards,
Mark

  // getStartRow(), getEndRow(), getTable() and getHadoopConf() are helpers not shown in this post.

  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // print() triggers execution of the batch job.
    env.createInput(new Src())
        .map(new Mapper())
        .print();
  }

  private static class Mapper implements MapFunction<Tuple1<String>, String> {

    private int cnt = 0;

    @Override
    public String map(Tuple1<String> value) throws Exception {
      // Sleep 120 s on every other record. This is longer than the 60 s
      // scanner timeout, so the scanner expires before the next read.
      if (cnt++ % 2 == 0) {
        Thread.sleep(120000);
      }
      return value.f0;
    }

  }

  private static class Src extends AbstractTableInputFormat<Tuple1<String>> {

    @Override
    protected Scan getScanner() {
      Scan scan = new Scan();
      scan.setStartRow(getStartRow());
      scan.setStopRow(getEndRow());
      scan.setCaching(1);          // fetch a single row per scanner RPC
      scan.setCacheBlocks(false);
      return scan;
    }

    @Override
    protected String getTableName() {
      return getTable();
    }

    @Override
    protected Tuple1<String> mapResultToOutType(Result r) {
      // Emit only the row key.
      return new Tuple1<String>(Bytes.toString(r.getRow()));
    }

    @Override
    public void configure(org.apache.flink.configuration.Configuration parameters) {
      scan = getScanner();
      try {
        table = new HTable(getHadoopConf(), getTableName());
      } catch (IOException e) {
        // Fail fast instead of continuing with a null table.
        throw new RuntimeException("Could not open HBase table " + getTableName(), e);
      }
    }

  }
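
If the duplicates have to be handled in the job itself, keeping track of all processed Results may not be necessary. Since the repeated Result is always the one processed just before the exception, remembering only the previous row key could be enough. The following is a minimal sketch under that assumption; it also assumes that row keys are non-null and that records reach the check in scan order within a single task. AdjacentDeduplicator and isNew are hypothetical names, not part of Flink or HBase.

```java
import java.util.Objects;

/** Drops a record whose key equals the key of the record seen immediately before it. */
class AdjacentDeduplicator {

    // Key of the previously seen record; null until the first record arrives.
    private String lastKey;

    /** Returns true if the key differs from the previous one, and remembers it. */
    boolean isNew(String key) {
        boolean fresh = !Objects.equals(key, lastKey);
        lastKey = key;
        return fresh;
    }

    public static void main(String[] args) {
        AdjacentDeduplicator dedup = new AdjacentDeduplicator();
        // "b" arrives twice in a row, as after a scanner restart.
        for (String key : new String[] {"a", "b", "b", "c"}) {
            if (dedup.isNew(key)) {
                System.out.println(key); // prints a, b, c on separate lines
            }
        }
    }
}
```

In a job like the one above, such a check could sit in a filter placed directly after the source, so that only one key per task has to be kept in memory.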

Re: DataSet API: HBase ScannerTimeoutException and double Result processing

Posted by OpenInx <op...@gmail.com>.
On Mon, Nov 25, 2019 at 6:57 PM Mark Davis <mo...@protonmail.com> wrote:

> If the call to mapResultToOutType(Result) finished without an error there
> is no need to restart from the same row.
> The new scanner should start from the next row.
> Is that so or am I missing something?

Yeah, you are right. I've filed the issue
https://issues.apache.org/jira/browse/FLINK-14941 to address this bug.
Thanks.

Re: DataSet API: HBase ScannerTimeoutException and double Result processing

Posted by Mark Davis <mo...@protonmail.com>.
Hi Flavio,

>> When the resultScanner dies because of a timeout (this happens a lot when you have backpressure and the time between 2 consecutive reads exceeds the scanner timeout), the code creates a new scanner and restarts from where it was (startRow = currentRow).
>> So there should not be any duplicates (in theory), but this could be the root of the problem.

Yes, you are right, the nextRecord() exception handling is responsible for the duplicate record processing:

org.apache.hadoop.hbase.client.ScannerTimeoutException: 1038878ms passed since the last invocation, timeout is currently set to 60000
at org.apache.hadoop.hbase.client.ClientScanner.loadCache(ClientScanner.java:453)
at org.apache.hadoop.hbase.client.ClientScanner.next(ClientScanner.java:371)
at org.apache.flink.addons.hbase.AbstractTableInputFormat.nextRecord(AbstractTableInputFormat.java:130)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:192)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.hadoop.hbase.UnknownScannerException: org.apache.hadoop.hbase.UnknownScannerException: Name: 135281, already closed?
at org.apache.hadoop.hbase.regionserver.RSRpcServices.scan(RSRpcServices.java:2389)
at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:32385)
at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2150)
at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:112)
at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:187)
at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:167)

But I am not sure that the handling of the HBase exception thrown from ClientScanner.next() is correct.
If the call to mapResultToOutType(Result) finished without an error, that row has already been emitted and there is no need to restart from the same row.
The new scanner should start from the next row.
Is that so, or am I missing something?

Best regards,
  Mark
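
One way to make the new scanner start from the next row, as proposed above, is to restart at the smallest key that sorts strictly after currentRow. HBase orders row keys as unsigned bytes in lexicographic order, so that key is currentRow with a single 0x00 byte appended. The sketch below is plain Java with no HBase dependency; NextRowKey and nextRowKey are hypothetical names, and this is not necessarily how FLINK-14941 was resolved.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Computes the smallest row key that sorts strictly after a given key. */
class NextRowKey {

    /** Returns row followed by one 0x00 byte; Arrays.copyOf pads the extra slot with zero. */
    static byte[] nextRowKey(byte[] row) {
        return Arrays.copyOf(row, row.length + 1);
    }

    public static void main(String[] args) {
        byte[] current = "row-0042".getBytes(StandardCharsets.UTF_8);
        byte[] next = nextRowKey(current);
        // Arrays.compareUnsigned needs Java 9 or later; a proper prefix sorts first.
        System.out.println(Arrays.compareUnsigned(current, next) < 0); // prints true
    }
}
```

In the input format, the result would presumably be passed to scan.setStartRow(...) in place of currentRow, so that the row already emitted is not returned again.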

Re: DataSet API: HBase ScannerTimeoutException and double Result processing

Posted by Flavio Pompermaier <po...@okkam.it>.
On Mon, Nov 25, 2019 at 10:22 AM Flavio Pompermaier <po...@okkam.it> wrote:

> When the resultScanner dies because of a timeout (this happens a lot when
> you have backpressure and the time between 2 consecutive reads exceeds the
> scanner timeout), the code creates a new scanner and restarts from where it
> was (startRow = currentRow).
> So there should not be any duplicates (in theory), but this could be the
> root of the problem.

Maybe this is indeed the problem: the new scan starts from the last seen row.
In that case the first result of the new scan should probably be skipped,
because it was already read.

Re: DataSet API: HBase ScannerTimeoutException and double Result processing

Posted by Flavio Pompermaier <po...@okkam.it>.
What I can tell you is how the HBase input format works. If you look
at AbstractTableInputFormat [1], this is the nextRecord() function:

	public T nextRecord(T reuse) throws IOException {
		if (resultScanner == null) {
			throw new IOException("No table result scanner provided!");
		}
		try {
			Result res = resultScanner.next();
			if (res != null) {
				scannedRows++;
				currentRow = res.getRow();
				return mapResultToOutType(res);
			}
		} catch (Exception e) {
			resultScanner.close();
			//workaround for timeout on scan
			LOG.warn("Error after scan of " + scannedRows + " rows. Retry with a new scanner...", e);
			scan.setStartRow(currentRow);
			resultScanner = table.getScanner(scan);
			Result res = resultScanner.next();
			if (res != null) {
				scannedRows++;
				currentRow = res.getRow();
				return mapResultToOutType(res);
			}
		}

		endReached = true;
		return null;
	}

When the resultScanner dies because of a timeout (this happens a lot when
you have backpressure and the time between 2 consecutive reads exceeds the
scanner timeout), the code creates a new scanner and restarts from where it
was (startRow = currentRow).
So there should not be any duplicates (in theory), but this could be the
root of the problem.

Best,
Flavio

[1]
https://github.com/apache/flink/blob/master/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/AbstractTableInputFormat.java
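
The restart behaviour described in this message can be reproduced without HBase. The following is a minimal sketch, not the Flink or HBase code: a TreeSet of row keys stands in for the table, and ScanRestartDemo, scan, failAfter and skipAlreadyRead are hypothetical names made up for the illustration. It relies on the fact that the start row of an HBase Scan is inclusive, so a scan reopened at currentRow returns that row again.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

/** Toy model of the scanner restart; a sorted set of row keys plays the role of the table. */
class ScanRestartDemo {

    /**
     * Reads all rows in key order. After failAfter rows the first scanner is
     * treated as timed out and a new one is opened at the last returned row.
     * If skipAlreadyRead is true, the reopened scan drops that row.
     */
    static List<String> scan(TreeSet<String> table, int failAfter, boolean skipAlreadyRead) {
        List<String> emitted = new ArrayList<>();
        String currentRow = null;
        boolean timedOut = false;

        // First scanner: emit rows until the simulated timeout.
        for (String row : table) {
            if (emitted.size() == failAfter) {
                timedOut = true;
                break;
            }
            currentRow = row;
            emitted.add(row);
        }
        if (!timedOut) {
            return emitted;
        }

        // New scanner: the start row is inclusive, like scan.setStartRow(currentRow).
        Iterable<String> rest = (currentRow == null) ? table : table.tailSet(currentRow, true);
        for (String row : rest) {
            if (skipAlreadyRead && row.equals(currentRow)) {
                continue; // this row was already emitted before the timeout
            }
            emitted.add(row);
        }
        return emitted;
    }

    public static void main(String[] args) {
        TreeSet<String> table = new TreeSet<>(Arrays.asList("a", "b", "c", "d"));
        System.out.println(scan(table, 2, false)); // prints [a, b, b, c, d]
        System.out.println(scan(table, 2, true));  // prints [a, b, c, d]
    }
}
```

With skipAlreadyRead set to false, the row read just before the simulated timeout appears twice, which matches the duplicate reported in this thread. With true, the reopened scan drops the row it has already returned.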
