You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ab...@apache.org on 2020/11/21 06:49:14 UTC

[tez] branch master updated: TEZ-4248: MRReaderMapred should propagate properties based on config (Marton Bod via László Bodor)

This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/master by this push:
     new 989d286  TEZ-4248: MRReaderMapred should propagate properties based on config (Marton Bod via László Bodor)
989d286 is described below

commit 989d286d09cac7c4e4c5a0e06dd75ea5a6f15478
Author: Marton Bod <mb...@cloudera.com>
AuthorDate: Sat Nov 21 07:45:38 2020 +0100

    TEZ-4248: MRReaderMapred should propagate properties based on config (Marton Bod via László Bodor)
    
    Signed-off-by: Laszlo Bodor <bo...@gmail.com>
---
 .../org/apache/tez/dag/api/TezConfiguration.java   |  7 ++++
 .../apache/tez/mapreduce/lib/MRReaderMapred.java   | 25 ++++++++++--
 .../tez/mapreduce/lib/TestKVReadersWithMR.java     | 45 ++++++++++++++++++++++
 3 files changed, 73 insertions(+), 4 deletions(-)

diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 2af08a9..85f8518 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -2114,4 +2114,11 @@ public class TezConfiguration extends Configuration {
   @ConfigurationProperty
   public static final String TEZ_JOB_FS_SERVERS_TOKEN_RENEWAL_EXCLUDE = "tez.job.fs-servers.token-renewal.exclude";
 
+  /**
+   *  Comma-separated list of property names whose values (if set in the job conf) MRReaderMapred includes in the result of getConfigUpdates().
+   */
+  @ConfigurationScope(Scope.VERTEX)
+  @ConfigurationProperty
+  public static final String TEZ_MRREADER_CONFIG_UPDATE_PROPERTIES = "tez.mrreader.config.update.properties";
+
 }
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java
index 21a4f96..e04ae7f 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapred.RecordReader;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.mapreduce.hadoop.mapred.MRReporter;
@@ -145,6 +146,13 @@ public class MRReaderMapred extends MRReader {
    * @return the additional fields set by {@link MRInput}
    */
   public Configuration getConfigUpdates() {
+    String propertyList = jobConf.get(TezConfiguration.TEZ_MRREADER_CONFIG_UPDATE_PROPERTIES);
+    if (propertyList != null) {
+      String[] properties = propertyList.split(",");
+      for (String prop : properties) {
+        addToIncrementalConfFromJobConf(prop);
+      }
+    }
     if (incrementalConf != null) {
       return new Configuration(incrementalConf);
     }
@@ -161,15 +169,24 @@ public class MRReaderMapred extends MRReader {
     setupComplete = true;
   }
 
-  private void setIncrementalConfigParams(InputSplit inputSplit) {
-    if (inputSplit instanceof FileSplit) {
-      FileSplit fileSplit = (FileSplit) inputSplit;
+  private void setIncrementalConfigParams(InputSplit split) {
+    if (split instanceof FileSplit) {
+      FileSplit fileSplit = (FileSplit) split;
       this.incrementalConf = new Configuration(false);
 
       this.incrementalConf.set(JobContext.MAP_INPUT_FILE, fileSplit.getPath().toString());
       this.incrementalConf.setLong(JobContext.MAP_INPUT_START, fileSplit.getStart());
       this.incrementalConf.setLong(JobContext.MAP_INPUT_PATH, fileSplit.getLength());
     }
-    LOG.info("Processing split: " + inputSplit);
+    LOG.info("Processing split: " + split);
+  }
+
+  private void addToIncrementalConfFromJobConf(String property) { // copies one property from jobConf into incrementalConf
+    if (jobConf.get(property) != null) { // a property that is not set in jobConf is skipped
+      if (incrementalConf == null) { // created on first use; e.g. setIncrementalConfigParams only creates it for a FileSplit
+        incrementalConf = new Configuration(false);
+      }
+      incrementalConf.set(property, jobConf.get(property));
+    }
   }
 }
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/lib/TestKVReadersWithMR.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/lib/TestKVReadersWithMR.java
index dad18de..21a9246 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/lib/TestKVReadersWithMR.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/lib/TestKVReadersWithMR.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.mapreduce.lib;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -25,12 +26,15 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.runtime.api.InputContext;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
@@ -107,6 +111,47 @@ public class TestKVReadersWithMR {
     }
   }
 
+  @Test
+  public void testIncrementalConfigWithMultipleProperties() throws IOException { // several listed properties, one of them never set
+    InputContext mockContext = mock(InputContext.class);
+    MRReaderMapred reader = new MRReaderMapred(conf, counters, inputRecordCounter, mockContext);
+    conf.set(TezConfiguration.TEZ_MRREADER_CONFIG_UPDATE_PROPERTIES, "column.names,does_not_exist,column.ids");
+    conf.set("column.names", "first_name,last_name,id");
+    conf.set("column.ids", "1,2,3");
+    conf.set("random", "value"); // set but not listed in the update-properties list
+
+    Configuration incrementalConf = reader.getConfigUpdates();
+
+    assertEquals(2, incrementalConf.size()); // "does_not_exist" is listed but never set; "random" is set but not listed
+    assertEquals("first_name,last_name,id", incrementalConf.get("column.names"));
+    assertEquals("1,2,3", incrementalConf.get("column.ids"));
+  }
+
+  @Test
+  public void testIncrementalConfigWithSingleProperty() throws IOException { // list holds a single name, so no comma is involved
+    InputContext mockContext = mock(InputContext.class);
+    MRReaderMapred reader = new MRReaderMapred(conf, counters, inputRecordCounter, mockContext);
+    conf.set(TezConfiguration.TEZ_MRREADER_CONFIG_UPDATE_PROPERTIES, "column.names");
+    conf.set("column.names", "first_name,last_name,id");
+    conf.set("random", "value"); // set but not listed in the update-properties list
+
+    Configuration incrementalConf = reader.getConfigUpdates();
+
+    assertEquals(1, incrementalConf.size()); // only the listed "column.names" is expected
+    assertEquals("first_name,last_name,id", incrementalConf.get("column.names")); // the value's own commas are kept as-is
+  }
+
+  @Test
+  public void testIncrementalConfigWithZeroProperty() throws IOException { // the update-properties list is not configured at all
+    InputContext mockContext = mock(InputContext.class);
+    MRReaderMapred reader = new MRReaderMapred(conf, counters, inputRecordCounter, mockContext);
+    conf.set("random", "value"); // an unrelated property; the list property itself is never set in this test
+
+    Configuration incrementalConf = reader.getConfigUpdates();
+
+    assertNull(incrementalConf); // no list configured and no split set on the reader in this test: null is expected
+  }
+
   static class DummyRecordReader implements RecordReader {
     int records;