You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by le...@apache.org on 2020/11/06 17:54:13 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1305] Fix ORC RecordReader processing splits with offset

This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 004d42d  [GOBBLIN-1305] Fix ORC RecordReader processing splits with offset
004d42d is described below

commit 004d42d61a6f84fb19f4e81ab7d87681e290bc93
Author: Lei Sun <au...@gmail.com>
AuthorDate: Fri Nov 6 09:53:56 2020 -0800

    [GOBBLIN-1305] Fix ORC RecordReader processing splits with offset
    
    Fix ORC RecordReader processing splits with offset
    
    Address reviewer's comments
    
    Closes #3143 from autumnust/orcMapRedPartialReader
---
 .../CompactionCompleteFileOperationAction.java     | 10 +++--
 .../compaction/event/CompactionSlaEventHelper.java |  1 +
 .../CompactionCombineFileInputFormat.java          |  7 ++-
 .../gobblin/compaction/mapreduce/orc/OrcUtils.java | 13 +++++-
 .../orc/OrcValueCombineFileRecordReader.java       | 12 ++++-
 .../compaction/verify/InputRecordCountHelper.java  | 12 ++++-
 .../mapreduce/OrcCompactionTaskTest.java           | 51 +++++++++++++++++++++-
 7 files changed, 94 insertions(+), 12 deletions(-)

diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
index 02ac90e..b7522ac 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
@@ -48,6 +48,8 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.Job;
 
+import static org.apache.gobblin.compaction.event.CompactionSlaEventHelper.DUPLICATE_COUNT_TOTAL;
+
 
 /**
  * A type of post action {@link CompactionCompleteAction} which focus on the file operations
@@ -162,19 +164,19 @@ public class CompactionCompleteFileOperationAction implements CompactionComplete
             Long.toString(helper.readRecordCount(new Path(result.getDstAbsoluteDir()))));
         compactionState.setProp(CompactionSlaEventHelper.EXEC_COUNT_TOTAL + Long.toString(executionCount),
             Long.toString(executionCount));
-        compactionState.setProp("DuplicateRecordCount" + Long.toString(executionCount),
-            compactionState.getProp("DuplicateRecordCount", "null"));
+        compactionState.setProp(DUPLICATE_COUNT_TOTAL + Long.toString(executionCount),
+            compactionState.getProp(DUPLICATE_COUNT_TOTAL, "null"));
       }
       compactionState.setProp(CompactionSlaEventHelper.RECORD_COUNT_TOTAL, Long.toString(newTotalRecords));
       compactionState.setProp(CompactionSlaEventHelper.EXEC_COUNT_TOTAL, Long.toString(executionCount + 1));
       compactionState.setProp(CompactionSlaEventHelper.MR_JOB_ID,
           this.configurator.getConfiguredJob().getJobID().toString());
-      compactionState.setProp("DuplicateRecordCount",
+      compactionState.setProp(DUPLICATE_COUNT_TOTAL,
           job.getCounters().findCounter(RecordKeyDedupReducerBase.EVENT_COUNTER.DEDUPED).getValue());
       compactionState.setProp(CompactionSlaEventHelper.LAST_RUN_START_TIME,
           this.state.getProp(CompactionSource.COMPACTION_INIT_TIME));
       helper.saveState(new Path(result.getDstAbsoluteDir()), compactionState);
-      log.info("duplicated records count for " + dstPath + " : " + compactionState.getProp("DuplicateRecordCount"));
+      log.info("duplicated records count for " + dstPath + " : " + compactionState.getProp(DUPLICATE_COUNT_TOTAL));
 
       log.info("Updating record count from {} to {} in {} [{}]", oldTotalRecords, newTotalRecords, dstPath,
           executionCount + 1);
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java
index f6fd977..49a94d0 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java
@@ -53,6 +53,7 @@ public class CompactionSlaEventHelper {
   public static final String EXEC_COUNT_TOTAL = "executionCountTotal";
   public static final String MR_JOB_ID = "mrJobId";
   public static final String RECORD_COUNT_TOTAL = "recordCountTotal";
+  public static final String DUPLICATE_COUNT_TOTAL = "DuplicateRecordCount";
   public static final String HIVE_REGISTRATION_PATHS = "hiveRegistrationPaths";
   public static final String RENAME_DIR_PATHS = "renameDirPaths";
 
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionCombineFileInputFormat.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionCombineFileInputFormat.java
index 6b9bfc4..7ce6aba 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionCombineFileInputFormat.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionCombineFileInputFormat.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.compaction.mapreduce;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import java.io.IOException;
@@ -37,9 +38,11 @@ public abstract class CompactionCombineFileInputFormat<KI, KO> extends CombineFi
   /**
    * Properties related to the input format of the compaction job of a dataset.
    */
