You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@batchee.apache.org by "Christian Berger (Jira)" <ji...@apache.org> on 2020/03/11 15:05:00 UTC
[jira] [Created] (BATCHEE-138) Failure in Read-Process-Write Loop:
Somehow one of the metrics was zero
Christian Berger created BATCHEE-138:
----------------------------------------
Summary: Failure in Read-Process-Write Loop: Somehow one of the metrics was zero
Key: BATCHEE-138
URL: https://issues.apache.org/jira/browse/BATCHEE-138
Project: BatchEE
Issue Type: Bug
Components: jbatch-core
Affects Versions: 0.5-incubating
Reporter: Christian Berger
Hi BatchEE-Team,
I'm not able to stop a chunk processing step via JobOperator.stop() properly. I always get an exception that some metrics are wrong:
{code:java}
2020-03-10 15:42:34.668 CET ; [5397] ; [AN72] ; [SEVERE] ; an72.org.apache.batchee.container.impl.controller.chunk.ChunkStepController ; Failure in Read-Process-Write Loop
java.lang.IllegalStateException: Somehow one of the metrics was zero. Read count: 56, Filter count: -1, Write count: 57
at org.apache.batchee.container.impl.controller.chunk.ChunkStepController.updateNormalMetrics(ChunkStepController.java:650)
at org.apache.batchee.container.impl.controller.chunk.ChunkStepController.invokeChunk(ChunkStepController.java:613)
at org.apache.batchee.container.impl.controller.chunk.ChunkStepController.invokeCoreStep(ChunkStepController.java:732)
at org.apache.batchee.container.impl.controller.BaseStepController.execute(BaseStepController.java:157)
at org.apache.batchee.container.impl.controller.ExecutionTransitioner.doExecutionLoop(ExecutionTransitioner.java:106)
at org.apache.batchee.container.impl.controller.JobThreadRootController.originateExecutionOnThread(JobThreadRootController.java:110)
at org.apache.batchee.container.util.BatchWorkUnit.run(BatchWorkUnit.java:62){code}
After investigating the code, I've found the root cause. Anyway, let's understand the request flow first:
Related class: ChunkStepController
At the beginning, the invokeChunk method is being called which then calls the readAndProcess method ...
{code:java}
private void invokeChunk() {
....
while (true) {
..
final List<Object> chunkToWrite = readAndProcess();
...
}
{code}
This code increments the read counter at the very beginning although it might be possible that no further read items are available.
{code:java}
private List<Object> readAndProcess() {
List<Object> chunkToWrite = new ArrayList<Object>();
Object itemRead;
Object itemProcessed;
int readProcessedCount = 0;
while (true) {
currentItemStatus = new SingleItemStatus();
currentChunkStatus.incrementItemsTouchedInCurrentChunk(); //inc read counter
itemRead = readItem(); // now we call the readItem method
..
}
}{code}
Let's assume no items are left. Then the readItem method will mark the currentChunkStatus as finished:
{code:java}
t
private Object readItem() {
Object itemRead = null;
...
itemRead = readerProxy.readItem(); // this returns null in our case
...
// itemRead == null means we reached the end of
// the readerProxy "resultset"
currentChunkStatus.setFinished(itemRead == null); // so we mark the chunk as finished
{code}
Thus we will return to invokeChunk() where we call updateNormalMetrics(chunkToWrite.size()); after some time:
Here we decrease the read counter again, when the chunk is marked as finished. Obviously, that's because we increased it before but we did not ready an item anymore:
{code:java}
private void updateNormalMetrics(int writeCount) {
int readCount = currentChunkStatus.getItemsTouchedInCurrentChunk();
if (currentChunkStatus.isFinished()) {
readCount--;
}
int filterCount = readCount - writeCount;
if (readCount < 0 || filterCount < 0 || writeCount < 0) {
throw new IllegalStateException("Somehow one of the metrics was zero. Read count: " + readCount +
", Filter count: " + filterCount + ", Write count: " + writeCount);
}
{code}
So far so good.
Now let's assume the chunk step processing does not complete properly but it gets stopped via Job-Operator invocation. In that case the readAndProcess method marks the chunk as finished as well:
{code:java}
private List<Object> readAndProcess() {
...
while (true) {
currentItemStatus = new SingleItemStatus();
currentChunkStatus.incrementItemsTouchedInCurrentChunk();
itemRead = readItem();
...
// write buffer size reached
// This will force the current item to finish processing on a stop request
if (stepContext.getBatchStatus().equals(BatchStatus.STOPPING)) {
currentChunkStatus.setFinished(true);
}
...
// last record in readerProxy reached
if (currentChunkStatus.isFinished()) { //it's not the last record but we stopped the processing
break;
}
{code}
Now we return back to invokeChunk() where we call updateNormalMetrics(chunkToWrite.size()) again. However, this time - it's not the last record - the decrease operation makes no sense and we fail with the given exception:
{code:java}
private void updateNormalMetrics(int writeCount) {
int readCount = currentChunkStatus.getItemsTouchedInCurrentChunk();
if (currentChunkStatus.isFinished()) {
readCount--;
}
int filterCount = readCount - writeCount;
if (readCount < 0 || filterCount < 0 || writeCount < 0) {
throw new IllegalStateException("Somehow one of the metrics was zero. Read count: " + readCount +
", Filter count: " + filterCount + ", Write count: " + writeCount);
}
{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)