You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by le...@apache.org on 2020/07/16 20:38:55 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1214] Move the fallback of in-eligible shuffleKey to driver

This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new dcc3b82  [GOBBLIN-1214] Move the fallback of in-eligible shuffleKey to driver
dcc3b82 is described below

commit dcc3b8203bf8054332238fa5fa0f5511b82597c6
Author: Lei Sun <au...@gmail.com>
AuthorDate: Thu Jul 16 13:38:39 2020 -0700

    [GOBBLIN-1214] Move the fallback of in-eligible shuffleKey to driver
    
    Move the fallback for an ineligible shuffleKey to the driver level
    
    Add a unit test for the case where the specified shuffle key does not
    exist in the input schema
    
    Closes #3062 from autumnust/reproduce-gcn-33158
---
 .../mapreduce/CompactionOrcJobConfigurator.java    | 15 ++++++--
 .../compaction/mapreduce/orc/OrcValueMapper.java   | 23 +++++-------
 .../mapreduce/OrcCompactionTaskTest.java           | 41 ++++++++++++++++++++++
 3 files changed, 62 insertions(+), 17 deletions(-)

diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionOrcJobConfigurator.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionOrcJobConfigurator.java
index 40a5742..c417c60 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionOrcJobConfigurator.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionOrcJobConfigurator.java
@@ -35,12 +35,13 @@ import org.apache.orc.mapred.OrcKey;
 import org.apache.orc.mapred.OrcValue;
 
 import static org.apache.gobblin.compaction.mapreduce.CompactorOutputCommitter.*;
