You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by le...@apache.org on 2020/11/06 17:54:13 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1305] Fix ORC
RecordReader processing splits with offset
This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 004d42d [GOBBLIN-1305] Fix ORC RecordReader processing splits with offset
004d42d is described below
commit 004d42d61a6f84fb19f4e81ab7d87681e290bc93
Author: Lei Sun <au...@gmail.com>
AuthorDate: Fri Nov 6 09:53:56 2020 -0800
[GOBBLIN-1305] Fix ORC RecordReader processing splits with offset
Fix ORC RecordReader processing splits with offset
Address reviewer's comments
Closes #3143 from autumnust/orcMapRedPartialReader
---
.../CompactionCompleteFileOperationAction.java | 10 +++--
.../compaction/event/CompactionSlaEventHelper.java | 1 +
.../CompactionCombineFileInputFormat.java | 7 ++-
.../gobblin/compaction/mapreduce/orc/OrcUtils.java | 13 +++++-
.../orc/OrcValueCombineFileRecordReader.java | 12 ++++-
.../compaction/verify/InputRecordCountHelper.java | 12 ++++-
.../mapreduce/OrcCompactionTaskTest.java | 51 +++++++++++++++++++++-
7 files changed, 94 insertions(+), 12 deletions(-)
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
index 02ac90e..b7522ac 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
@@ -48,6 +48,8 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
+import static org.apache.gobblin.compaction.event.CompactionSlaEventHelper.DUPLICATE_COUNT_TOTAL;
+
/**
* A type of post action {@link CompactionCompleteAction} which focus on the file operations
@@ -162,19 +164,19 @@ public class CompactionCompleteFileOperationAction implements CompactionComplete
Long.toString(helper.readRecordCount(new Path(result.getDstAbsoluteDir()))));
compactionState.setProp(CompactionSlaEventHelper.EXEC_COUNT_TOTAL + Long.toString(executionCount),
Long.toString(executionCount));
- compactionState.setProp("DuplicateRecordCount" + Long.toString(executionCount),
- compactionState.getProp("DuplicateRecordCount", "null"));
+ compactionState.setProp(DUPLICATE_COUNT_TOTAL + Long.toString(executionCount),
+ compactionState.getProp(DUPLICATE_COUNT_TOTAL, "null"));
}
compactionState.setProp(CompactionSlaEventHelper.RECORD_COUNT_TOTAL, Long.toString(newTotalRecords));
compactionState.setProp(CompactionSlaEventHelper.EXEC_COUNT_TOTAL, Long.toString(executionCount + 1));
compactionState.setProp(CompactionSlaEventHelper.MR_JOB_ID,
this.configurator.getConfiguredJob().getJobID().toString());
- compactionState.setProp("DuplicateRecordCount",
+ compactionState.setProp(DUPLICATE_COUNT_TOTAL,
job.getCounters().findCounter(RecordKeyDedupReducerBase.EVENT_COUNTER.DEDUPED).getValue());
compactionState.setProp(CompactionSlaEventHelper.LAST_RUN_START_TIME,
this.state.getProp(CompactionSource.COMPACTION_INIT_TIME));
helper.saveState(new Path(result.getDstAbsoluteDir()), compactionState);
- log.info("duplicated records count for " + dstPath + " : " + compactionState.getProp("DuplicateRecordCount"));
+ log.info("duplicated records count for " + dstPath + " : " + compactionState.getProp(DUPLICATE_COUNT_TOTAL));
log.info("Updating record count from {} to {} in {} [{}]", oldTotalRecords, newTotalRecords, dstPath,
executionCount + 1);
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java
index f6fd977..49a94d0 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/event/CompactionSlaEventHelper.java
@@ -53,6 +53,7 @@ public class CompactionSlaEventHelper {
public static final String EXEC_COUNT_TOTAL = "executionCountTotal";
public static final String MR_JOB_ID = "mrJobId";
public static final String RECORD_COUNT_TOTAL = "recordCountTotal";
+ public static final String DUPLICATE_COUNT_TOTAL = "DuplicateRecordCount";
public static final String HIVE_REGISTRATION_PATHS = "hiveRegistrationPaths";
public static final String RENAME_DIR_PATHS = "renameDirPaths";
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionCombineFileInputFormat.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionCombineFileInputFormat.java
index 6b9bfc4..7ce6aba 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionCombineFileInputFormat.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionCombineFileInputFormat.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.compaction.mapreduce;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.io.IOException;
@@ -37,9 +38,11 @@ public abstract class CompactionCombineFileInputFormat<KI, KO> extends CombineFi
/**
* Properties related to the input format of the compaction job of a dataset.
*/
- private static final String COMPACTION_JOB_MAPRED_MAX_SPLIT_SIZE = COMPACTION_JOB_PREFIX + "mapred.max.split.size";
+ @VisibleForTesting
+ static final String COMPACTION_JOB_MAPRED_MAX_SPLIT_SIZE = COMPACTION_JOB_PREFIX + "mapred.max.split.size";
private static final long DEFAULT_COMPACTION_JOB_MAPRED_MAX_SPLIT_SIZE = 268435456;
- private static final String COMPACTION_JOB_MAPRED_MIN_SPLIT_SIZE = COMPACTION_JOB_PREFIX + "mapred.min.split.size";
+ @VisibleForTesting
+ static final String COMPACTION_JOB_MAPRED_MIN_SPLIT_SIZE = COMPACTION_JOB_PREFIX + "mapred.min.split.size";
private static final long DEFAULT_COMPACTION_JOB_MAPRED_MIN_SPLIT_SIZE = 268435456;
private static final int SPLIT_MAX_NUM_LOCATIONS = 10;
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java
index f3da03b..fad4f1f 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java
@@ -75,10 +75,19 @@ public class OrcUtils {
public static TypeDescription getTypeDescriptionFromFile(Configuration conf, Path orcFilePath)
throws IOException {
- return getRecordReaderFromFile(conf, orcFilePath).getSchema();
+ return getFileReader(conf, orcFilePath).getSchema();
}
- public static Reader getRecordReaderFromFile(Configuration conf, Path orcFilePath)
+ /**
+ * @deprecated The method name is inaccurate: it returns an ORC file {@link Reader}, not a record reader.
+ * Please call {@link #getFileReader(Configuration, Path)} directly instead.
+ */
+ @Deprecated
+ public static Reader getRecordReaderFromFile(Configuration conf, Path orcFilePath) throws IOException {
+ return getFileReader(conf, orcFilePath);
+ }
+
+ public static Reader getFileReader(Configuration conf, Path orcFilePath)
throws IOException {
return OrcFile.createReader(orcFilePath, new OrcFile.ReaderOptions(conf));
}
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueCombineFileRecordReader.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueCombineFileRecordReader.java
index 3976e53..373b7e3 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueCombineFileRecordReader.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueCombineFileRecordReader.java
@@ -18,11 +18,13 @@
package org.apache.gobblin.compaction.mapreduce.orc;
import java.io.IOException;
+
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.apache.orc.mapreduce.OrcMapreduceRecordReader;
@@ -38,7 +40,8 @@ public class OrcValueCombineFileRecordReader extends OrcMapreduceRecordReader {
}
public OrcValueCombineFileRecordReader(RecordReader reader, TypeDescription schema, CombineFileSplit split,
- Integer splitIdx) throws IOException {
+ Integer splitIdx)
+ throws IOException {
super(reader, schema);
this.split = split;
this.splitIdx = splitIdx;
@@ -59,6 +62,11 @@ public class OrcValueCombineFileRecordReader extends OrcMapreduceRecordReader {
private static RecordReader getRecordReaderFromFile(CombineFileSplit split, TaskAttemptContext context, Integer idx)
throws IOException {
Path path = split.getPath(idx);
- return OrcUtils.getRecordReaderFromFile(context.getConfiguration(), path).rows();
+
+ // Avoid calling rows() without a Reader.Options object: the options configure the RecordReader, including the
+ // byte range (offset/length) of this split. Note that this RecordReader differs from the ORC file Reader returned by getFileReader.
+ Reader.Options options = new Reader.Options(context.getConfiguration());
+ return OrcUtils.getFileReader(context.getConfiguration(), path)
+ .rows(options.range(split.getOffset(idx), split.getLength(idx)));
}
}
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java
index fd131c8..0027ce6 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java
@@ -40,6 +40,8 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import static org.apache.gobblin.compaction.event.CompactionSlaEventHelper.DUPLICATE_COUNT_TOTAL;
+import static org.apache.gobblin.compaction.event.CompactionSlaEventHelper.EXEC_COUNT_TOTAL;
import static org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter.*;
@@ -176,8 +178,16 @@ public class InputRecordCountHelper {
* @return record count
*/
public long readExecutionCount(Path dir) throws IOException {
+ return readCountHelper(dir, EXEC_COUNT_TOTAL);
+ }
+
+ public long readDuplicationCount(Path dir) throws IOException {
+ return readCountHelper(dir, DUPLICATE_COUNT_TOTAL);
+ }
+
+ private long readCountHelper(Path dir, String countKeyName) throws IOException {
State state = loadState(fs, dir);
- return Long.parseLong(state.getProp(CompactionSlaEventHelper.EXEC_COUNT_TOTAL, "0"));
+ return Long.parseLong(state.getProp(countKeyName, "0"));
}
/**
diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
index b9c4152..4b992a2 100644
--- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
+++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
@@ -52,9 +52,13 @@ import org.apache.gobblin.compaction.mapreduce.orc.OrcTestUtils;
import org.apache.gobblin.compaction.mapreduce.orc.OrcUtils;
import org.apache.gobblin.compaction.mapreduce.test.TestCompactionOrcJobConfigurator;
import org.apache.gobblin.compaction.mapreduce.test.TestCompactionTaskUtils;
+import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
+import org.apache.gobblin.configuration.State;
import org.apache.gobblin.runtime.api.JobExecutionResult;
import org.apache.gobblin.runtime.embedded.EmbeddedGobblin;
+import static org.apache.gobblin.compaction.mapreduce.CompactionCombineFileInputFormat.COMPACTION_JOB_MAPRED_MAX_SPLIT_SIZE;
+import static org.apache.gobblin.compaction.mapreduce.CompactionCombineFileInputFormat.COMPACTION_JOB_MAPRED_MIN_SPLIT_SIZE;
import static org.apache.gobblin.compaction.mapreduce.CompactionOrcJobConfigurator.ORC_MAPPER_SHUFFLE_KEY_SCHEMA;
import static org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter.COMPACTION_OUTPUT_EXTENSION;
import static org.apache.gobblin.compaction.mapreduce.MRCompactor.COMPACTION_LATEDATA_THRESHOLD_FOR_RECOMPACT_PER_DATASET;
@@ -92,6 +96,52 @@ public class OrcCompactionTaskTest {
writeOrcRecordsInFile(new Path(file_1.getAbsolutePath()), schema, ImmutableList.of(orcStruct_1, orcStruct_3));
}
+ /**
+ * This test case covers the scenario where the split size is smaller than the actual file size:
+ * the RecordReader should stop at the split boundary instead of reading records beyond the max_split_size,
+ * and the subsequent reader should load only the remaining records.
+ */
+ @Test
+ public void testWithPartialFileInSplit() throws Exception {
+ File basePath = Files.createTempDir();
+ FileSystem fs = FileSystem.getLocal(new Configuration());
+ basePath.deleteOnExit();
+
+ String minutelyPath = "Identity/MemberAccount/minutely/2017/04/03/10/20_30/run_2017-04-03-10-20";
+ String hourlyPath = "Identity/MemberAccount/hourly/2017/04/03/10/";
+ File jobDir = new File(basePath, minutelyPath);
+ Assert.assertTrue(jobDir.mkdirs());
+
+ // Writing some basic ORC files
+ // Testing data is schema'ed with "struct<i:int,j:int>"
+ createTestingData(jobDir);
+
+ // sample a file size
+ FileStatus[] statuses = fs.listStatus(new Path(jobDir.getAbsolutePath()));
+ Assert.assertTrue(statuses.length > 0 );
+ long splitSize = statuses[0].getLen() / 2 ;
+ Assert.assertTrue(splitSize > 0);
+
+ EmbeddedGobblin embeddedGobblin = TestCompactionTaskUtils.createEmbeddedGobblinCompactionJob("basic", basePath.getAbsolutePath())
+ .setConfiguration(CompactionJobConfigurator.COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS_KEY,
+ TestCompactionOrcJobConfigurator.Factory.class.getName())
+ // Each file generated by the data-creation function is around 250 bytes long.
+ // Setting the max (and min) split size to half the file size forces a single file to be split.
+ .setConfiguration(COMPACTION_JOB_MAPRED_MAX_SPLIT_SIZE, splitSize + "")
+ .setConfiguration(COMPACTION_JOB_MAPRED_MIN_SPLIT_SIZE, splitSize + "")
+ .setConfiguration(COMPACTION_OUTPUT_EXTENSION, extensionName);
+ JobExecutionResult execution = embeddedGobblin.run();
+ Assert.assertTrue(execution.isSuccessful());
+
+ // Result verification: verify that the record count and the duplicate count are as expected.
+ File outputDir = new File(basePath, hourlyPath);
+ State state = new State();
+ state.setProp(COMPACTION_OUTPUT_EXTENSION, "orc");
+ InputRecordCountHelper stateHelper = new InputRecordCountHelper(state);
+ Assert.assertEquals(stateHelper.readRecordCount(new Path(outputDir.getAbsolutePath())), 4);
+ Assert.assertEquals(stateHelper.readDuplicationCount(new Path(outputDir.getAbsolutePath())), 1);
+ }
+
@Test
public void basicTestWithShuffleKeySpecified() throws Exception {
File basePath = Files.createTempDir();
@@ -130,7 +180,6 @@ public class OrcCompactionTaskTest {
Assert.assertEquals(result.get(1).getFieldValue("j"), new IntWritable(3));
Assert.assertEquals(result.get(2).getFieldValue("i"), new IntWritable(4));
Assert.assertEquals(result.get(2).getFieldValue("j"), new IntWritable(5));
-
}
@Test