You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by pb...@apache.org on 2018/11/27 15:18:31 UTC

[21/28] phoenix git commit: PHOENIX-4955 - PhoenixIndexImportDirectMapper undercounts failed records

PHOENIX-4955 - PhoenixIndexImportDirectMapper undercounts failed records


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/dd81989f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/dd81989f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/dd81989f

Branch: refs/heads/4.x-cdh5.15
Commit: dd81989fab80cb283678218ada0c0359930731c8
Parents: 590f88b
Author: Geoffrey Jacoby <gj...@apache.org>
Authored: Fri Nov 16 21:57:45 2018 +0000
Committer: Pedro Boado <pb...@apache.org>
Committed: Tue Nov 27 15:12:05 2018 +0000

----------------------------------------------------------------------
 .../mapreduce/index/PhoenixIndexImportDirectMapper.java  | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/dd81989f/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
index eb4bc0e..e2ac491 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
@@ -68,6 +68,8 @@ public class PhoenixIndexImportDirectMapper extends
     private long batchSizeBytes;
 
     private MutationState mutationState;
+    private int currentBatchCount = 0;
+
 
     @Override
     protected void setup(final Context context) throws IOException, InterruptedException {
@@ -113,6 +115,7 @@ public class PhoenixIndexImportDirectMapper extends
             throws IOException, InterruptedException {
 
         try {
+            currentBatchCount++;
             final List<Object> values = record.getValues();
             indxWritable.setValues(values);
             indxWritable.write(this.pStatement);
@@ -125,9 +128,8 @@ public class PhoenixIndexImportDirectMapper extends
             }
             // Keep accumulating Mutations till batch size
             mutationState.join(currentMutationState);
-
             // Write Mutation Batch
-            if (context.getCounter(PhoenixJobCounters.INPUT_RECORDS).getValue() % batchSize == 0) {
+            if (currentBatchCount % batchSize == 0) {
                 writeBatch(mutationState, context);
                 mutationState = null;
             }
@@ -136,7 +138,7 @@ public class PhoenixIndexImportDirectMapper extends
             context.progress();
         } catch (SQLException e) {
             LOG.error(" Error {}  while read/write of a record ", e.getMessage());
-            context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1);
+            context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(currentBatchCount);
             throw new RuntimeException(e);
         }
         context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1);
@@ -157,6 +159,7 @@ public class PhoenixIndexImportDirectMapper extends
                 mutationPair.getSecond().size());
         }
         connection.rollback();
+        currentBatchCount = 0;
     }
 
     @Override
@@ -173,7 +176,7 @@ public class PhoenixIndexImportDirectMapper extends
             super.cleanup(context);
         } catch (SQLException e) {
             LOG.error(" Error {}  while read/write of a record ", e.getMessage());
-            context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1);
+            context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(currentBatchCount);
             throw new RuntimeException(e);
         } finally {
             if (connection != null) {