You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2014/11/17 20:22:53 UTC
[44/50] [abbrv] tez git commit: TEZ-1736. Add support for
Inputs/Outputs in runtime-library to generate history text data. (hitesh)
TEZ-1736. Add support for Inputs/Outputs in runtime-library to generate history text data. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/0daf2ba1
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/0daf2ba1
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/0daf2ba1
Branch: refs/heads/TEZ-8
Commit: 0daf2ba188416f69198e23621d05f3dd3acd2882
Parents: 0cceb1f
Author: Hitesh Shah <hi...@apache.org>
Authored: Wed Nov 12 14:28:10 2014 -0800
Committer: Hitesh Shah <hi...@apache.org>
Committed: Wed Nov 12 14:28:10 2014 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/tez/common/ATSConstants.java | 6 ++-
.../java/org/apache/tez/common/TezUtils.java | 36 +++++++++++++
.../apache/tez/dag/api/EntityDescriptor.java | 6 ++-
.../org/apache/tez/common/TestTezUtils.java | 57 +++++++++++++++++---
.../apache/tez/mapreduce/client/YARNRunner.java | 14 +++++
.../org/apache/tez/mapreduce/input/MRInput.java | 26 ++++++++-
.../apache/tez/mapreduce/output/MROutput.java | 6 +++
.../logging/ats/ATSHistoryLoggingService.java | 8 +--
.../library/api/TezRuntimeConfiguration.java | 8 +++
.../HadoopKeyValuesBasedBaseEdgeConfig.java | 13 +++++
.../conf/OrderedGroupedKVInputConfig.java | 9 ++++
.../conf/OrderedPartitionedKVEdgeConfig.java | 12 +++++
.../conf/OrderedPartitionedKVOutputConfig.java | 9 ++++
.../library/conf/UnorderedKVEdgeConfig.java | 13 +++++
.../library/conf/UnorderedKVInputConfig.java | 9 ++++
.../library/conf/UnorderedKVOutputConfig.java | 9 ++++
.../conf/UnorderedPartitionedKVEdgeConfig.java | 12 +++++
.../UnorderedPartitionedKVOutputConfig.java | 8 +++
.../apache/tez/runtime/library/conf/Utils.java | 45 ++++++++++++++++
.../library/input/OrderedGroupedKVInput.java | 1 +
.../runtime/library/input/UnorderedKVInput.java | 1 +
.../output/OrderedPartitionedKVOutput.java | 1 +
.../library/output/UnorderedKVOutput.java | 1 +
.../output/UnorderedPartitionedKVOutput.java | 1 +
.../TestOrderedPartitionedKVEdgeConfig.java | 45 ++++++++++++++--
.../library/conf/TestUnorderedKVEdgeConfig.java | 48 +++++++++++++++--
.../TestUnorderedPartitionedKVEdgeConfig.java | 44 +++++++++++++--
.../apache/tez/mapreduce/TestMRRJobsDAGApi.java | 10 +++-
29 files changed, 430 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 80263bf..aa7d74d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ ALL CHANGES:
TEZ-1733. TezMerger should sort FileChunks on size when merging
TEZ-1738. Tez tfile parser for log parsing
TEZ-1627. Remove OUTPUT_CONSUMABLE and related Event in TaskAttemptImpl
+ TEZ-1736. Add support for Inputs/Outputs in runtime-library to generate history text data.
Release 0.5.3: Unreleased
http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
index 58761d5..7b47b9c 100644
--- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
+++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
@@ -50,7 +50,6 @@ public class ATSConstants {
public static final String APP_SUBMIT_TIME = "appSubmitTime";
/* Tez-specific info */
- public static final String CONFIG = "config";
public static final String DAG_PLAN = "dagPlan";
public static final String DAG_NAME = "dagName";
public static final String VERTEX_NAME = "vertexName";
@@ -99,4 +98,9 @@ public class ATSConstants {
"yarn.timeline-service.webapp.address";
public static final String TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS_CONF_NAME =
"yarn.timeline-service.webapp.https.address";
+
+ /* History text related Keys */
+ public static final String DESCRIPTION = "desc";
+ public static final String CONFIG = "config";
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-api/src/main/java/org/apache/tez/common/TezUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezUtils.java b/tez-api/src/main/java/org/apache/tez/common/TezUtils.java
index c008f35..fb2ed78 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezUtils.java
@@ -23,17 +23,24 @@ import java.io.OutputStream;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.client.TezClientUtils;
+import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.records.DAGProtos;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
/**
@@ -43,6 +50,7 @@ import org.apache.tez.dag.api.records.DAGProtos;
@InterfaceAudience.Public
public class TezUtils {
+ private static final Log LOG = LogFactory.getLog(TezUtils.class);
/**
* Allows changing the log level for task / AM logging. </p>
@@ -148,4 +156,32 @@ public class TezUtils {
conf.set(setting.getKey(), setting.getValue());
}
}
+
+ public static String convertToHistoryText(String description, Configuration conf) {
+ // Add a version if this serialization is changed
+ JSONObject jsonObject = new JSONObject();
+ try {
+ if (description != null && !description.isEmpty()) {
+ jsonObject.put(ATSConstants.DESCRIPTION, description);
+ }
+ if (conf != null) {
+ JSONObject confJson = new JSONObject();
+ Iterator<Entry<String, String>> iter = conf.iterator();
+ while (iter.hasNext()) {
+ Entry<String, String> entry = iter.next();
+ confJson.put(entry.getKey(), entry.getValue());
+ }
+ jsonObject.put(ATSConstants.CONFIG, confJson);
+ }
+ } catch (JSONException e) {
+ throw new TezUncheckedException("Error when trying to convert description/conf to JSON", e);
+ }
+ return jsonObject.toString();
+ }
+
+ public static String convertToHistoryText(Configuration conf) {
+ return convertToHistoryText(null, conf);
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-api/src/main/java/org/apache/tez/dag/api/EntityDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EntityDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/EntityDescriptor.java
index 2caf819..d02bddd 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/EntityDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/EntityDescriptor.java
@@ -70,8 +70,12 @@ public abstract class EntityDescriptor<T extends EntityDescriptor<T>> implements
/**
* Provide a human-readable version of the user payload that can be
- * used in the History UI
+ * used in the TEZ UI
* @param historyText History text
+ * For better support in the UI, the history text should be a json-encoded string.
+ * The following keys in the json object will be recognized:
+ * "desc" : A string-value describing the entity
+ * "config" : A key-value map to represent configuration
* @return this object for further chained method calls
*/
public T setHistoryText(String historyText) {
http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java b/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
index e1036a5..99408f8 100644
--- a/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
+++ b/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
@@ -23,13 +23,16 @@ import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.api.UserPayload;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
import org.junit.Assert;
import org.junit.Test;
import com.google.protobuf.ByteString;
public class TestTezUtils {
- @Test
+
+ @Test (timeout=2000)
public void testByteStringToAndFromConf() throws IOException {
Configuration conf = getConf();
Assert.assertEquals(conf.size(), 6);
@@ -41,7 +44,7 @@ public class TestTezUtils {
checkConf(conf);
}
- @Test
+ @Test (timeout=2000)
public void testPayloadToAndFromConf() throws IOException {
Configuration conf = getConf();
Assert.assertEquals(conf.size(), 6);
@@ -52,8 +55,8 @@ public class TestTezUtils {
Assert.assertEquals(conf.size(), 6);
checkConf(conf);
}
-
- @Test
+
+ @Test (timeout=2000)
public void testCleanVertexName() {
String testString = "special characters & spaces and longer than "
+ TezUtilsInternal.MAX_VERTEX_NAME_LENGTH + " characters";
@@ -64,7 +67,7 @@ public class TestTezUtils {
Assert.assertTrue(cleaned.matches("\\w+"));
}
- @Test
+ @Test (timeout=2000)
public void testBitSetToByteArray() {
BitSet bitSet = createBitSet(0);
byte[] bytes = TezUtilsInternal.toByteArray(bitSet);
@@ -75,7 +78,7 @@ public class TestTezUtils {
Assert.assertTrue(bytes.length == ((bitSet.length() / 8) + 1));
}
- @Test
+ @Test (timeout=2000)
public void testBitSetFromByteArray() {
BitSet bitSet = createBitSet(0);
byte[] bytes = TezUtilsInternal.toByteArray(bitSet);
@@ -93,7 +96,7 @@ public class TestTezUtils {
Assert.assertTrue(TezUtilsInternal.fromByteArray(bytes).equals(bitSet));
}
- @Test
+ @Test (timeout=2000)
public void testBitSetConversion() {
for (int i = 0 ; i < 16 ; i++) {
BitSet bitSet = createBitSetWithSingleEntry(i);
@@ -146,4 +149,44 @@ public class TestTezUtils {
Assert.assertEquals(tmp[2], "S3");
}
+
+ @Test (timeout=2000)
+ public void testConvertToHistoryText() throws JSONException {
+ Configuration conf = getConf();
+
+ String confToJson = TezUtils.convertToHistoryText(conf);
+
+ JSONObject jsonObject = new JSONObject(confToJson);
+
+ Assert.assertFalse(jsonObject.has(ATSConstants.DESCRIPTION));
+ Assert.assertTrue(jsonObject.has(ATSConstants.CONFIG));
+
+ JSONObject confObject = jsonObject.getJSONObject(ATSConstants.CONFIG);
+ Assert.assertNotNull(confObject);
+ Assert.assertEquals("value1", confObject.getString("test1"));
+ Assert.assertEquals("true", confObject.getString("test2"));
+ Assert.assertEquals("1.2345", confObject.getString("test3"));
+ Assert.assertEquals("34567", confObject.getString("test4"));
+ Assert.assertEquals("1234567890", confObject.getString("test5"));
+ Assert.assertEquals("S1,S2,S3", confObject.getString("test6"));
+
+ String desc = "desc123";
+ confToJson = TezUtils.convertToHistoryText(desc, conf);
+ jsonObject = new JSONObject(confToJson);
+
+ Assert.assertTrue(jsonObject.has(ATSConstants.DESCRIPTION));
+ String descFromJson = jsonObject.getString(ATSConstants.DESCRIPTION);
+ Assert.assertEquals(desc, descFromJson);
+
+ Assert.assertTrue(jsonObject.has(ATSConstants.CONFIG));
+ confObject = jsonObject.getJSONObject("config");
+ Assert.assertNotNull(confObject);
+ Assert.assertEquals("value1", confObject.getString("test1"));
+ Assert.assertEquals("true", confObject.getString("test2"));
+ Assert.assertEquals("1.2345", confObject.getString("test3"));
+ Assert.assertEquals("34567", confObject.getString("test4"));
+ Assert.assertEquals("1234567890", confObject.getString("test5"));
+ Assert.assertEquals("S1,S2,S3", confObject.getString("test6"));
+
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
index dfbf0cf..1cba105 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
@@ -420,6 +420,11 @@ public class YARNRunner implements ClientProtocol {
Vertex vertex = Vertex.create(vertexName,
ProcessorDescriptor.create(processorName).setUserPayload(vertexUserPayload),
numTasks, taskResource);
+ if (stageConf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT,
+ TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT_DEFAULT)) {
+ vertex.getProcessorDescriptor().setHistoryText(TezUtils.convertToHistoryText(stageConf));
+ }
+
if (isMap) {
vertex.addDataSource("MRInput",
configureMRInputWithLegacySplitsGenerated(stageConf, true));
@@ -428,6 +433,10 @@ public class YARNRunner implements ClientProtocol {
if (stageNum == totalStages -1) {
OutputDescriptor od = OutputDescriptor.create(MROutputLegacy.class.getName())
.setUserPayload(vertexUserPayload);
+ if (stageConf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT,
+ TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT_DEFAULT)) {
+ od.setHistoryText(TezUtils.convertToHistoryText(stageConf));
+ }
vertex.addDataSink("MROutput", DataSinkDescriptor.create(od,
OutputCommitterDescriptor.create(MROutputCommitter.class.getName()), null));
}
@@ -806,6 +815,11 @@ public class YARNRunner implements ClientProtocol {
}
DataSourceDescriptor dsd = DataSourceDescriptor.create(inputDescriptor, null, null);
+ if (conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT,
+ TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT_DEFAULT)) {
+ dsd.getInputDescriptor().setHistoryText(TezUtils.convertToHistoryText(conf));
+ }
+
return dsd;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index f38fc9c..5c89f0e 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.security.Credentials;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.InputDescriptor;
@@ -65,6 +66,7 @@ import org.apache.tez.runtime.api.Input;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -250,16 +252,23 @@ public class MRInput extends MRInputBase {
}
MRHelpers.translateMRConfToTez(conf);
- UserPayload payload = MRInputHelpersInternal.createMRInputPayload(conf, inputSplitInfo.getSplitsProto());
+ UserPayload payload = MRInputHelpersInternal.createMRInputPayload(conf,
+ inputSplitInfo.getSplitsProto());
Credentials credentials = null;
if (getCredentialsForSourceFilesystem && inputSplitInfo.getCredentials() != null) {
credentials = inputSplitInfo.getCredentials();
}
- return DataSourceDescriptor.create(
+ DataSourceDescriptor ds = DataSourceDescriptor.create(
InputDescriptor.create(inputClassName).setUserPayload(payload),
InputInitializerDescriptor.create(MRInputSplitDistributor.class.getName()),
inputSplitInfo.getNumTasks(), credentials,
VertexLocationHint.create(inputSplitInfo.getTaskLocationHints()), null);
+ if (conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT,
+ TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT_DEFAULT)) {
+ ds.getInputDescriptor().setHistoryText(TezUtils.convertToHistoryText(conf));
+ }
+
+ return ds;
}
private DataSourceDescriptor createCustomDataSource() throws IOException {
@@ -279,6 +288,12 @@ public class MRInput extends MRInputBase {
DataSourceDescriptor ds = DataSourceDescriptor
.create(InputDescriptor.create(inputClassName).setUserPayload(payload),
customInitializerDescriptor, null);
+
+ if (conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT,
+ TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT_DEFAULT)) {
+ ds.getInputDescriptor().setHistoryText(TezUtils.convertToHistoryText(conf));
+ }
+
if (uris != null) {
ds.addURIsForCredentials(uris);
}
@@ -297,9 +312,16 @@ public class MRInput extends MRInputBase {
} else {
payload = MRInputHelpersInternal.createMRInputPayload(conf, null);
}
+
DataSourceDescriptor ds = DataSourceDescriptor.create(
InputDescriptor.create(inputClassName).setUserPayload(payload),
InputInitializerDescriptor.create(MRInputAMSplitGenerator.class.getName()), null);
+
+ if (conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT,
+ TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT_DEFAULT)) {
+ ds.getInputDescriptor().setHistoryText(TezUtils.convertToHistoryText(conf));
+ }
+
if (uris != null) {
ds.addURIsForCredentials(uris);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index 421fc8c..94a3c1f 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -66,6 +66,7 @@ import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.Output;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.library.api.KeyValueWriter;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
/**
* {@link MROutput} is an {@link Output} which allows key/values pairs
@@ -183,6 +184,11 @@ public class MROutput extends AbstractLogicalOutput {
OutputDescriptor.create(outputClassName).setUserPayload(createUserPayload()),
(doCommit ? OutputCommitterDescriptor.create(
MROutputCommitter.class.getName()) : null), null);
+ if (conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT,
+ TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT_DEFAULT)) {
+ ds.getOutputDescriptor().setHistoryText(TezUtils.convertToHistoryText(conf));
+ }
+
if (uris != null) {
ds.addURIsForCredentials(uris);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
index 0108c26..99cb441 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
@@ -103,9 +103,11 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
// Log the size of the event-queue every so often.
if (eventCounter != 0 && eventCounter % 1000 == 0) {
- LOG.info("Event queue stats"
- + ", eventsProcessedSinceLastUpdate=" + eventsProcessed
- + ", eventQueueSize=" + eventQueue.size());
+ if (eventsProcessed != 0 && !events.isEmpty()) {
+ LOG.info("Event queue stats"
+ + ", eventsProcessedSinceLastUpdate=" + eventsProcessed
+ + ", eventQueueSize=" + eventQueue.size());
+ }
eventCounter = 0;
eventsProcessed = 0;
} else {
http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
index cb61109..3c0f11c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
@@ -294,6 +294,13 @@ public class TezRuntimeConfiguration {
// TODO TEZ-1233 - allow this property to be set per vertex
// TODO TEZ-1231 - move these properties out since they are not relevant for Inputs / Outputs
+ /**
+ * Value: Boolean
+ * Whether to publish configuration information to History logger. Default false.
+ */
+ public static final String TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT =
+ TEZ_RUNTIME_PREFIX + "convert.user-payload.to.history-text";
+ public static final boolean TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT_DEFAULT = false;
@Unstable
@Private
@@ -345,6 +352,7 @@ public class TezRuntimeConfiguration {
tezRuntimeKeys.add(TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS);
tezRuntimeKeys.add(TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH);
tezRuntimeKeys.add(TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH);
+ tezRuntimeKeys.add(TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT);
defaultConf.addResource("core-default.xml");
defaultConf.addResource("core-site.xml");
http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/HadoopKeyValuesBasedBaseEdgeConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/HadoopKeyValuesBasedBaseEdgeConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/HadoopKeyValuesBasedBaseEdgeConfig.java
index 31eb686..0692bac 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/HadoopKeyValuesBasedBaseEdgeConfig.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/HadoopKeyValuesBasedBaseEdgeConfig.java
@@ -46,6 +46,19 @@ abstract class HadoopKeyValuesBasedBaseEdgeConfig {
*/
public abstract UserPayload getInputPayload();
+
+ /**
+ * Get the history text for the configured Output
+ * @return output configuration as a string in json format
+ */
+ public abstract String getOutputHistoryText();
+
+ /**
+ * Get the history text for the configured Input
+ * @return input configuration as a string in json format
+ */
+ public abstract String getInputHistoryText();
+
/**
* Get the input class name
* @return the input class name
http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedGroupedKVInputConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedGroupedKVInputConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedGroupedKVInputConfig.java
index 0e28399..888f61a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedGroupedKVInputConfig.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedGroupedKVInputConfig.java
@@ -259,6 +259,15 @@ public class OrderedGroupedKVInputConfig {
}
}
+ @InterfaceAudience.Private
+ String toHistoryText() {
+ if (conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT,
+ TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT_DEFAULT)) {
+ return TezUtils.convertToHistoryText(conf);
+ }
+ return null;
+ }
+
public String getInputClassName() {
return inputClassName;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfig.java
index 0f10cf1..e6cc2c2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfig.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfig.java
@@ -104,6 +104,16 @@ public class OrderedPartitionedKVEdgeConfig
}
@Override
+ public String getOutputHistoryText() {
+ return outputConf.toHistoryText();
+ }
+
+ @Override
+ public String getInputHistoryText() {
+ return inputConf.toHistoryText();
+ }
+
+ @Override
public String getInputClassName() {
return inputConf.getInputClassName();
}
@@ -123,6 +133,7 @@ public class OrderedPartitionedKVEdgeConfig
getOutputClassName()).setUserPayload(getOutputPayload()),
InputDescriptor.create(
getInputClassName()).setUserPayload(getInputPayload()));
+ Utils.setEdgePropertyHistoryText(this, edgeProperty);
return edgeProperty;
}
@@ -140,6 +151,7 @@ public class OrderedPartitionedKVEdgeConfig
EdgeProperty.SchedulingType.SEQUENTIAL,
OutputDescriptor.create(getOutputClassName()).setUserPayload(getOutputPayload()),
InputDescriptor.create(getInputClassName()).setUserPayload(getInputPayload()));
+ Utils.setEdgePropertyHistoryText(this, edgeProperty);
return edgeProperty;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVOutputConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVOutputConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVOutputConfig.java
index 1c4cca0..5437620 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVOutputConfig.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVOutputConfig.java
@@ -189,6 +189,15 @@ public class OrderedPartitionedKVOutputConfig {
}
}
+ @InterfaceAudience.Private
+ String toHistoryText() {
+ if (conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT,
+ TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT_DEFAULT)) {
+ return TezUtils.convertToHistoryText(conf);
+ }
+ return null;
+ }
+
public static Builder newBuilder(String keyClass, String valueClass, String partitionerClassName) {
return newBuilder(keyClass, valueClass, partitionerClassName, null);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVEdgeConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVEdgeConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVEdgeConfig.java
index 6eb1d6a..28a7503 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVEdgeConfig.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVEdgeConfig.java
@@ -83,6 +83,16 @@ public class UnorderedKVEdgeConfig extends HadoopKeyValuesBasedBaseEdgeConfig {
}
@Override
+ public String getOutputHistoryText() {
+ return outputConf.toHistoryText();
+ }
+
+ @Override
+ public String getInputHistoryText() {
+ return inputConf.toHistoryText();
+ }
+
+ @Override
public String getInputClassName() {
return UnorderedKVInput.class.getName();
}
@@ -104,6 +114,7 @@ public class UnorderedKVEdgeConfig extends HadoopKeyValuesBasedBaseEdgeConfig {
getOutputClassName()).setUserPayload(getOutputPayload()),
InputDescriptor.create(
getInputClassName()).setUserPayload(getInputPayload()));
+ Utils.setEdgePropertyHistoryText(this, edgeProperty);
return edgeProperty;
}
@@ -124,6 +135,7 @@ public class UnorderedKVEdgeConfig extends HadoopKeyValuesBasedBaseEdgeConfig {
getOutputClassName()).setUserPayload(getOutputPayload()),
InputDescriptor.create(
getInputClassName()).setUserPayload(getInputPayload()));
+ Utils.setEdgePropertyHistoryText(this, edgeProperty);
return edgeProperty;
}
@@ -141,6 +153,7 @@ public class UnorderedKVEdgeConfig extends HadoopKeyValuesBasedBaseEdgeConfig {
EdgeProperty.SchedulingType.SEQUENTIAL,
OutputDescriptor.create(getOutputClassName()).setUserPayload(getOutputPayload()),
InputDescriptor.create(getInputClassName()).setUserPayload(getInputPayload()));
+ Utils.setEdgePropertyHistoryText(this, edgeProperty);
return edgeProperty;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVInputConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVInputConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVInputConfig.java
index bd3750e..e99b0bf 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVInputConfig.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVInputConfig.java
@@ -179,6 +179,15 @@ public class UnorderedKVInputConfig {
}
}
+ @InterfaceAudience.Private
+ String toHistoryText() {
+ if (conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT,
+ TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT_DEFAULT)) {
+ return TezUtils.convertToHistoryText(conf);
+ }
+ return null;
+ }
+
public static Builder newBuilder(String keyClass, String valueClass) {
return new Builder(keyClass, valueClass);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVOutputConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVOutputConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVOutputConfig.java
index fdeffb3..30df2e3 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVOutputConfig.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVOutputConfig.java
@@ -125,6 +125,15 @@ public class UnorderedKVOutputConfig {
}
}
+ @InterfaceAudience.Private
+ String toHistoryText() {
+ if (conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT,
+ TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT_DEFAULT)) {
+ return TezUtils.convertToHistoryText(conf);
+ }
+ return null;
+ }
+
public static Builder newBuilder(String keyClass, String valClass) {
return new Builder(keyClass, valClass);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfig.java
index 30585bd..8ac7d65 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfig.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfig.java
@@ -108,6 +108,16 @@ public class UnorderedPartitionedKVEdgeConfig
}
@Override
+ public String getOutputHistoryText() {
+ return outputConf.toHistoryText();
+ }
+
+ @Override
+ public String getInputHistoryText() {
+ return inputConf.toHistoryText();
+ }
+
+ @Override
public String getInputClassName() {
return UnorderedKVInput.class.getName();
}
@@ -129,6 +139,7 @@ public class UnorderedPartitionedKVEdgeConfig
getOutputClassName()).setUserPayload(getOutputPayload()),
InputDescriptor.create(
getInputClassName()).setUserPayload(getInputPayload()));
+ Utils.setEdgePropertyHistoryText(this, edgeProperty);
return edgeProperty;
}
@@ -146,6 +157,7 @@ public class UnorderedPartitionedKVEdgeConfig
EdgeProperty.SchedulingType.SEQUENTIAL,
OutputDescriptor.create(getOutputClassName()).setUserPayload(getOutputPayload()),
InputDescriptor.create(getInputClassName()).setUserPayload(getInputPayload()));
+ Utils.setEdgePropertyHistoryText(this, edgeProperty);
return edgeProperty;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVOutputConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVOutputConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVOutputConfig.java
index fae9d0f..0c8cd0d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVOutputConfig.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVOutputConfig.java
@@ -139,6 +139,14 @@ public class UnorderedPartitionedKVOutputConfig {
}
}
+ @InterfaceAudience.Private
+ String toHistoryText() {
+ if (conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT,
+ TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT_DEFAULT)) {
+ return TezUtils.convertToHistoryText(conf);
+ }
+ return null;
+ }
public static Builder newBuilder(String keyClass, String valClass, String partitionerClassName) {
return newBuilder(keyClass, valClass, partitionerClassName, null);
http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/Utils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/Utils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/Utils.java
new file mode 100644
index 0000000..c1b44a2
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/Utils.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.conf;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.tez.dag.api.EdgeProperty;
+
+@Private
+class Utils {
+
+ /**
+ * Modify the EdgeProperty to set the history text if available
+ * @param edgeConfig Edge config
+ * @param edgeProperty Edge property to be be modified
+ */
+ static void setEdgePropertyHistoryText(HadoopKeyValuesBasedBaseEdgeConfig edgeConfig,
+ EdgeProperty edgeProperty) {
+ String inputHistoryText = edgeConfig.getInputHistoryText();
+ if (inputHistoryText != null) {
+ edgeProperty.getEdgeDestination().setHistoryText(inputHistoryText);
+ }
+ String outputHistoryText = edgeConfig.getOutputHistoryText();
+ if (outputHistoryText != null) {
+ edgeProperty.getEdgeSource().setHistoryText(outputHistoryText);
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
index 4231442..f46f8f7 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
@@ -334,6 +334,7 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_SECONDARY_COMPARATOR_CLASS);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH);
+ confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT);
confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX);
confKeys.add(TezConfiguration.TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH);
confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH);
http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
index 368c988..0d02cb3 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
@@ -249,6 +249,7 @@ public class UnorderedKVInput extends AbstractLogicalInput {
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH);
+ confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT);
confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX);
confKeys.add(TezConfiguration.TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH);
confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH);
http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
index 70d5b78..b3290a5 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
@@ -236,6 +236,7 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED);
+ confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT);
confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX);
confKeys.add(TezConfiguration.TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH);
confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH);
http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
index 6a84e61..7bfc397 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
@@ -179,6 +179,7 @@ public class UnorderedKVOutput extends AbstractLogicalOutput {
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED);
+ confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT);
confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX);
confKeys.add(TezConfiguration.TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH);
confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH);
http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
index 11e6849..1e39535 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
@@ -119,6 +119,7 @@ public class UnorderedPartitionedKVOutput extends AbstractLogicalOutput {
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED);
+ confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT);
confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX);
confKeys.add(TezConfiguration.TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH);
confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH);
http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java
index c595a9d..35fd3cb 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java
@@ -21,9 +21,11 @@
package org.apache.tez.runtime.library.conf;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
import java.util.HashMap;
import java.util.Map;
@@ -31,6 +33,8 @@ import java.util.Map;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.junit.Test;
@@ -60,7 +64,7 @@ public class TestOrderedPartitionedKVEdgeConfig {
}
}
- @Test
+ @Test (timeout=2000)
public void testDefaultConfigsUsed() {
OrderedPartitionedKVEdgeConfig.Builder builder = OrderedPartitionedKVEdgeConfig
.newBuilder("KEY", "VALUE", "PARTITIONER");
@@ -86,7 +90,7 @@ public class TestOrderedPartitionedKVEdgeConfig {
inputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, ""));
}
- @Test
+ @Test (timeout=2000)
public void testSpecificIOConfs() {
// Ensures that Output and Input confs are not mixed.
OrderedPartitionedKVEdgeConfig.Builder builder = OrderedPartitionedKVEdgeConfig
@@ -109,7 +113,7 @@ public class TestOrderedPartitionedKVEdgeConfig {
inputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, "DEFAULT"));
}
- @Test
+ @Test (timeout=2000)
public void tetCommonConf() {
Configuration fromConf = new Configuration(false);
@@ -179,7 +183,7 @@ public class TestOrderedPartitionedKVEdgeConfig {
}
- @Test
+ @Test (timeout=2000)
public void testSetters() {
Map<String, String> comparatorConf = Maps.newHashMap();
comparatorConf.put("comparator.test.key", "comparatorValue");
@@ -244,7 +248,7 @@ public class TestOrderedPartitionedKVEdgeConfig {
}
- @Test
+ @Test (timeout=2000)
public void testSerialization() {
OrderedPartitionedKVEdgeConfig.Builder builder = OrderedPartitionedKVEdgeConfig
.newBuilder("KEY", "VALUE", "PARTITIONER")
@@ -294,4 +298,35 @@ public class TestOrderedPartitionedKVEdgeConfig {
inputConf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS,
false));
}
+
+ private void checkHistoryText(String historyText) {
+ assertNotNull(historyText);
+ assertTrue(historyText.contains(
+ TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT));
+ }
+
+ @Test (timeout=2000)
+ public void testHistoryText() {
+ OrderedPartitionedKVEdgeConfig.Builder builder =
+ OrderedPartitionedKVEdgeConfig.newBuilder("KEY", "VALUE", "PARTITIONER");
+ Configuration fromConf = new Configuration(false);
+ fromConf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT,
+ true);
+ builder.setFromConfiguration(fromConf);
+
+ OrderedPartitionedKVEdgeConfig kvEdgeConfig = builder.build();
+ checkHistoryText(kvEdgeConfig.getInputHistoryText());
+ checkHistoryText(kvEdgeConfig.getOutputHistoryText());
+
+ EdgeProperty defaultEdgeProperty = builder.build().createDefaultEdgeProperty();
+ checkHistoryText(defaultEdgeProperty.getEdgeDestination().getHistoryText());
+ checkHistoryText(defaultEdgeProperty.getEdgeSource().getHistoryText());
+
+ EdgeManagerPluginDescriptor descriptor = mock(EdgeManagerPluginDescriptor.class);
+ EdgeProperty edgeProperty = builder.build().createDefaultCustomEdgeProperty(descriptor);
+ checkHistoryText(edgeProperty.getEdgeDestination().getHistoryText());
+ checkHistoryText(edgeProperty.getEdgeSource().getHistoryText());
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedKVEdgeConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedKVEdgeConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedKVEdgeConfig.java
index fab1e94..0ef1cc5 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedKVEdgeConfig.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedKVEdgeConfig.java
@@ -21,21 +21,25 @@
package org.apache.tez.runtime.library.conf;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.junit.Test;
public class TestUnorderedKVEdgeConfig {
- @Test
+ @Test (timeout=2000)
public void testNullParams() {
try {
UnorderedKVEdgeConfig.newBuilder(null, "VALUE");
@@ -52,7 +56,7 @@ public class TestUnorderedKVEdgeConfig {
}
}
- @Test
+ @Test (timeout=2000)
public void testDefaultConfigsUsed() {
UnorderedKVEdgeConfig.Builder builder =
UnorderedKVEdgeConfig.newBuilder("KEY", "VALUE");
@@ -85,7 +89,7 @@ public class TestUnorderedKVEdgeConfig {
("SerClass2,SerClass1"));
}
- @Test
+ @Test (timeout=2000)
public void testSpecificIOConfs() {
// Ensures that Output and Input confs are not mixed.
UnorderedKVEdgeConfig.Builder builder =
@@ -109,7 +113,7 @@ public class TestUnorderedKVEdgeConfig {
inputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, "DEFAULT"));
}
- @Test
+ @Test (timeout=2000)
public void tetCommonConf() {
Configuration fromConf = new Configuration(false);
@@ -170,4 +174,40 @@ public class TestUnorderedKVEdgeConfig {
assertEquals("fs", inputConf.get("fs.shouldExist"));
}
+
+ private void checkHistoryText(String historyText) {
+ assertNotNull(historyText);
+ assertTrue(historyText.contains(
+ TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT));
+ }
+
+ @Test (timeout=2000)
+ public void testHistoryText() {
+ UnorderedKVEdgeConfig.Builder builder = UnorderedKVEdgeConfig.newBuilder("KEY", "VALUE");
+ Configuration fromConf = new Configuration(false);
+ fromConf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT,
+ true);
+ builder.setFromConfiguration(fromConf);
+
+ UnorderedKVEdgeConfig kvEdgeConfig = builder.build();
+
+ checkHistoryText(kvEdgeConfig.getInputHistoryText());
+ checkHistoryText(kvEdgeConfig.getOutputHistoryText());
+
+ EdgeProperty defaultEdgeProperty = builder.build().createDefaultBroadcastEdgeProperty();
+ checkHistoryText(defaultEdgeProperty.getEdgeDestination().getHistoryText());
+ checkHistoryText(defaultEdgeProperty.getEdgeSource().getHistoryText());
+
+ defaultEdgeProperty = builder.build().createDefaultOneToOneEdgeProperty();
+ checkHistoryText(defaultEdgeProperty.getEdgeDestination().getHistoryText());
+ checkHistoryText(defaultEdgeProperty.getEdgeSource().getHistoryText());
+
+ EdgeManagerPluginDescriptor descriptor = mock(EdgeManagerPluginDescriptor.class);
+ EdgeProperty edgeProperty = builder.build().createDefaultCustomEdgeProperty(descriptor);
+ checkHistoryText(edgeProperty.getEdgeDestination().getHistoryText());
+ checkHistoryText(edgeProperty.getEdgeSource().getHistoryText());
+
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVEdgeConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVEdgeConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVEdgeConfig.java
index b8b86e5..fff8efd 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVEdgeConfig.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVEdgeConfig.java
@@ -21,21 +21,25 @@
package org.apache.tez.runtime.library.conf;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.junit.Test;
public class TestUnorderedPartitionedKVEdgeConfig {
- @Test
+ @Test (timeout=2000)
public void testNullParams() {
try {
UnorderedPartitionedKVEdgeConfig.newBuilder(null, "VALUE", "PARTITIONER");
@@ -59,7 +63,7 @@ public class TestUnorderedPartitionedKVEdgeConfig {
}
}
- @Test
+ @Test (timeout=2000)
public void testDefaultConfigsUsed() {
UnorderedPartitionedKVEdgeConfig.Builder builder =
UnorderedPartitionedKVEdgeConfig.newBuilder("KEY", "VALUE", "PARTITIONER");
@@ -92,7 +96,7 @@ public class TestUnorderedPartitionedKVEdgeConfig {
("SerClass2,SerClass1"));
}
- @Test
+ @Test (timeout=2000)
public void testSpecificIOConfs() {
// Ensures that Output and Input confs are not mixed.
UnorderedPartitionedKVEdgeConfig.Builder builder =
@@ -116,7 +120,7 @@ public class TestUnorderedPartitionedKVEdgeConfig {
inputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, "DEFAULT"));
}
- @Test
+ @Test (timeout=2000)
public void tetCommonConf() {
Configuration fromConf = new Configuration(false);
@@ -185,4 +189,36 @@ public class TestUnorderedPartitionedKVEdgeConfig {
assertEquals("fs", inputConf.get("fs.shouldExist"));
}
+
+ private void checkHistoryText(String historyText) {
+ assertNotNull(historyText);
+ assertTrue(historyText.contains(
+ TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT));
+ }
+
+ @Test (timeout=2000)
+ public void testHistoryText() {
+ UnorderedPartitionedKVEdgeConfig.Builder builder =
+ UnorderedPartitionedKVEdgeConfig.newBuilder("KEY", "VALUE", "PARTITIONER");
+ Configuration fromConf = new Configuration(false);
+ fromConf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT,
+ true);
+ builder.setFromConfiguration(fromConf);
+
+ UnorderedPartitionedKVEdgeConfig kvEdgeConfig = builder.build();
+
+ checkHistoryText(kvEdgeConfig.getInputHistoryText());
+ checkHistoryText(kvEdgeConfig.getOutputHistoryText());
+
+ EdgeProperty defaultEdgeProperty = builder.build().createDefaultEdgeProperty();
+ checkHistoryText(defaultEdgeProperty.getEdgeDestination().getHistoryText());
+ checkHistoryText(defaultEdgeProperty.getEdgeSource().getHistoryText());
+
+ EdgeManagerPluginDescriptor descriptor = mock(EdgeManagerPluginDescriptor.class);
+ EdgeProperty edgeProperty = builder.build().createDefaultCustomEdgeProperty(descriptor);
+ checkHistoryText(edgeProperty.getEdgeDestination().getHistoryText());
+ checkHistoryText(edgeProperty.getEdgeSource().getHistoryText());
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
index 9ca2762..86dcc64 100644
--- a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -77,6 +77,7 @@ import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.FileSystemCounter;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DataSinkDescriptor;
import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
@@ -118,6 +119,7 @@ import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.InputInitializer;
import org.apache.tez.runtime.api.InputInitializerContext;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.input.OrderedGroupedInputLegacy;
import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
import org.apache.tez.runtime.library.processor.SleepProcessor;
@@ -691,8 +693,12 @@ public class TestMRRJobsDAGApi {
Vertex stage3Vertex = Vertex.create("reduce", ProcessorDescriptor.create(
ReduceProcessor.class.getName()).setUserPayload(stage3Payload),
1, Resource.newInstance(256, 1));
- stage3Vertex.addDataSink("MROutput",
- MROutputLegacy.createConfigBuilder(stage3Conf, NullOutputFormat.class).build());
+ stage3Conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT,
+ true);
+ DataSinkDescriptor dataSinkDescriptor =
+ MROutputLegacy.createConfigBuilder(stage3Conf, NullOutputFormat.class).build();
+ Assert.assertFalse(dataSinkDescriptor.getOutputDescriptor().getHistoryText().isEmpty());
+ stage3Vertex.addDataSink("MROutput", dataSinkDescriptor);
// TODO env, resources