You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2014/11/17 20:22:53 UTC

[44/50] [abbrv] tez git commit: TEZ-1736. Add support for Inputs/Outputs in runtime-library to generate history text data. (hitesh)

TEZ-1736. Add support for Inputs/Outputs in runtime-library to generate history text data. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/0daf2ba1
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/0daf2ba1
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/0daf2ba1

Branch: refs/heads/TEZ-8
Commit: 0daf2ba188416f69198e23621d05f3dd3acd2882
Parents: 0cceb1f
Author: Hitesh Shah <hi...@apache.org>
Authored: Wed Nov 12 14:28:10 2014 -0800
Committer: Hitesh Shah <hi...@apache.org>
Committed: Wed Nov 12 14:28:10 2014 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/tez/common/ATSConstants.java     |  6 ++-
 .../java/org/apache/tez/common/TezUtils.java    | 36 +++++++++++++
 .../apache/tez/dag/api/EntityDescriptor.java    |  6 ++-
 .../org/apache/tez/common/TestTezUtils.java     | 57 +++++++++++++++++---
 .../apache/tez/mapreduce/client/YARNRunner.java | 14 +++++
 .../org/apache/tez/mapreduce/input/MRInput.java | 26 ++++++++-
 .../apache/tez/mapreduce/output/MROutput.java   |  6 +++
 .../logging/ats/ATSHistoryLoggingService.java   |  8 +--
 .../library/api/TezRuntimeConfiguration.java    |  8 +++
 .../HadoopKeyValuesBasedBaseEdgeConfig.java     | 13 +++++
 .../conf/OrderedGroupedKVInputConfig.java       |  9 ++++
 .../conf/OrderedPartitionedKVEdgeConfig.java    | 12 +++++
 .../conf/OrderedPartitionedKVOutputConfig.java  |  9 ++++
 .../library/conf/UnorderedKVEdgeConfig.java     | 13 +++++
 .../library/conf/UnorderedKVInputConfig.java    |  9 ++++
 .../library/conf/UnorderedKVOutputConfig.java   |  9 ++++
 .../conf/UnorderedPartitionedKVEdgeConfig.java  | 12 +++++
 .../UnorderedPartitionedKVOutputConfig.java     |  8 +++
 .../apache/tez/runtime/library/conf/Utils.java  | 45 ++++++++++++++++
 .../library/input/OrderedGroupedKVInput.java    |  1 +
 .../runtime/library/input/UnorderedKVInput.java |  1 +
 .../output/OrderedPartitionedKVOutput.java      |  1 +
 .../library/output/UnorderedKVOutput.java       |  1 +
 .../output/UnorderedPartitionedKVOutput.java    |  1 +
 .../TestOrderedPartitionedKVEdgeConfig.java     | 45 ++++++++++++++--
 .../library/conf/TestUnorderedKVEdgeConfig.java | 48 +++++++++++++++--
 .../TestUnorderedPartitionedKVEdgeConfig.java   | 44 +++++++++++++--
 .../apache/tez/mapreduce/TestMRRJobsDAGApi.java | 10 +++-
 29 files changed, 430 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 80263bf..aa7d74d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ ALL CHANGES:
   TEZ-1733. TezMerger should sort FileChunks on size when merging
   TEZ-1738. Tez tfile parser for log parsing
   TEZ-1627. Remove OUTPUT_CONSUMABLE and related Event in TaskAttemptImpl
+  TEZ-1736. Add support for Inputs/Outputs in runtime-library to generate history text data.
 
 Release 0.5.3: Unreleased
 