-  private static final String COMPACTION_JOB_MAPRED_MAX_SPLIT_SIZE = COMPACTION_JOB_PREFIX + "mapred.max.split.size";
+  @VisibleForTesting
+  static final String COMPACTION_JOB_MAPRED_MAX_SPLIT_SIZE = COMPACTION_JOB_PREFIX + "mapred.max.split.size";
   private static final long DEFAULT_COMPACTION_JOB_MAPRED_MAX_SPLIT_SIZE = 268435456;
-  private static final String COMPACTION_JOB_MAPRED_MIN_SPLIT_SIZE = COMPACTION_JOB_PREFIX + "mapred.min.split.size";
+  @VisibleForTesting
+  static final String COMPACTION_JOB_MAPRED_MIN_SPLIT_SIZE = COMPACTION_JOB_PREFIX + "mapred.min.split.size";
   private static final long DEFAULT_COMPACTION_JOB_MAPRED_MIN_SPLIT_SIZE = 268435456;
 
   private static final int SPLIT_MAX_NUM_LOCATIONS = 10;
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java
index f3da03b..fad4f1f 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java
@@ -75,10 +75,19 @@ public class OrcUtils {
 
   public static TypeDescription getTypeDescriptionFromFile(Configuration conf, Path orcFilePath)
       throws IOException {
-    return getRecordReaderFromFile(conf, orcFilePath).getSchema();
+    return getFileReader(conf, orcFilePath).getSchema();
   }
 
-  public static Reader getRecordReaderFromFile(Configuration conf, Path orcFilePath)
+  /**
+   * @deprecated Since the method name isn't accurate. Please call {@link #getFileReader(Configuration, Path)}
+   * directly.
+   */
+  @Deprecated
+  public static Reader getRecordReaderFromFile(Configuration conf, Path orcFilePath) throws IOException {
+    return getFileReader(conf, orcFilePath);
+  }
+
+  public static Reader getFileReader(Configuration conf, Path orcFilePath)
       throws IOException {
     return OrcFile.createReader(orcFilePath, new OrcFile.ReaderOptions(conf));
   }
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueCombineFileRecordReader.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueCombineFileRecordReader.java
index 3976e53..373b7e3 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueCombineFileRecordReader.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueCombineFileRecordReader.java
@@ -18,11 +18,13 @@
 package org.apache.gobblin.compaction.mapreduce.orc;
 
 import java.io.IOException;
+
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.orc.Reader;
 import org.apache.orc.RecordReader;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.mapreduce.OrcMapreduceRecordReader;
@@ -38,7 +40,8 @@ public class OrcValueCombineFileRecordReader extends OrcMapreduceRecordReader {
   }
 
   public OrcValueCombineFileRecordReader(RecordReader reader, TypeDescription schema, CombineFileSplit split,
-      Integer splitIdx) throws IOException {
+      Integer splitIdx)
+      throws IOException {
     super(reader, schema);
     this.split = split;
     this.splitIdx = splitIdx;
@@ -59,6 +62,11 @@ public class OrcValueCombineFileRecordReader extends OrcMapreduceRecordReader {
   private static RecordReader getRecordReaderFromFile(CombineFileSplit split, TaskAttemptContext context, Integer idx)
       throws IOException {
     Path path = split.getPath(idx);
-    return OrcUtils.getRecordReaderFromFile(context.getConfiguration(), path).rows();
+
+    // Avoid calling rows() without a Reader.Options object, which configures the RecordReader (here, the byte range).
+    // Note that the RecordReader is different from the OrcFile Reader that getFileReader returns.
+    Reader.Options options = new Reader.Options(context.getConfiguration());
+    return OrcUtils.getFileReader(context.getConfiguration(), path)
+        .rows(options.range(split.getOffset(idx), split.getLength(idx)));
   }
 }
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java
index fd131c8..0027ce6 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java
@@ -40,6 +40,8 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import static org.apache.gobblin.compaction.event.CompactionSlaEventHelper.DUPLICATE_COUNT_TOTAL;
+import static org.apache.gobblin.compaction.event.CompactionSlaEventHelper.EXEC_COUNT_TOTAL;
 import static org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter.*;
 
 
@@ -176,8 +178,16 @@ public class InputRecordCountHelper {
    * @return record count
    */
   public long readExecutionCount(Path dir) throws IOException {
+    return readCountHelper(dir, EXEC_COUNT_TOTAL);
+  }
+
+  public long readDuplicationCount(Path dir) throws IOException {
+    return readCountHelper(dir, DUPLICATE_COUNT_TOTAL);
+  }
+
+  private long readCountHelper(Path dir, String countKeyName) throws IOException {
     State state = loadState(fs, dir);
-    return Long.parseLong(state.getProp(CompactionSlaEventHelper.EXEC_COUNT_TOTAL, "0"));
+    return Long.parseLong(state.getProp(countKeyName, "0"));
   }
 
   /**
diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
index b9c4152..4b992a2 100644
--- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
+++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
@@ -52,9 +52,13 @@ import org.apache.gobblin.compaction.mapreduce.orc.OrcTestUtils;
 import org.apache.gobblin.compaction.mapreduce.orc.OrcUtils;
 import org.apache.gobblin.compaction.mapreduce.test.TestCompactionOrcJobConfigurator;
 import org.apache.gobblin.compaction.mapreduce.test.TestCompactionTaskUtils;
+import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
+import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.runtime.api.JobExecutionResult;
 import org.apache.gobblin.runtime.embedded.EmbeddedGobblin;
 
+import static org.apache.gobblin.compaction.mapreduce.CompactionCombineFileInputFormat.COMPACTION_JOB_MAPRED_MAX_SPLIT_SIZE;
+import static org.apache.gobblin.compaction.mapreduce.CompactionCombineFileInputFormat.COMPACTION_JOB_MAPRED_MIN_SPLIT_SIZE;
 import static org.apache.gobblin.compaction.mapreduce.CompactionOrcJobConfigurator.ORC_MAPPER_SHUFFLE_KEY_SCHEMA;
 import static org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter.COMPACTION_OUTPUT_EXTENSION;
 import static org.apache.gobblin.compaction.mapreduce.MRCompactor.COMPACTION_LATEDATA_THRESHOLD_FOR_RECOMPACT_PER_DATASET;
@@ -92,6 +96,52 @@ public class OrcCompactionTaskTest {
     writeOrcRecordsInFile(new Path(file_1.getAbsolutePath()), schema, ImmutableList.of(orcStruct_1, orcStruct_3));
   }
 
+  /**
+   * This test case covers the scenario where the split size is smaller than the actual file size:
+   * the RecordReader should stop at the split boundary instead of reading records beyond max_split_size,
+   * and the subsequent reader should load just the remaining records.
+   */
+  @Test
+  public void testWithPartialFileInSplit() throws Exception {
+    File basePath = Files.createTempDir();
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    basePath.deleteOnExit();
+
+    String minutelyPath = "Identity/MemberAccount/minutely/2017/04/03/10/20_30/run_2017-04-03-10-20";
+    String hourlyPath = "Identity/MemberAccount/hourly/2017/04/03/10/";
+    File jobDir = new File(basePath, minutelyPath);
+    Assert.assertTrue(jobDir.mkdirs());
+
+    // Writing some basic ORC files
+    // Testing data is schema'ed with "struct<i:int,j:int>"
+    createTestingData(jobDir);
+
+    // sample a file size
+    FileStatus[] statuses = fs.listStatus(new Path(jobDir.getAbsolutePath()));
+    Assert.assertTrue(statuses.length > 0 );
+    long splitSize = statuses[0].getLen() / 2 ;
+    Assert.assertTrue(splitSize > 0);
+
+    EmbeddedGobblin embeddedGobblin = TestCompactionTaskUtils.createEmbeddedGobblinCompactionJob("basic", basePath.getAbsolutePath())
+        .setConfiguration(CompactionJobConfigurator.COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS_KEY,
+            TestCompactionOrcJobConfigurator.Factory.class.getName())
+        // Each file generated by the data-creation function is around 250 bytes in length.
+        // Setting the max split size to half the file size forces a single file to be split.
+        .setConfiguration(COMPACTION_JOB_MAPRED_MAX_SPLIT_SIZE, splitSize + "")
+        .setConfiguration(COMPACTION_JOB_MAPRED_MIN_SPLIT_SIZE, splitSize + "")
+        .setConfiguration(COMPACTION_OUTPUT_EXTENSION, extensionName);
+    JobExecutionResult execution = embeddedGobblin.run();
+    Assert.assertTrue(execution.isSuccessful());
+
+    // Result verification: Verify the record count and the duplicate count are as expected.
+    File outputDir = new File(basePath, hourlyPath);
+    State state = new State();
+    state.setProp(COMPACTION_OUTPUT_EXTENSION, "orc");
+    InputRecordCountHelper stateHelper = new InputRecordCountHelper(state);
+    Assert.assertEquals(stateHelper.readRecordCount(new Path(outputDir.getAbsolutePath())), 4);
+    Assert.assertEquals(stateHelper.readDuplicationCount(new Path(outputDir.getAbsolutePath())), 1);
+  }
+
   @Test
   public void basicTestWithShuffleKeySpecified() throws Exception {
     File basePath = Files.createTempDir();
@@ -130,7 +180,6 @@ public class OrcCompactionTaskTest {
     Assert.assertEquals(result.get(1).getFieldValue("j"), new IntWritable(3));
     Assert.assertEquals(result.get(2).getFieldValue("i"), new IntWritable(4));
     Assert.assertEquals(result.get(2).getFieldValue("j"), new IntWritable(5));
-
   }
 
   @Test