You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by pb...@apache.org on 2018/11/27 15:18:31 UTC
[21/28] phoenix git commit: PHOENIX-4955 -
PhoenixIndexImportDirectMapper undercounts failed records
PHOENIX-4955 - PhoenixIndexImportDirectMapper undercounts failed records
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/dd81989f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/dd81989f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/dd81989f
Branch: refs/heads/4.x-cdh5.15
Commit: dd81989fab80cb283678218ada0c0359930731c8
Parents: 590f88b
Author: Geoffrey Jacoby <gj...@apache.org>
Authored: Fri Nov 16 21:57:45 2018 +0000
Committer: Pedro Boado <pb...@apache.org>
Committed: Tue Nov 27 15:12:05 2018 +0000
----------------------------------------------------------------------
.../mapreduce/index/PhoenixIndexImportDirectMapper.java | 11 +++++++----
1 file changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/dd81989f/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
index eb4bc0e..e2ac491 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
@@ -68,6 +68,8 @@ public class PhoenixIndexImportDirectMapper extends
private long batchSizeBytes;
private MutationState mutationState;
+ private int currentBatchCount = 0;
+
@Override
protected void setup(final Context context) throws IOException, InterruptedException {
@@ -113,6 +115,7 @@ public class PhoenixIndexImportDirectMapper extends
throws IOException, InterruptedException {
try {
+ currentBatchCount++;
final List<Object> values = record.getValues();
indxWritable.setValues(values);
indxWritable.write(this.pStatement);
@@ -125,9 +128,8 @@ public class PhoenixIndexImportDirectMapper extends
}
// Keep accumulating Mutations till batch size
mutationState.join(currentMutationState);
-
// Write Mutation Batch
- if (context.getCounter(PhoenixJobCounters.INPUT_RECORDS).getValue() % batchSize == 0) {
+ if (currentBatchCount % batchSize == 0) {
writeBatch(mutationState, context);
mutationState = null;
}
@@ -136,7 +138,7 @@ public class PhoenixIndexImportDirectMapper extends
context.progress();
} catch (SQLException e) {
LOG.error(" Error {} while read/write of a record ", e.getMessage());
- context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1);
+ context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(currentBatchCount);
throw new RuntimeException(e);
}
context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1);
@@ -157,6 +159,7 @@ public class PhoenixIndexImportDirectMapper extends
mutationPair.getSecond().size());
}
connection.rollback();
+ currentBatchCount = 0;
}
@Override
@@ -173,7 +176,7 @@ public class PhoenixIndexImportDirectMapper extends
super.cleanup(context);
} catch (SQLException e) {
LOG.error(" Error {} while read/write of a record ", e.getMessage());
- context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1);
+ context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(currentBatchCount);
throw new RuntimeException(e);
} finally {
if (connection != null) {