http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
index 58761d5..7b47b9c 100644
--- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
+++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
@@ -50,7 +50,6 @@ public class ATSConstants {
   public static final String APP_SUBMIT_TIME = "appSubmitTime";
 
   /* Tez-specific info */
-  public static final String CONFIG = "config";
   public static final String DAG_PLAN = "dagPlan";
   public static final String DAG_NAME = "dagName";
   public static final String VERTEX_NAME = "vertexName";
@@ -99,4 +98,9 @@ public class ATSConstants {
       "yarn.timeline-service.webapp.address";
   public static final String TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS_CONF_NAME =
       "yarn.timeline-service.webapp.https.address";
+
+  /* History text related Keys */
+  public static final String DESCRIPTION = "desc";
+  public static final String CONFIG = "config";
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-api/src/main/java/org/apache/tez/common/TezUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezUtils.java b/tez-api/src/main/java/org/apache/tez/common/TezUtils.java
index c008f35..fb2ed78 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezUtils.java
@@ -23,17 +23,24 @@ import java.io.OutputStream;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.zip.Deflater;
 import java.util.zip.DeflaterOutputStream;
 import java.util.zip.InflaterInputStream;
 
 import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.client.TezClientUtils;
+import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.records.DAGProtos;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
 
 
 /**
@@ -43,6 +50,7 @@ import org.apache.tez.dag.api.records.DAGProtos;
 @InterfaceAudience.Public
 public class TezUtils {
 
+  private static final Log LOG = LogFactory.getLog(TezUtils.class);
 
   /**
    * Allows changing the log level for task / AM logging. </p>
@@ -148,4 +156,32 @@ public class TezUtils {
       conf.set(setting.getKey(), setting.getValue());
     }
   }
+
+  public static String convertToHistoryText(String description, Configuration conf) {
+    // Add a version if this serialization is changed
+    JSONObject jsonObject = new JSONObject();
+    try {
+      if (description != null && !description.isEmpty()) {
+        jsonObject.put(ATSConstants.DESCRIPTION, description);
+      }
+      if (conf != null) {
+        JSONObject confJson = new JSONObject();
+        Iterator<Entry<String, String>> iter = conf.iterator();
+        while (iter.hasNext()) {
+          Entry<String, String> entry = iter.next();
+          confJson.put(entry.getKey(), entry.getValue());
+        }
+        jsonObject.put(ATSConstants.CONFIG, confJson);
+      }
+    } catch (JSONException e) {
+      throw new TezUncheckedException("Error when trying to convert description/conf to JSON", e);
+    }
+    return jsonObject.toString();
+  }
+
+  public static String convertToHistoryText(Configuration conf) {
+    return convertToHistoryText(null, conf);
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-api/src/main/java/org/apache/tez/dag/api/EntityDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EntityDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/EntityDescriptor.java
index 2caf819..d02bddd 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/EntityDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/EntityDescriptor.java
@@ -70,8 +70,12 @@ public abstract class EntityDescriptor<T extends EntityDescriptor<T>> implements
 
   /**
    * Provide a human-readable version of the user payload that can be
-   * used in the History UI
+   * used in the TEZ UI
    * @param historyText History text
+   * For better support in the UI, the history text should be a json-encoded string.
+   * The following keys in the json object will be recognized:
+   *    "desc" : A string-value describing the entity
+   *    "config" : A key-value map to represent configuration
    * @return this object for further chained method calls
    */
   public T setHistoryText(String historyText) {

http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java b/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
index e1036a5..99408f8 100644
--- a/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
+++ b/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
@@ -23,13 +23,16 @@ import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.dag.api.UserPayload;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
 import org.junit.Assert;
 import org.junit.Test;
 
 import com.google.protobuf.ByteString;
 
 public class TestTezUtils {
-  @Test
+
+  @Test (timeout=2000)
   public void testByteStringToAndFromConf() throws IOException {
     Configuration conf = getConf();
     Assert.assertEquals(conf.size(), 6);
@@ -41,7 +44,7 @@ public class TestTezUtils {
     checkConf(conf);
   }
 
-  @Test
+  @Test (timeout=2000)
   public void testPayloadToAndFromConf() throws IOException {
     Configuration conf = getConf();
     Assert.assertEquals(conf.size(), 6);
@@ -52,8 +55,8 @@ public class TestTezUtils {
     Assert.assertEquals(conf.size(), 6);
     checkConf(conf);
   }
-  
-  @Test
+
+  @Test (timeout=2000)
   public void testCleanVertexName() {
     String testString = "special characters & spaces and longer than "
         + TezUtilsInternal.MAX_VERTEX_NAME_LENGTH + " characters";
@@ -64,7 +67,7 @@ public class TestTezUtils {
     Assert.assertTrue(cleaned.matches("\\w+"));
   }
 
-  @Test
+  @Test (timeout=2000)
   public void testBitSetToByteArray() {
     BitSet bitSet = createBitSet(0);
     byte[] bytes = TezUtilsInternal.toByteArray(bitSet);
@@ -75,7 +78,7 @@ public class TestTezUtils {
     Assert.assertTrue(bytes.length == ((bitSet.length() / 8) + 1));
   }
 
-  @Test
+  @Test (timeout=2000)
   public void testBitSetFromByteArray() {
     BitSet bitSet = createBitSet(0);
     byte[] bytes = TezUtilsInternal.toByteArray(bitSet);
@@ -93,7 +96,7 @@ public class TestTezUtils {
     Assert.assertTrue(TezUtilsInternal.fromByteArray(bytes).equals(bitSet));
   }
 
-  @Test
+  @Test (timeout=2000)
   public void testBitSetConversion() {
     for (int i = 0 ; i < 16 ; i++) {
       BitSet bitSet = createBitSetWithSingleEntry(i);
@@ -146,4 +149,44 @@ public class TestTezUtils {
     Assert.assertEquals(tmp[2], "S3");
 
   }
+
+  @Test (timeout=2000)
+  public void testConvertToHistoryText() throws JSONException {
+    Configuration conf = getConf();
+
+    String confToJson = TezUtils.convertToHistoryText(conf);
+
+    JSONObject jsonObject = new JSONObject(confToJson);
+
+    Assert.assertFalse(jsonObject.has(ATSConstants.DESCRIPTION));
+    Assert.assertTrue(jsonObject.has(ATSConstants.CONFIG));
+
+    JSONObject confObject = jsonObject.getJSONObject(ATSConstants.CONFIG);
+    Assert.assertNotNull(confObject);
+    Assert.assertEquals("value1", confObject.getString("test1"));
+    Assert.assertEquals("true", confObject.getString("test2"));
+    Assert.assertEquals("1.2345", confObject.getString("test3"));
+    Assert.assertEquals("34567", confObject.getString("test4"));
+    Assert.assertEquals("1234567890", confObject.getString("test5"));
+    Assert.assertEquals("S1,S2,S3", confObject.getString("test6"));
+
+    String desc = "desc123";
+    confToJson = TezUtils.convertToHistoryText(desc, conf);
+    jsonObject = new JSONObject(confToJson);
+
+    Assert.assertTrue(jsonObject.has(ATSConstants.DESCRIPTION));
+    String descFromJson = jsonObject.getString(ATSConstants.DESCRIPTION);
+    Assert.assertEquals(desc, descFromJson);
+
+    Assert.assertTrue(jsonObject.has(ATSConstants.CONFIG));
+    confObject = jsonObject.getJSONObject("config");
+    Assert.assertNotNull(confObject);
+    Assert.assertEquals("value1", confObject.getString("test1"));
+    Assert.assertEquals("true", confObject.getString("test2"));
+    Assert.assertEquals("1.2345", confObject.getString("test3"));
+    Assert.assertEquals("34567", confObject.getString("test4"));
+    Assert.assertEquals("1234567890", confObject.getString("test5"));
+    Assert.assertEquals("S1,S2,S3", confObject.getString("test6"));
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
index dfbf0cf..1cba105 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
@@ -420,6 +420,11 @@ public class YARNRunner implements ClientProtocol {
     Vertex vertex = Vertex.create(vertexName,
         ProcessorDescriptor.create(processorName).setUserPayload(vertexUserPayload),
         numTasks, taskResource);
+    if (stageConf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT,
+        TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT_DEFAULT)) {
+      vertex.getProcessorDescriptor().setHistoryText(TezUtils.convertToHistoryText(stageConf));
+    }
+
     if (isMap) {
       vertex.addDataSource("MRInput",
           configureMRInputWithLegacySplitsGenerated(stageConf, true));
@@ -428,6 +433,10 @@ public class YARNRunner implements ClientProtocol {
     if (stageNum == totalStages -1) {
       OutputDescriptor od = OutputDescriptor.create(MROutputLegacy.class.getName())
           .setUserPayload(vertexUserPayload);
+      if (stageConf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT,
+          TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT_DEFAULT)) {
+        od.setHistoryText(TezUtils.convertToHistoryText(stageConf));
+      }
       vertex.addDataSink("MROutput", DataSinkDescriptor.create(od,
           OutputCommitterDescriptor.create(MROutputCommitter.class.getName()), null));
     }
@@ -806,6 +815,11 @@ public class YARNRunner implements ClientProtocol {
     }
 
     DataSourceDescriptor dsd = DataSourceDescriptor.create(inputDescriptor, null, null);
+    if (conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT,
+        TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT_DEFAULT)) {
+      dsd.getInputDescriptor().setHistoryText(TezUtils.convertToHistoryText(conf));
+    }
+
     return dsd;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index f38fc9c..5c89f0e 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.security.Credentials;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.dag.api.DataSourceDescriptor;
 import org.apache.tez.dag.api.InputDescriptor;
@@ -65,6 +66,7 @@ import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.api.events.InputDataInformationEvent;
 import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -250,16 +252,23 @@ public class MRInput extends MRInputBase {
       }
       MRHelpers.translateMRConfToTez(conf);
 
-      UserPayload payload = MRInputHelpersInternal.createMRInputPayload(conf, inputSplitInfo.getSplitsProto());
+      UserPayload payload = MRInputHelpersInternal.createMRInputPayload(conf,
+          inputSplitInfo.getSplitsProto());
       Credentials credentials = null;
       if (getCredentialsForSourceFilesystem && inputSplitInfo.getCredentials() != null) {
         credentials = inputSplitInfo.getCredentials();
       }
-      return DataSourceDescriptor.create(
+      DataSourceDescriptor ds = DataSourceDescriptor.create(
           InputDescriptor.create(inputClassName).setUserPayload(payload),
           InputInitializerDescriptor.create(MRInputSplitDistributor.class.getName()),
           inputSplitInfo.getNumTasks(), credentials,
           VertexLocationHint.create(inputSplitInfo.getTaskLocationHints()), null);
+      if (conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT,
+          TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT_DEFAULT)) {
+        ds.getInputDescriptor().setHistoryText(TezUtils.convertToHistoryText(conf));
+      }
+
+      return ds;
     }
 
     private DataSourceDescriptor createCustomDataSource() throws IOException {
@@ -279,6 +288,12 @@ public class MRInput extends MRInputBase {
       DataSourceDescriptor ds = DataSourceDescriptor
           .create(InputDescriptor.create(inputClassName).setUserPayload(payload),
               customInitializerDescriptor, null);
+
+      if (conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT,
+          TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT_DEFAULT)) {
+        ds.getInputDescriptor().setHistoryText(TezUtils.convertToHistoryText(conf));
+      }
+
       if (uris != null) {
         ds.addURIsForCredentials(uris);
       }
@@ -297,9 +312,16 @@ public class MRInput extends MRInputBase {
       } else {
         payload = MRInputHelpersInternal.createMRInputPayload(conf, null);
       }
+
       DataSourceDescriptor ds = DataSourceDescriptor.create(
           InputDescriptor.create(inputClassName).setUserPayload(payload),
           InputInitializerDescriptor.create(MRInputAMSplitGenerator.class.getName()), null);
+
+      if (conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT,
+          TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT_DEFAULT)) {
+        ds.getInputDescriptor().setHistoryText(TezUtils.convertToHistoryText(conf));
+      }
+
       if (uris != null) {
         ds.addURIsForCredentials(uris);
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index 421fc8c..94a3c1f 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -66,6 +66,7 @@ import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.Output;
 import org.apache.tez.runtime.api.OutputContext;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 
 /**
  * {@link MROutput} is an {@link Output} which allows key/values pairs
@@ -183,6 +184,11 @@ public class MROutput extends AbstractLogicalOutput {
           OutputDescriptor.create(outputClassName).setUserPayload(createUserPayload()),
           (doCommit ? OutputCommitterDescriptor.create(
               MROutputCommitter.class.getName()) : null), null);
+      if (conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT,
+          TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT_DEFAULT)) {
+        ds.getOutputDescriptor().setHistoryText(TezUtils.convertToHistoryText(conf));
+      }
+
       if (uris != null) {
         ds.addURIsForCredentials(uris);
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
index 0108c26..99cb441 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
@@ -103,9 +103,11 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
 
           // Log the size of the event-queue every so often.
           if (eventCounter != 0 && eventCounter % 1000 == 0) {
-            LOG.info("Event queue stats"
-                + ", eventsProcessedSinceLastUpdate=" + eventsProcessed
-                + ", eventQueueSize=" + eventQueue.size());
+            if (eventsProcessed != 0 && !events.isEmpty()) {
+              LOG.info("Event queue stats"
+                  + ", eventsProcessedSinceLastUpdate=" + eventsProcessed
+                  + ", eventQueueSize=" + eventQueue.size());
+            }
             eventCounter = 0;
             eventsProcessed = 0;
           } else {

http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
index cb61109..3c0f11c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
@@ -294,6 +294,13 @@ public class TezRuntimeConfiguration {
   // TODO TEZ-1233 - allow this property to be set per vertex
   // TODO TEZ-1231 - move these properties out since they are not relevant for Inputs / Outputs
 
+  /**
+   * Value: Boolean
+   * Whether to publish configuration information to History logger. Default false.
+   */
+  public static final String TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT =
+      TEZ_RUNTIME_PREFIX + "convert.user-payload.to.history-text";
+  public static final boolean TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT_DEFAULT = false;
 
   @Unstable
   @Private
@@ -345,6 +352,7 @@ public class TezRuntimeConfiguration {
     tezRuntimeKeys.add(TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS);
     tezRuntimeKeys.add(TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH);
     tezRuntimeKeys.add(TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH);
+    tezRuntimeKeys.add(TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT);
 
     defaultConf.addResource("core-default.xml");
     defaultConf.addResource("core-site.xml");

http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/HadoopKeyValuesBasedBaseEdgeConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/HadoopKeyValuesBasedBaseEdgeConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/HadoopKeyValuesBasedBaseEdgeConfig.java
index 31eb686..0692bac 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/HadoopKeyValuesBasedBaseEdgeConfig.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/HadoopKeyValuesBasedBaseEdgeConfig.java
@@ -46,6 +46,19 @@ abstract class HadoopKeyValuesBasedBaseEdgeConfig {
    */
   public abstract UserPayload getInputPayload();
 
+
+  /**
+   * Get the history text for the configured Output
+   * @return output configuration as a string in json format
+   */
+  public abstract String getOutputHistoryText();
+
+  /**
+   * Get the history text for the configured Input
+   * @return input configuration as a string in json format
+   */
+  public abstract String getInputHistoryText();
+
   /**
    * Get the input class name
    * @return the input class name

http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedGroupedKVInputConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedGroupedKVInputConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedGroupedKVInputConfig.java
index 0e28399..888f61a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedGroupedKVInputConfig.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedGroupedKVInputConfig.java
@@ -259,6 +259,15 @@ public class OrderedGroupedKVInputConfig {
     }
   }
 
+  @InterfaceAudience.Private
+  String toHistoryText() {
+    if (conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT,
+        TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT_DEFAULT)) {
+      return TezUtils.convertToHistoryText(conf);
+    }
+    return null;
+  }
+
   public String getInputClassName() {
     return inputClassName;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfig.java
index 0f10cf1..e6cc2c2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfig.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfig.java
@@ -104,6 +104,16 @@ public class OrderedPartitionedKVEdgeConfig
   }
 
   @Override
+  public String getOutputHistoryText() {
+    return outputConf.toHistoryText();
+  }
+
+  @Override
+  public String getInputHistoryText() {
+    return inputConf.toHistoryText();
+  }
+
+  @Override
   public String getInputClassName() {
     return inputConf.getInputClassName();
   }
@@ -123,6 +133,7 @@ public class OrderedPartitionedKVEdgeConfig
             getOutputClassName()).setUserPayload(getOutputPayload()),
         InputDescriptor.create(
             getInputClassName()).setUserPayload(getInputPayload()));
+    Utils.setEdgePropertyHistoryText(this, edgeProperty);
     return edgeProperty;
   }
 
@@ -140,6 +151,7 @@ public class OrderedPartitionedKVEdgeConfig
             EdgeProperty.SchedulingType.SEQUENTIAL,
             OutputDescriptor.create(getOutputClassName()).setUserPayload(getOutputPayload()),
             InputDescriptor.create(getInputClassName()).setUserPayload(getInputPayload()));
+    Utils.setEdgePropertyHistoryText(this, edgeProperty);
     return edgeProperty;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVOutputConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVOutputConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVOutputConfig.java
index 1c4cca0..5437620 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVOutputConfig.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVOutputConfig.java
@@ -189,6 +189,15 @@ public class OrderedPartitionedKVOutputConfig {
     }
   }
 
+  @InterfaceAudience.Private
+  String toHistoryText() {
+    if (conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT,
+        TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT_DEFAULT)) {
+      return TezUtils.convertToHistoryText(conf);
+    }
+    return null;
+  }
+
   public static Builder newBuilder(String keyClass, String valueClass, String partitionerClassName) {
     return newBuilder(keyClass, valueClass, partitionerClassName, null);
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVEdgeConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVEdgeConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVEdgeConfig.java
index 6eb1d6a..28a7503 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVEdgeConfig.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVEdgeConfig.java
@@ -83,6 +83,16 @@ public class UnorderedKVEdgeConfig extends HadoopKeyValuesBasedBaseEdgeConfig {
   }
 
   @Override
+  public String getOutputHistoryText() {
+    return outputConf.toHistoryText();
+  }
+
+  @Override
+  public String getInputHistoryText() {
+    return inputConf.toHistoryText();
+  }
+
+  @Override
   public String getInputClassName() {
     return UnorderedKVInput.class.getName();
   }
@@ -104,6 +114,7 @@ public class UnorderedKVEdgeConfig extends HadoopKeyValuesBasedBaseEdgeConfig {
             getOutputClassName()).setUserPayload(getOutputPayload()),
         InputDescriptor.create(
             getInputClassName()).setUserPayload(getInputPayload()));
+    Utils.setEdgePropertyHistoryText(this, edgeProperty);
     return edgeProperty;
   }
 
@@ -124,6 +135,7 @@ public class UnorderedKVEdgeConfig extends HadoopKeyValuesBasedBaseEdgeConfig {
             getOutputClassName()).setUserPayload(getOutputPayload()),
         InputDescriptor.create(
             getInputClassName()).setUserPayload(getInputPayload()));
+    Utils.setEdgePropertyHistoryText(this, edgeProperty);
     return edgeProperty;
   }
 
@@ -141,6 +153,7 @@ public class UnorderedKVEdgeConfig extends HadoopKeyValuesBasedBaseEdgeConfig {
             EdgeProperty.SchedulingType.SEQUENTIAL,
             OutputDescriptor.create(getOutputClassName()).setUserPayload(getOutputPayload()),
             InputDescriptor.create(getInputClassName()).setUserPayload(getInputPayload()));
+    Utils.setEdgePropertyHistoryText(this, edgeProperty);
     return edgeProperty;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVInputConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVInputConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVInputConfig.java
index bd3750e..e99b0bf 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVInputConfig.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVInputConfig.java
@@ -179,6 +179,15 @@ public class UnorderedKVInputConfig {
     }
   }
 
+  @InterfaceAudience.Private
+  String toHistoryText() {
+    if (conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT,
+        TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT_DEFAULT)) {
+      return TezUtils.convertToHistoryText(conf);
+    }
+    return null;
+  }
+
   public static Builder newBuilder(String keyClass, String valueClass) {
     return new Builder(keyClass, valueClass);
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVOutputConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVOutputConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVOutputConfig.java
index fdeffb3..30df2e3 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVOutputConfig.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVOutputConfig.java
@@ -125,6 +125,15 @@ public class UnorderedKVOutputConfig {
     }
   }
 
+  @InterfaceAudience.Private
+  String toHistoryText() {
+    if (conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT,
+        TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT_DEFAULT)) {
+      return TezUtils.convertToHistoryText(conf);
+    }
+    return null;
+  }
+
   public static Builder newBuilder(String keyClass, String valClass) {
     return new Builder(keyClass, valClass);
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfig.java
index 30585bd..8ac7d65 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfig.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfig.java
@@ -108,6 +108,16 @@ public class UnorderedPartitionedKVEdgeConfig
   }
 
   @Override
+  public String getOutputHistoryText() {
+    return outputConf.toHistoryText();
+  }
+
+  @Override
+  public String getInputHistoryText() {
+    return inputConf.toHistoryText();
+  }
+
+  @Override
   public String getInputClassName() {
     return UnorderedKVInput.class.getName();
   }
@@ -129,6 +139,7 @@ public class UnorderedPartitionedKVEdgeConfig
             getOutputClassName()).setUserPayload(getOutputPayload()),
         InputDescriptor.create(
             getInputClassName()).setUserPayload(getInputPayload()));
+    Utils.setEdgePropertyHistoryText(this, edgeProperty);
     return edgeProperty;
   }
 
@@ -146,6 +157,7 @@ public class UnorderedPartitionedKVEdgeConfig
             EdgeProperty.SchedulingType.SEQUENTIAL,
             OutputDescriptor.create(getOutputClassName()).setUserPayload(getOutputPayload()),
             InputDescriptor.create(getInputClassName()).setUserPayload(getInputPayload()));
+    Utils.setEdgePropertyHistoryText(this, edgeProperty);
     return edgeProperty;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVOutputConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVOutputConfig.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVOutputConfig.java
index fae9d0f..0c8cd0d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVOutputConfig.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVOutputConfig.java
@@ -139,6 +139,14 @@ public class UnorderedPartitionedKVOutputConfig {
     }
   }
 
+  @InterfaceAudience.Private
+  String toHistoryText() {
+    if (conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT,
+        TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT_DEFAULT)) {
+      return TezUtils.convertToHistoryText(conf);
+    }
+    return null;
+  }
 
   public static Builder newBuilder(String keyClass, String valClass, String partitionerClassName) {
     return newBuilder(keyClass, valClass, partitionerClassName, null);

http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/Utils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/Utils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/Utils.java
new file mode 100644
index 0000000..c1b44a2
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/Utils.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.conf;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.tez.dag.api.EdgeProperty;
+
+@Private
+class Utils {
+
+  /**
+   * Modify the EdgeProperty to set the history text if available
+   * @param edgeConfig Edge config
+   * @param edgeProperty Edge property to be modified
+   */
+  static void setEdgePropertyHistoryText(HadoopKeyValuesBasedBaseEdgeConfig edgeConfig,
+                             EdgeProperty edgeProperty) {
+    String inputHistoryText = edgeConfig.getInputHistoryText();
+    if (inputHistoryText != null) {
+      edgeProperty.getEdgeDestination().setHistoryText(inputHistoryText);
+    }
+    String outputHistoryText = edgeConfig.getOutputHistoryText();
+    if (outputHistoryText != null) {
+      edgeProperty.getEdgeSource().setHistoryText(outputHistoryText);
+    }
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
index 4231442..f46f8f7 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
@@ -334,6 +334,7 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_SECONDARY_COMPARATOR_CLASS);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT);
     confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX);
     confKeys.add(TezConfiguration.TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH);
     confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH);

http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
index 368c988..0d02cb3 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
@@ -249,6 +249,7 @@ public class UnorderedKVInput extends AbstractLogicalInput {
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT);
     confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX);
     confKeys.add(TezConfiguration.TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH);
     confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH);

http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
index 70d5b78..b3290a5 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
@@ -236,6 +236,7 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT);
     confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX);
     confKeys.add(TezConfiguration.TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH);
     confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH);

http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
index 6a84e61..7bfc397 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
@@ -179,6 +179,7 @@ public class UnorderedKVOutput extends AbstractLogicalOutput {
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT);
     confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX);
     confKeys.add(TezConfiguration.TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH);
     confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH);

http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
index 11e6849..1e39535 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
@@ -119,6 +119,7 @@ public class UnorderedPartitionedKVOutput extends AbstractLogicalOutput {
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT);
     confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX);
     confKeys.add(TezConfiguration.TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH);
     confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH);

http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java
index c595a9d..35fd3cb 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java
@@ -21,9 +21,11 @@
 package org.apache.tez.runtime.library.conf;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -31,6 +33,8 @@ import java.util.Map;
 import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.junit.Test;
 
@@ -60,7 +64,7 @@ public class TestOrderedPartitionedKVEdgeConfig {
     }
   }
 
-  @Test
+  @Test (timeout=2000)
   public void testDefaultConfigsUsed() {
     OrderedPartitionedKVEdgeConfig.Builder builder = OrderedPartitionedKVEdgeConfig
         .newBuilder("KEY", "VALUE", "PARTITIONER");
@@ -86,7 +90,7 @@ public class TestOrderedPartitionedKVEdgeConfig {
         inputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, ""));
   }
 