+import static org.apache.gobblin.compaction.mapreduce.orc.OrcUtils.eligibleForUpConvert;
 
 
 public class CompactionOrcJobConfigurator extends CompactionJobConfigurator {
 
   /**
-   * The key schema for the shuffle output. 
+   * The key schema for the shuffle output.
    */
   public static final String ORC_MAPPER_SHUFFLE_KEY_SCHEMA = "orcMapperShuffleSchema";
   private String orcMapperShuffleSchemaString;
@@ -66,8 +67,16 @@ public class CompactionOrcJobConfigurator extends CompactionJobConfigurator {
     TypeDescription schema = OrcUtils.getNewestSchemaFromSource(job, this.fs);
 
     job.getConfiguration().set(OrcConf.MAPRED_INPUT_SCHEMA.getAttribute(), schema.toString());
-    job.getConfiguration().set(OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.getAttribute(),
-        orcMapperShuffleSchemaString.isEmpty() ? schema.toString() : orcMapperShuffleSchemaString);
+
+    // Determine the shuffle-schema: Only take the user-specified shuffle-schema if it is upconvertable
+    // Check the eligibleForUpConvert method for the definition of eligibility.
+    if (!orcMapperShuffleSchemaString.isEmpty()
+        && eligibleForUpConvert(schema, TypeDescription.fromString(orcMapperShuffleSchemaString))) {
+      job.getConfiguration().set(OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.getAttribute(), orcMapperShuffleSchemaString);
+    } else {
+      job.getConfiguration().set(OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.getAttribute(), schema.toString());
+    }
+
     job.getConfiguration().set(OrcConf.MAPRED_SHUFFLE_VALUE_SCHEMA.getAttribute(), schema.toString());
     job.getConfiguration().set(OrcConf.MAPRED_OUTPUT_SCHEMA.getAttribute(), schema.toString());
   }
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java
index 31cbbdf..10c5ac4 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java
@@ -39,7 +39,7 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.compaction.mapreduce.RecordKeyMapperBase;
 
-import static org.apache.gobblin.compaction.mapreduce.orc.OrcUtils.eligibleForUpConvert;
+import static org.apache.orc.OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA;
 
 
 /**
@@ -53,11 +53,8 @@ public class OrcValueMapper extends RecordKeyMapperBase<NullWritable, OrcStruct,
   // This key will only be initialized lazily when dedup is enabled.
   private OrcKey outKey;
   private OrcValue outValue;
-  private TypeDescription mrOutputSchema;
+  private TypeDescription mrInputSchema;
   private TypeDescription shuffleKeySchema;
-  // Lazily initiated flag indicating if shuffleKeySchema is eligible to be used.
-  // Check org.apache.gobblin.compaction.mapreduce.orc.OrcUtils.eligibleForUpConvert for details.
-  private Boolean isShuffleKeyEligible;
   private JobConf jobConf;
 
   // This is added mostly for debuggability.
@@ -72,10 +69,12 @@ public class OrcValueMapper extends RecordKeyMapperBase<NullWritable, OrcStruct,
     this.outKey.configure(jobConf);
     this.outValue = new OrcValue();
     this.outValue.configure(jobConf);
-    this.mrOutputSchema =
+
+    // This is the consistent input-schema among all mappers.
+    this.mrInputSchema =
         TypeDescription.fromString(context.getConfiguration().get(OrcConf.MAPRED_INPUT_SCHEMA.getAttribute()));
     this.shuffleKeySchema =
-        TypeDescription.fromString(context.getConfiguration().get(OrcConf.MAPRED_SHUFFLE_KEY_SCHEMA.getAttribute()));
+        TypeDescription.fromString(context.getConfiguration().get(MAPRED_SHUFFLE_KEY_SCHEMA.getAttribute()));
   }
 
   @Override
@@ -83,10 +82,10 @@ public class OrcValueMapper extends RecordKeyMapperBase<NullWritable, OrcStruct,
       throws IOException, InterruptedException {
 
     // Up-convert OrcStruct only if schema differs
-    if (!orcStruct.getSchema().equals(this.mrOutputSchema)) {
+    if (!orcStruct.getSchema().equals(this.mrInputSchema)) {
       // Note that outValue.value is being re-used.
       log.info("There's a schema difference between output schema and input schema");
-      OrcUtils.upConvertOrcStruct(orcStruct, (OrcStruct) outValue.value, mrOutputSchema);
+      OrcUtils.upConvertOrcStruct(orcStruct, (OrcStruct) outValue.value, mrInputSchema);
     } else {
       this.outValue.value = orcStruct;
     }
@@ -125,11 +124,7 @@ public class OrcValueMapper extends RecordKeyMapperBase<NullWritable, OrcStruct,
    * Note: This method should have no side-effect on input record.
    */
   private void fillDedupKey(OrcStruct originalRecord) {
-    if (this.isShuffleKeyEligible == null) {
-      this.isShuffleKeyEligible = eligibleForUpConvert(originalRecord.getSchema(), this.shuffleKeySchema);
-    }
-
-    if (!originalRecord.getSchema().equals(this.shuffleKeySchema) && isShuffleKeyEligible) {
+    if (!originalRecord.getSchema().equals(this.shuffleKeySchema)) {
       OrcUtils.upConvertOrcStruct(originalRecord, (OrcStruct) this.outKey.key, this.shuffleKeySchema);
     } else {
       this.outKey.key = originalRecord;
diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
index b58fc36..33191b8 100644
--- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
+++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/OrcCompactionTaskTest.java
@@ -90,6 +90,47 @@ public class OrcCompactionTaskTest {
   }
 
   @Test
+  public void basicTestWithShuffleKeySpecified() throws Exception {
+    File basePath = Files.createTempDir();
+    basePath.deleteOnExit();
+
+    String minutelyPath = "Identity/MemberAccount/minutely/2017/04/03/10/20_30/run_2017-04-03-10-20";
+    String hourlyPath = "Identity/MemberAccount/hourly/2017/04/03/10/";
+    File jobDir = new File(basePath, minutelyPath);
+    Assert.assertTrue(jobDir.mkdirs());
+
+    // Writing some basic ORC files
+    // Testing data is schema'ed with "struct<i:int,j:int>"
+    createTestingData(jobDir);
+
+    EmbeddedGobblin embeddedGobblin = createEmbeddedGobblin("basic", basePath.getAbsolutePath().toString())
+        .setConfiguration(CompactionJobConfigurator.COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS_KEY,
+            TestCompactionOrcJobConfigurator.Factory.class.getName())
+        .setConfiguration(COMPACTION_OUTPUT_EXTENSION, extensionName)
+        // A shuffle key that shouldn't be taken.
+        .setConfiguration(ORC_MAPPER_SHUFFLE_KEY_SCHEMA, "struct<k:int>");
+    JobExecutionResult execution = embeddedGobblin.run();
+    Assert.assertTrue(execution.isSuccessful());
+
+    // Result verification
+    File outputDir = new File(basePath, hourlyPath);
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    List<FileStatus> statuses = new ArrayList<>();
+    reloadFolder(statuses, outputDir, fs);
+
+    Assert.assertTrue(statuses.size() == 1);
+    List<OrcStruct> result = readOrcFile(statuses.get(0).getPath());
+    Assert.assertEquals(result.size(), 3);
+    Assert.assertEquals(result.get(0).getFieldValue("i"), new IntWritable(1));
+    Assert.assertEquals(result.get(0).getFieldValue("j"), new IntWritable(2));
+    Assert.assertEquals(result.get(1).getFieldValue("i"), new IntWritable(2));
+    Assert.assertEquals(result.get(1).getFieldValue("j"), new IntWritable(3));
+    Assert.assertEquals(result.get(2).getFieldValue("i"), new IntWritable(4));
+    Assert.assertEquals(result.get(2).getFieldValue("j"), new IntWritable(5));
+
+  }
+
+  @Test
   public void basicTestWithRecompactionAndBasicSchemaEvolution() throws Exception {
     File basePath = Files.createTempDir();
     basePath.deleteOnExit();