You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@tez.apache.org by ab...@apache.org on 2020/11/21 06:49:14 UTC
[tez] branch master updated: TEZ-4248: MRReaderMapred should propagate properties based on config (Marton Bod via László Bodor)
This is an automated email from the ASF dual-hosted git repository.
abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/master by this push:
new 989d286 TEZ-4248: MRReaderMapred should propagate properties based on config (Marton Bod via László Bodor)
989d286 is described below
commit 989d286d09cac7c4e4c5a0e06dd75ea5a6f15478
Author: Marton Bod <mb...@cloudera.com>
AuthorDate: Sat Nov 21 07:45:38 2020 +0100
TEZ-4248: MRReaderMapred should propagate properties based on config (Marton Bod via László Bodor)
Signed-off-by: Laszlo Bodor <bo...@gmail.com>
---
.../org/apache/tez/dag/api/TezConfiguration.java | 7 ++++
.../apache/tez/mapreduce/lib/MRReaderMapred.java | 25 ++++++++++--
.../tez/mapreduce/lib/TestKVReadersWithMR.java | 45 ++++++++++++++++++++++
3 files changed, 73 insertions(+), 4 deletions(-)
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 2af08a9..85f8518 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -2114,4 +2114,11 @@ public class TezConfiguration extends Configuration {
@ConfigurationProperty
public static final String TEZ_JOB_FS_SERVERS_TOKEN_RENEWAL_EXCLUDE = "tez.job.fs-servers.token-renewal.exclude";
+ /**
+ * Comma-separated list of properties that MRReaderMapred should return (if present) when calling for config updates.
+ */
+ @ConfigurationScope(Scope.VERTEX)
+ @ConfigurationProperty
+ public static final String TEZ_MRREADER_CONFIG_UPDATE_PROPERTIES = "tez.mrreader.config.update.properties";
+
}
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java
index 21a4f96..e04ae7f 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.RecordReader;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.mapreduce.hadoop.mapred.MRReporter;
@@ -145,6 +146,13 @@ public class MRReaderMapred extends MRReader {
* @return the additional fields set by {@link MRInput}
*/
public Configuration getConfigUpdates() {
+ String propertyList = jobConf.get(TezConfiguration.TEZ_MRREADER_CONFIG_UPDATE_PROPERTIES);
+ if (propertyList != null) {
+ String[] properties = propertyList.split(",");
+ for (String prop : properties) {
+ addToIncrementalConfFromJobConf(prop);
+ }
+ }
if (incrementalConf != null) {
return new Configuration(incrementalConf);
}
@@ -161,15 +169,24 @@ public class MRReaderMapred extends MRReader {
setupComplete = true;
}
- private void setIncrementalConfigParams(InputSplit inputSplit) {
- if (inputSplit instanceof FileSplit) {
- FileSplit fileSplit = (FileSplit) inputSplit;
+ private void setIncrementalConfigParams(InputSplit split) {
+ if (split instanceof FileSplit) {
+ FileSplit fileSplit = (FileSplit) split;
this.incrementalConf = new Configuration(false);
this.incrementalConf.set(JobContext.MAP_INPUT_FILE, fileSplit.getPath().toString());
this.incrementalConf.setLong(JobContext.MAP_INPUT_START, fileSplit.getStart());
this.incrementalConf.setLong(JobContext.MAP_INPUT_PATH, fileSplit.getLength());
}
- LOG.info("Processing split: " + inputSplit);
+ LOG.info("Processing split: " + split);
+ }
+
+ private void addToIncrementalConfFromJobConf(String property) {
+ if (jobConf.get(property) != null) {
+ if (incrementalConf == null) {
+ incrementalConf = new Configuration(false);
+ }
+ incrementalConf.set(property, jobConf.get(property));
+ }
}
}
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/lib/TestKVReadersWithMR.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/lib/TestKVReadersWithMR.java
index dad18de..21a9246 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/lib/TestKVReadersWithMR.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/lib/TestKVReadersWithMR.java
@@ -18,6 +18,7 @@
package org.apache.tez.mapreduce.lib;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -25,12 +26,15 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.runtime.api.InputContext;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
@@ -107,6 +111,47 @@ public class TestKVReadersWithMR {
}
}
+ @Test
+ public void testIncrementalConfigWithMultipleProperties() throws IOException {
+ InputContext mockContext = mock(InputContext.class);
+ MRReaderMapred reader = new MRReaderMapred(conf, counters, inputRecordCounter, mockContext);
+ conf.set(TezConfiguration.TEZ_MRREADER_CONFIG_UPDATE_PROPERTIES, "column.names,does_not_exist,column.ids");
+ conf.set("column.names", "first_name,last_name,id");
+ conf.set("column.ids", "1,2,3");
+ conf.set("random", "value");
+
+ Configuration incrementalConf = reader.getConfigUpdates();
+
+ assertEquals(2, incrementalConf.size());
+ assertEquals("first_name,last_name,id", incrementalConf.get("column.names"));
+ assertEquals("1,2,3", incrementalConf.get("column.ids"));
+ }
+
+ @Test
+ public void testIncrementalConfigWithSingleProperty() throws IOException {
+ InputContext mockContext = mock(InputContext.class);
+ MRReaderMapred reader = new MRReaderMapred(conf, counters, inputRecordCounter, mockContext);
+ conf.set(TezConfiguration.TEZ_MRREADER_CONFIG_UPDATE_PROPERTIES, "column.names");
+ conf.set("column.names", "first_name,last_name,id");
+ conf.set("random", "value");
+
+ Configuration incrementalConf = reader.getConfigUpdates();
+
+ assertEquals(1, incrementalConf.size());
+ assertEquals("first_name,last_name,id", incrementalConf.get("column.names"));
+ }
+
+ @Test
+ public void testIncrementalConfigWithZeroProperty() throws IOException {
+ InputContext mockContext = mock(InputContext.class);
+ MRReaderMapred reader = new MRReaderMapred(conf, counters, inputRecordCounter, mockContext);
+ conf.set("random", "value");
+
+ Configuration incrementalConf = reader.getConfigUpdates();
+
+ assertNull(incrementalConf);
+ }
+
static class DummyRecordReader implements RecordReader {
int records;