-  @Test
+  @Test (timeout=2000)
   public void testSpecificIOConfs() {
     // Ensures that Output and Input confs are not mixed.
     OrderedPartitionedKVEdgeConfig.Builder builder = OrderedPartitionedKVEdgeConfig
@@ -109,7 +113,7 @@ public class TestOrderedPartitionedKVEdgeConfig {
         inputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, "DEFAULT"));
   }
 
-  @Test
+  @Test (timeout=2000)
   public void tetCommonConf() {
 
     Configuration fromConf = new Configuration(false);
@@ -179,7 +183,7 @@ public class TestOrderedPartitionedKVEdgeConfig {
 
   }
 
-  @Test
+  @Test (timeout=2000)
   public void testSetters() {
     Map<String, String> comparatorConf = Maps.newHashMap();
     comparatorConf.put("comparator.test.key", "comparatorValue");
@@ -244,7 +248,7 @@ public class TestOrderedPartitionedKVEdgeConfig {
 
   }
 
-  @Test
+  @Test (timeout=2000)
   public void testSerialization() {
     OrderedPartitionedKVEdgeConfig.Builder builder = OrderedPartitionedKVEdgeConfig
         .newBuilder("KEY", "VALUE", "PARTITIONER")
@@ -294,4 +298,35 @@ public class TestOrderedPartitionedKVEdgeConfig {
         inputConf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS,
             false));
   }
+
+  private void checkHistoryText(String historyText) {
+    assertNotNull(historyText);
+    assertTrue(historyText.contains(
+        TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT));
+  }
+
+  @Test (timeout=2000)
+  public void testHistoryText() {
+    OrderedPartitionedKVEdgeConfig.Builder builder =
+        OrderedPartitionedKVEdgeConfig.newBuilder("KEY", "VALUE", "PARTITIONER");
+    Configuration fromConf = new Configuration(false);
+    fromConf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT,
+        true);
+    builder.setFromConfiguration(fromConf);
+
+    OrderedPartitionedKVEdgeConfig kvEdgeConfig = builder.build();
+    checkHistoryText(kvEdgeConfig.getInputHistoryText());
+    checkHistoryText(kvEdgeConfig.getOutputHistoryText());
+
+    EdgeProperty defaultEdgeProperty = builder.build().createDefaultEdgeProperty();
+    checkHistoryText(defaultEdgeProperty.getEdgeDestination().getHistoryText());
+    checkHistoryText(defaultEdgeProperty.getEdgeSource().getHistoryText());
+
+    EdgeManagerPluginDescriptor descriptor = mock(EdgeManagerPluginDescriptor.class);
+    EdgeProperty edgeProperty = builder.build().createDefaultCustomEdgeProperty(descriptor);
+    checkHistoryText(edgeProperty.getEdgeDestination().getHistoryText());
+    checkHistoryText(edgeProperty.getEdgeSource().getHistoryText());
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedKVEdgeConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedKVEdgeConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedKVEdgeConfig.java
index fab1e94..0ef1cc5 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedKVEdgeConfig.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedKVEdgeConfig.java
@@ -21,21 +21,25 @@
 package org.apache.tez.runtime.library.conf;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
 
 import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.junit.Test;
 
 public class TestUnorderedKVEdgeConfig {
 
-  @Test
+  @Test (timeout=2000)
   public void testNullParams() {
     try {
       UnorderedKVEdgeConfig.newBuilder(null, "VALUE");
@@ -52,7 +56,7 @@ public class TestUnorderedKVEdgeConfig {
     }
   }
 
-  @Test
+  @Test (timeout=2000)
   public void testDefaultConfigsUsed() {
     UnorderedKVEdgeConfig.Builder builder =
         UnorderedKVEdgeConfig.newBuilder("KEY", "VALUE");
@@ -85,7 +89,7 @@ public class TestUnorderedKVEdgeConfig {
         ("SerClass2,SerClass1"));
   }
 
-  @Test
+  @Test (timeout=2000)
   public void testSpecificIOConfs() {
     // Ensures that Output and Input confs are not mixed.
     UnorderedKVEdgeConfig.Builder builder =
@@ -109,7 +113,7 @@ public class TestUnorderedKVEdgeConfig {
         inputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, "DEFAULT"));
   }
 
-  @Test
+  @Test (timeout=2000)
   public void tetCommonConf() {
 
     Configuration fromConf = new Configuration(false);
@@ -170,4 +174,40 @@ public class TestUnorderedKVEdgeConfig {
     assertEquals("fs", inputConf.get("fs.shouldExist"));
 
   }
+
+  private void checkHistoryText(String historyText) {
+    assertNotNull(historyText);
+    assertTrue(historyText.contains(
+        TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT));
+  }
+
+  @Test (timeout=2000)
+  public void testHistoryText() {
+    UnorderedKVEdgeConfig.Builder builder = UnorderedKVEdgeConfig.newBuilder("KEY", "VALUE");
+    Configuration fromConf = new Configuration(false);
+    fromConf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT,
+        true);
+    builder.setFromConfiguration(fromConf);
+
+    UnorderedKVEdgeConfig kvEdgeConfig = builder.build();
+
+    checkHistoryText(kvEdgeConfig.getInputHistoryText());
+    checkHistoryText(kvEdgeConfig.getOutputHistoryText());
+
+    EdgeProperty defaultEdgeProperty = builder.build().createDefaultBroadcastEdgeProperty();
+    checkHistoryText(defaultEdgeProperty.getEdgeDestination().getHistoryText());
+    checkHistoryText(defaultEdgeProperty.getEdgeSource().getHistoryText());
+
+    defaultEdgeProperty = builder.build().createDefaultOneToOneEdgeProperty();
+    checkHistoryText(defaultEdgeProperty.getEdgeDestination().getHistoryText());
+    checkHistoryText(defaultEdgeProperty.getEdgeSource().getHistoryText());
+
+    EdgeManagerPluginDescriptor descriptor = mock(EdgeManagerPluginDescriptor.class);
+    EdgeProperty edgeProperty = builder.build().createDefaultCustomEdgeProperty(descriptor);
+    checkHistoryText(edgeProperty.getEdgeDestination().getHistoryText());
+    checkHistoryText(edgeProperty.getEdgeSource().getHistoryText());
+
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVEdgeConfig.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVEdgeConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVEdgeConfig.java
index b8b86e5..fff8efd 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVEdgeConfig.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVEdgeConfig.java
@@ -21,21 +21,25 @@
 package org.apache.tez.runtime.library.conf;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
 
 import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.junit.Test;
 
 public class TestUnorderedPartitionedKVEdgeConfig {
 
-  @Test
+  @Test (timeout=2000)
   public void testNullParams() {
     try {
       UnorderedPartitionedKVEdgeConfig.newBuilder(null, "VALUE", "PARTITIONER");
@@ -59,7 +63,7 @@ public class TestUnorderedPartitionedKVEdgeConfig {
     }
   }
 
-  @Test
+  @Test (timeout=2000)
   public void testDefaultConfigsUsed() {
     UnorderedPartitionedKVEdgeConfig.Builder builder =
         UnorderedPartitionedKVEdgeConfig.newBuilder("KEY", "VALUE", "PARTITIONER");
@@ -92,7 +96,7 @@ public class TestUnorderedPartitionedKVEdgeConfig {
         ("SerClass2,SerClass1"));
   }
 
-  @Test
+  @Test (timeout=2000)
   public void testSpecificIOConfs() {
     // Ensures that Output and Input confs are not mixed.
     UnorderedPartitionedKVEdgeConfig.Builder builder =
@@ -116,7 +120,7 @@ public class TestUnorderedPartitionedKVEdgeConfig {
         inputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, "DEFAULT"));
   }
 
-  @Test
+  @Test (timeout=2000)
   public void tetCommonConf() {
 
     Configuration fromConf = new Configuration(false);
@@ -185,4 +189,36 @@ public class TestUnorderedPartitionedKVEdgeConfig {
     assertEquals("fs", inputConf.get("fs.shouldExist"));
 
   }
+
+  private void checkHistoryText(String historyText) {
+    assertNotNull(historyText);
+    assertTrue(historyText.contains(
+        TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT));
+  }
+
+  @Test (timeout=2000)
+  public void testHistoryText() {
+    UnorderedPartitionedKVEdgeConfig.Builder builder =
+        UnorderedPartitionedKVEdgeConfig.newBuilder("KEY", "VALUE", "PARTITIONER");
+    Configuration fromConf = new Configuration(false);
+    fromConf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT,
+        true);
+    builder.setFromConfiguration(fromConf);
+
+    UnorderedPartitionedKVEdgeConfig kvEdgeConfig = builder.build();
+
+    checkHistoryText(kvEdgeConfig.getInputHistoryText());
+    checkHistoryText(kvEdgeConfig.getOutputHistoryText());
+
+    EdgeProperty defaultEdgeProperty = builder.build().createDefaultEdgeProperty();
+    checkHistoryText(defaultEdgeProperty.getEdgeDestination().getHistoryText());
+    checkHistoryText(defaultEdgeProperty.getEdgeSource().getHistoryText());
+
+    EdgeManagerPluginDescriptor descriptor = mock(EdgeManagerPluginDescriptor.class);
+    EdgeProperty edgeProperty = builder.build().createDefaultCustomEdgeProperty(descriptor);
+    checkHistoryText(edgeProperty.getEdgeDestination().getHistoryText());
+    checkHistoryText(edgeProperty.getEdgeSource().getHistoryText());
+
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/0daf2ba1/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
index 9ca2762..86dcc64 100644
--- a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -77,6 +77,7 @@ import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.FileSystemCounter;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DataSinkDescriptor;
 import org.apache.tez.dag.api.DataSourceDescriptor;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.EdgeProperty;
@@ -118,6 +119,7 @@ import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.InputInitializer;
 import org.apache.tez.runtime.api.InputInitializerContext;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.input.OrderedGroupedInputLegacy;
 import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
 import org.apache.tez.runtime.library.processor.SleepProcessor;
@@ -691,8 +693,12 @@ public class TestMRRJobsDAGApi {
     Vertex stage3Vertex = Vertex.create("reduce", ProcessorDescriptor.create(
             ReduceProcessor.class.getName()).setUserPayload(stage3Payload),
         1, Resource.newInstance(256, 1));
-    stage3Vertex.addDataSink("MROutput",
-        MROutputLegacy.createConfigBuilder(stage3Conf, NullOutputFormat.class).build());
+    stage3Conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT,
+        true);
+    DataSinkDescriptor dataSinkDescriptor =
+        MROutputLegacy.createConfigBuilder(stage3Conf, NullOutputFormat.class).build();
+    Assert.assertFalse(dataSinkDescriptor.getOutputDescriptor().getHistoryText().isEmpty());
+    stage3Vertex.addDataSink("MROutput", dataSinkDescriptor);
 
     // TODO env, resources