You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by gj...@apache.org on 2018/11/16 22:26:18 UTC
phoenix git commit: PHOENIX-4955 - PhoenixIndexImportDirectMapper undercounts failed records
Repository: phoenix
Updated Branches:
refs/heads/master 7eb41b0b2 -> d128553c8
PHOENIX-4955 - PhoenixIndexImportDirectMapper undercounts failed records
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d128553c
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d128553c
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d128553c
Branch: refs/heads/master
Commit: d128553c8014a68fe926f52bb3bfb62dd029ce15
Parents: 7eb41b0
Author: Geoffrey Jacoby <gj...@apache.org>
Authored: Fri Nov 16 13:57:45 2018 -0800
Committer: Geoffrey Jacoby <gj...@apache.org>
Committed: Fri Nov 16 14:19:54 2018 -0800
----------------------------------------------------------------------
.../mapreduce/index/PhoenixIndexImportDirectMapper.java | 11 +++++++----
1 file changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d128553c/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
index eb4bc0e..e2ac491 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
@@ -68,6 +68,8 @@ public class PhoenixIndexImportDirectMapper extends
private long batchSizeBytes;
private MutationState mutationState;
+ private int currentBatchCount = 0;
+
@Override
protected void setup(final Context context) throws IOException, InterruptedException {
@@ -113,6 +115,7 @@ public class PhoenixIndexImportDirectMapper extends
throws IOException, InterruptedException {
try {
+ currentBatchCount++;
final List<Object> values = record.getValues();
indxWritable.setValues(values);
indxWritable.write(this.pStatement);
@@ -125,9 +128,8 @@ public class PhoenixIndexImportDirectMapper extends
}
// Keep accumulating Mutations till batch size
mutationState.join(currentMutationState);
-
// Write Mutation Batch
- if (context.getCounter(PhoenixJobCounters.INPUT_RECORDS).getValue() % batchSize == 0) {
+ if (currentBatchCount % batchSize == 0) {
writeBatch(mutationState, context);
mutationState = null;
}
@@ -136,7 +138,7 @@ public class PhoenixIndexImportDirectMapper extends
context.progress();
} catch (SQLException e) {
LOG.error(" Error {} while read/write of a record ", e.getMessage());
- context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1);
+ context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(currentBatchCount);
throw new RuntimeException(e);
}
context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1);
@@ -157,6 +159,7 @@ public class PhoenixIndexImportDirectMapper extends
mutationPair.getSecond().size());
}
connection.rollback();
+ currentBatchCount = 0;
}
@Override
@@ -173,7 +176,7 @@ public class PhoenixIndexImportDirectMapper extends
super.cleanup(context);
} catch (SQLException e) {
LOG.error(" Error {} while read/write of a record ", e.getMessage());
- context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1);
+ context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(currentBatchCount);
throw new RuntimeException(e);
} finally {
if (connection != null) {