You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by le...@apache.org on 2020/07/16 20:38:55 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1214] Move the
fallback of in-eligible shuffleKey to driver
This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new dcc3b82 [GOBBLIN-1214] Move the fallback of in-eligible shuffleKey to driver
dcc3b82 is described below
commit dcc3b8203bf8054332238fa5fa0f5511b82597c6
Author: Lei Sun <au...@gmail.com>
AuthorDate: Thu Jul 16 13:38:39 2020 -0700
[GOBBLIN-1214] Move the fallback of in-eligible shuffleKey to driver
Move the fallback for an ineligible shuffleKey to the driver level.
Add a unit test for the case where the specified shuffle key does not
exist in the input schema.
Closes #3062 from autumnust/reproduce-gcn-33158
---
.../mapreduce/CompactionOrcJobConfigurator.java | 15 ++++++--
.../compaction/mapreduce/orc/OrcValueMapper.java | 23 +++++-------
.../mapreduce/OrcCompactionTaskTest.java | 41 ++++++++++++++++++++++
3 files changed, 62 insertions(+), 17 deletions(-)
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionOrcJobConfigurator.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionOrcJobConfigurator.java
index 40a5742..c417c60 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionOrcJobConfigurator.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionOrcJobConfigurator.java
@@ -35,12 +35,13 @@ import org.apache.orc.mapred.OrcKey;
import org.apache.orc.mapred.OrcValue;
import static org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter.*;
+import static org.apache.gobblin.compaction.mapreduce.orc.OrcUtils.eligibleForUpConvert;
public class CompactionOrcJobConfigurator extends CompactionJobConfigurator {
/**
- * The key schema for the shuffle output.
+ * The key schema for the shuffle output.
*/
public static final String ORC_MAPPER_SHUFFLE_KEY_SCHEMA = "orcMapperShuffleSchema";
private String orcMapperShuffleSchemaString;
@@ -66,8 +67,16 @@ public class CompactionOrcJobConfigurator extends CompactionJobConfigurator {
TypeDescription schema = OrcUtils.getNewestSchemaFromSource(job, this.fs);
job.getConfiguration().set(OrcConf.MAPRED_INPUT_SCHEMA.getAttribute(), schema.toString());
- job.getConfiguration().set(OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.getAttribute(),
- orcMapperShuffleSchemaString.isEmpty() ? schema.toString() : orcMapperShuffleSchemaString);
+
+ // Determine the shuffle-schema: Only take the user-specified shuffle-schema if it is upconvertable
+ // Check the eligibleForUpConvert method for the definition of eligibility.
+ if (!orcMapperShuffleSchemaString.isEmpty()
+ && eligibleForUpConvert(schema, TypeDescription.fromString(orcMapperShuffleSchemaString))) {
+ job.getConfiguration().set(OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.getAttribute(), orcMapperShuffleSchemaString);
+ } else {
+ job.getConfiguration().set(OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.getAttribute(), schema.toString());
+ }
+
job.getConfiguration().set(OrcConf.MAPRED_SHUFFLE_VALUE_SCHEMA.getAttribute(), schema.toString());
job.getConfiguration().set(OrcConf.MAPRED_OUTPUT_SCHEMA.getAttribute(), schema.toString());
}
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java
index 31cbbdf..10c5ac4 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java
@@ -39,7 +39,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.compaction.mapreduce.RecordKeyMapperBase;
-import static org.apache.gobblin.compaction.mapreduce.orc.OrcUtils.eligibleForUpConvert;
+import static org.apache.orc.OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA;
/**
@@ -53,11 +53,8 @@ public class OrcValueMapper extends RecordKeyMapperBase<NullWritable, OrcStruct,
// This key will only be initialized lazily when dedup is enabled.
private OrcKey outKey;
private OrcValue outValue;
- private TypeDescription mrOutputSchema;
+ private TypeDescription mrInputSchema;
private TypeDescription shuffleKeySchema;
- // Lazily initiated flag indicating if shuffleKeySchema is eligible to be used.
- // Check org.apache.gobblin.compaction.mapreduce.orc.OrcUtils.eligibleForUpConvert for details.
- private Boolean isShuffleKeyEligible;
private JobConf jobConf;
// This is added mostly for debuggability.
@@ -72,10 +69,12 @@ public class OrcValueMapper extends RecordKeyMapperBase<NullWritable, OrcStruct,
this.outKey.configure(jobConf);
this.outValue = new OrcValue();
this.outValue.configure(jobConf);
- this.mrOutputSchema =
+
+ // This is the consistent input-schema among all mappers.
+ this.mrInputSchema =
TypeDescription.fromString(context.getConfiguration().get(OrcConf.MAPRED_INPUT_SCHEMA.getAttribute()));
this.shuffleKeySchema =
- TypeDescription.fromString(context.getConfiguration().get(OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.getAttribute()));
+ TypeDescription.fromString(context.getConfiguration().get(MAPRED_SHUFFLE_KEY_SCHEMA.getAttribute()));
}
@Override
@@ -83,10 +82,10 @@ public class OrcValueMapper extends RecordKeyMapperBase<NullWritable, OrcStruct,
throws IOException, InterruptedException {
// Up-convert OrcStruct only if schema differs
- if (!orcStruct.getSchema().equals(this.mrOutputSchema)) {
+ if (!orcStruct.getSchema().equals(this.mrInputSchema)) {
// Note that outValue.value is being re-used.
log.info("There's a schema difference between output schema and input schema");
- OrcUtils.upConvertOrcStruct(orcStruct, (OrcStruct) outValue.value, mrOutputSchema);
+ OrcUtils.upConvertOrcStruct(orcStruct, (OrcStruct) outValue.value, mrInputSchema);
} else {
this.outValue.value = orcStruct;
}
@@ -125,11 +124,7 @@ public class OrcValueMapper extends RecordKeyMapperBase<NullWritable, OrcStruct,
* Note: This method should have no side-effect on input record.
*/
private void fillDedupKey(OrcStruct originalRecord) {
- if (this.isShuffleKeyEligible == null) {
- this.isShuffleKeyEligible = eligibleForUpConvert(originalRecord.getSchema(), this.shuffleKeySchema);
- }
-
- if (!originalRecord.getSchema().equals(this.shuffleKeySchema) && isShuffleKeyEligible) {
+ if (!originalRecord.getSchema().equals(this.shuffleKeySchema)) {
OrcUtils.upConvertOrcStruct(originalRecord, (OrcStruct) this.outKey.key, this.shuffleKeySchema);
} else {
this.outKey.key = originalRecord;
diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
index b58fc36..33191b8 100644
--- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
+++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
@@ -90,6 +90,47 @@ public class OrcCompactionTaskTest {
}
@Test
+ public void basicTestWithShuffleKeySpecified() throws Exception {
+ File basePath = Files.createTempDir();
+ basePath.deleteOnExit();
+
+ String minutelyPath = "Identity/MemberAccount/minutely/2017/04/03/10/20_30/run_2017-04-03-10-20";
+ String hourlyPath = "Identity/MemberAccount/hourly/2017/04/03/10/";
+ File jobDir = new File(basePath, minutelyPath);
+ Assert.assertTrue(jobDir.mkdirs());
+
+ // Writing some basic ORC files
+ // Testing data is schema'ed with "struct<i:int,j:int>"
+ createTestingData(jobDir);
+
+ EmbeddedGobblin embeddedGobblin = createEmbeddedGobblin("basic", basePath.getAbsolutePath().toString())
+ .setConfiguration(CompactionJobConfigurator.COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS_KEY,
+ TestCompactionOrcJobConfigurator.Factory.class.getName())
+ .setConfiguration(COMPACTION_OUTPUT_EXTENSION, extensionName)
+ // A shuffle key that shouldn't be taken.
+ .setConfiguration(ORC_MAPPER_SHUFFLE_KEY_SCHEMA, "struct<k:int>");
+ JobExecutionResult execution = embeddedGobblin.run();
+ Assert.assertTrue(execution.isSuccessful());
+
+ // Result verification
+ File outputDir = new File(basePath, hourlyPath);
+ FileSystem fs = FileSystem.getLocal(new Configuration());
+ List<FileStatus> statuses = new ArrayList<>();
+ reloadFolder(statuses, outputDir, fs);
+
+ Assert.assertTrue(statuses.size() == 1);
+ List<OrcStruct> result = readOrcFile(statuses.get(0).getPath());
+ Assert.assertEquals(result.size(), 3);
+ Assert.assertEquals(result.get(0).getFieldValue("i"), new IntWritable(1));
+ Assert.assertEquals(result.get(0).getFieldValue("j"), new IntWritable(2));
+ Assert.assertEquals(result.get(1).getFieldValue("i"), new IntWritable(2));
+ Assert.assertEquals(result.get(1).getFieldValue("j"), new IntWritable(3));
+ Assert.assertEquals(result.get(2).getFieldValue("i"), new IntWritable(4));
+ Assert.assertEquals(result.get(2).getFieldValue("j"), new IntWritable(5));
+
+ }
+
+ @Test
public void basicTestWithRecompactionAndBasicSchemaEvolution() throws Exception {
File basePath = Files.createTempDir();
basePath.deleteOnExit();