You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/26 06:10:38 UTC

[2/2] git commit: TEZ-503. Fixes to get BroadcastInput / BroadcastOutput working. (sseth)

TEZ-503. Fixes to get BroadcastInput / BroadcastOutput working. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/3334ca14
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/3334ca14
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/3334ca14

Branch: refs/heads/master
Commit: 3334ca1484a1b79d5356638cdb42f139187347e9
Parents: 906be8f
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Sep 25 21:10:04 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Sep 25 21:10:04 2013 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/tez/dag/api/DAG.java   |   5 -
 .../runtime/api/events/DataMovementEvent.java   |   5 +
 .../tez/mapreduce/examples/ExampleDriver.java   |   2 +
 .../mapreduce/examples/FilterLinesByWord.java   | 266 +++++++++++++++++++
 .../processor/FilterByWordInputProcessor.java   | 127 +++++++++
 .../processor/FilterByWordOutputProcessor.java  | 102 +++++++
 .../runtime/LogicalIOProcessorRuntimeTask.java  |   6 +-
 .../runtime/api/impl/TezInputContextImpl.java   |   6 +-
 .../runtime/api/impl/TezOutputContextImpl.java  |   6 +-
 .../api/impl/TezProcessorContextImpl.java       |   6 +-
 .../runtime/api/impl/TezTaskContextImpl.java    |   7 +-
 tez-runtime-library/pom.xml                     |   5 +
 .../broadcast/input/BroadcastInputManager.java  |  21 +-
 .../broadcast/input/BroadcastKVReader.java      |  12 +-
 .../BroadcastShuffleInputEventHandler.java      |   5 +
 .../input/BroadcastShuffleManager.java          |  81 ++++--
 .../broadcast/output/FileBasedKVWriter.java     |   4 +
 .../common/shuffle/impl/ShuffleScheduler.java   |   4 +-
 .../runtime/library/common/sort/impl/IFile.java |   5 +-
 .../library/input/ShuffledUnorderedKVInput.java |  17 +-
 .../library/output/OnFileUnorderedKVOutput.java |  11 +-
 .../runtime/library/shuffle/common/Fetcher.java |   4 +-
 .../library/shuffle/common/InputHost.java       |   7 +
 .../shuffle/common/MemoryFetchedInput.java      |   2 +-
 .../input/TestBroadcastInputManager.java        |  84 ++++++
 .../library/common/sort/impl/TestIFile.java     | 205 ++++++++++++++
 .../output/TestOnFileUnorderedKVOutput.java     | 155 +++++++++++
 .../runtime/library/testutils/KVDataGen.java    |  64 +++++
 28 files changed, 1161 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 9cb602c..f64dc18 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -171,11 +171,6 @@ public class DAG { // FIXME rename to Topology
     if(restricted){
       for(Edge e : edges){
         vertexMap.get(e.getInputVertex().getVertexName()).outDegree++;
-        if (e.getEdgeProperty().getDataMovementType() !=
-            DataMovementType.SCATTER_GATHER) {
-          throw new IllegalStateException(
-              "Unsupported connection pattern on edge. " + e);
-        }
         if (e.getEdgeProperty().getDataSourceType() !=
             DataSourceType.PERSISTED) {
           throw new IllegalStateException(

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-api/src/main/java/org/apache/tez/runtime/api/events/DataMovementEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/DataMovementEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/DataMovementEvent.java
index b384676..e5f9b05 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/DataMovementEvent.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/DataMovementEvent.java
@@ -106,4 +106,9 @@ public final class DataMovementEvent extends Event {
     this.version = version;
   }
 
+  @Override
+  public String toString() {
+    return "DataMovementEvent [sourceIndex=" + sourceIndex + ", targetIndex="
+        + targetIndex + ", version=" + version + "]";
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
index 6c062a4..cb7abe1 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
@@ -74,6 +74,8 @@ public class ExampleDriver {
           "MRR Sleep Job");
       pgd.addClass("orderedwordcount", OrderedWordCount.class,
           "Word Count with words sorted on frequency");
+      pgd.addClass("filterLinesByWord", FilterLinesByWord.class,
+          "Filters lines by the specified word");
       exitCode = pgd.run(argv);
     }
     catch(Throwable e){

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
new file mode 100644
index 0000000..640d734
--- /dev/null
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.examples;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.ClassUtil;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.client.AMConfiguration;
+import org.apache.tez.client.TezClientUtils;
+import org.apache.tez.client.TezSession;
+import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
+import org.apache.tez.processor.FilterByWordInputProcessor;
+import org.apache.tez.processor.FilterByWordOutputProcessor;
+import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
+import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
+
+public class FilterLinesByWord {
+
+  private static Log LOG = LogFactory.getLog(FilterLinesByWord.class);
+
+  public static final String FILTER_PARAM_NAME = "tez.runtime.examples.filterbyword.word";
+
+
+  public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, TezException {
+    Configuration conf = new Configuration();
+    String [] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+
+    if (otherArgs.length != 3) {
+      System.err.println("Usage filtelinesrbyword <in> <out> <filter_word>");
+      System.exit(2);
+    }
+
+    String inputPath = otherArgs[0];
+    String outputPath = otherArgs[1];
+    String filterWord = otherArgs[2];
+
+    FileSystem fs = FileSystem.get(conf);
+    if (fs.exists(new Path(outputPath))) {
+      System.err.println("Output directory : " + outputPath + " already exists");
+      System.exit(2);
+    }
+
+    TezConfiguration tezConf = new TezConfiguration(conf);
+
+    fs.getWorkingDirectory();
+    Path stagingDir = new Path(fs.getWorkingDirectory(), UUID.randomUUID().toString());
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir.toString());
+    TezClientUtils.ensureStagingDirExists(tezConf, stagingDir);
+
+    tezConf.set(TezConfiguration.TEZ_AM_JAVA_OPTS, MRHelpers.getMRAMJavaOpts(tezConf));
+
+    String jarPath = ClassUtil.findContainingJar(FilterLinesByWord.class);
+    if (jarPath == null) {
+      throw new TezUncheckedException("Could not find any jar containing"
+          + FilterLinesByWord.class.getName() + " in the classpath");
+    }
+
+    Path remoteJarPath = fs.makeQualified(new Path(stagingDir, "dag_job.jar"));
+    fs.copyFromLocalFile(new Path(jarPath), remoteJarPath);
+    FileStatus remoteJarStatus = fs.getFileStatus(remoteJarPath);
+
+    Map<String, LocalResource> commonLocalResources = new TreeMap<String, LocalResource>();
+    LocalResource dagJarLocalRsrc = LocalResource.newInstance(
+        ConverterUtils.getYarnUrlFromPath(remoteJarPath),
+        LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
+        remoteJarStatus.getLen(), remoteJarStatus.getModificationTime());
+    commonLocalResources.put("dag_job.jar", dagJarLocalRsrc);
+
+
+
+    AMConfiguration amConf = new AMConfiguration("default", null, commonLocalResources, tezConf, null);
+    TezSessionConfiguration sessionConf = new TezSessionConfiguration(amConf, tezConf);
+    TezSession tezSession = new TezSession("FilterLinesByWordSession", sessionConf);
+    tezSession.start(); // Why do I need to start the TezSession.
+
+    Configuration stage1Conf = new JobConf(conf);
+    stage1Conf.set(FileInputFormat.INPUT_DIR, inputPath);
+    stage1Conf.setBoolean("mapred.mapper.new-api", false);
+    stage1Conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS, Text.class.getName());
+    stage1Conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS, TextLongPair.class.getName());
+    stage1Conf.set(FILTER_PARAM_NAME, filterWord);
+
+    InputSplitInfo inputSplitInfo = MRHelpers.generateInputSplits(stage1Conf, stagingDir);
+    MultiStageMRConfToTezTranslator.translateVertexConfToTez(stage1Conf, null);
+
+
+
+    Configuration stage2Conf = new JobConf(conf);
+    stage2Conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS, Text.class.getName());
+    stage2Conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS, TextLongPair.class.getName());
+    stage2Conf.set(FileOutputFormat.OUTDIR, outputPath);
+    stage2Conf.setBoolean("mapred.mapper.new-api", false);
+    MultiStageMRConfToTezTranslator.translateVertexConfToTez(stage2Conf, stage1Conf);
+
+    MRHelpers.doJobClientMagic(stage1Conf);
+    MRHelpers.doJobClientMagic(stage2Conf);
+
+    // Setup stage1 Vertex
+    Vertex stage1Vertex = new Vertex("stage1", new ProcessorDescriptor(
+        FilterByWordInputProcessor.class.getName()).setUserPayload(MRHelpers
+        .createUserPayloadFromConf(stage1Conf)), inputSplitInfo.getNumTasks(),
+        MRHelpers.getMapResource(stage1Conf));
+    stage1Vertex.setJavaOpts(MRHelpers.getMapJavaOpts(stage1Conf)).setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
+    Map<String, LocalResource> stage1LocalResources = new HashMap<String, LocalResource>();
+    stage1LocalResources.putAll(commonLocalResources);
+    MRHelpers.updateLocalResourcesForInputSplits(fs, inputSplitInfo, stage1LocalResources);
+    stage1Vertex.setTaskLocalResources(stage1LocalResources);
+    Map<String, String> stage1Env = new HashMap<String, String>();
+    MRHelpers.updateEnvironmentForMRTasks(stage1Conf, stage1Env, true);
+    stage1Vertex.setTaskEnvironment(stage1Env);
+
+    // Setup stage2 Vertex
+    Vertex stage2Vertex = new Vertex("stage2", new ProcessorDescriptor(
+        FilterByWordOutputProcessor.class.getName()).setUserPayload(MRHelpers
+        .createUserPayloadFromConf(stage2Conf)), 1,
+        MRHelpers.getMapResource(stage2Conf));
+    stage2Vertex.setJavaOpts(MRHelpers.getReduceJavaOpts(stage2Conf)).setTaskLocalResources(commonLocalResources);
+    Map<String, String> stage2Env = new HashMap<String, String>();
+    MRHelpers.updateEnvironmentForMRTasks(stage2Conf, stage2Env, false);
+    stage2Vertex.setTaskEnvironment(stage2Env);
+
+    DAG dag = new DAG("FilterLinesByWord");
+    Edge edge = new Edge(stage1Vertex, stage2Vertex, new EdgeProperty(
+        DataMovementType.BROADCAST, DataSourceType.PERSISTED,
+        SchedulingType.SEQUENTIAL, new OutputDescriptor(
+            OnFileUnorderedKVOutput.class.getName()), new InputDescriptor(
+            ShuffledUnorderedKVInput.class.getName())));
+    dag.addVertex(stage1Vertex).addVertex(stage2Vertex).addEdge(edge);
+
+    LOG.info("Submitting DAG to Tez Session");
+    DAGClient dagClient = tezSession.submitDAG(dag);
+    LOG.info("Submitted DAG to Tez Session");
+
+    DAGStatus dagStatus = null;
+    try {
+      while (true) {
+        dagStatus = dagClient.getDAGStatus();
+        if(dagStatus.getState() == DAGStatus.State.RUNNING ||
+            dagStatus.getState() == DAGStatus.State.SUCCEEDED ||
+            dagStatus.getState() == DAGStatus.State.FAILED ||
+            dagStatus.getState() == DAGStatus.State.KILLED ||
+            dagStatus.getState() == DAGStatus.State.ERROR) {
+          break;
+        }
+        try {
+          Thread.sleep(500);
+        } catch (InterruptedException e) {
+          // continue;
+        }
+      }
+
+      while (dagStatus.getState() == DAGStatus.State.RUNNING) {
+        try {
+          ExampleDriver.printMRRDAGStatus(dagStatus);
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException e) {
+            // continue;
+          }
+          dagStatus = dagClient.getDAGStatus();
+        } catch (TezException e) {
+          LOG.fatal("Failed to get application progress. Exiting");
+          System.exit(-1);
+        }
+      }
+    } finally {
+      fs.delete(stagingDir, true);
+      tezSession.stop();
+    }
+
+    ExampleDriver.printMRRDAGStatus(dagStatus);
+    LOG.info("Application completed. " + "FinalState=" + dagStatus.getState());
+    System.exit(dagStatus.getState() == DAGStatus.State.SUCCEEDED ? 0 : 1);
+  }
+  
+  public static class TextLongPair implements Writable {
+
+    private Text text;
+    private LongWritable longWritable;
+    
+    public TextLongPair() {
+    }
+    
+    public TextLongPair(Text text, LongWritable longWritable) {
+      this.text = text;
+      this.longWritable = longWritable;
+    }
+    
+    @Override
+    public void write(DataOutput out) throws IOException {
+      this.text.write(out);
+      this.longWritable.write(out);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      this.text = new Text();
+      this.longWritable = new LongWritable();
+      text.readFields(in);
+      longWritable.readFields(in);
+    }
+    
+    @Override
+    public String toString() {
+      return text.toString() + "\t" + longWritable.get();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
new file mode 100644
index 0000000..e8e315a
--- /dev/null
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.processor;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.mapreduce.examples.FilterLinesByWord;
+import org.apache.tez.mapreduce.examples.FilterLinesByWord.TextLongPair;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalIOProcessor;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.library.api.KVReader;
+import org.apache.tez.runtime.library.api.KVReader.KVRecord;
+import org.apache.tez.runtime.library.api.KVWriter;
+import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
+
+public class FilterByWordInputProcessor implements LogicalIOProcessor {
+
+  private static final Log LOG = LogFactory.getLog(FilterByWordInputProcessor.class);
+
+  private String filterWord;
+
+  public FilterByWordInputProcessor() {
+  }
+
+  @Override
+  public void initialize(TezProcessorContext processorContext) throws Exception {
+    Configuration conf = TezUtils.createConfFromUserPayload(processorContext.getUserPayload());
+    filterWord = conf.get(FilterLinesByWord.FILTER_PARAM_NAME);
+    if (filterWord == null) {
+      processorContext.fatalError(null, "No filter word specified");
+    }
+  }
+
+  @Override
+  public void handleEvents(List<Event> processorEvents) {
+    throw new UnsupportedOperationException("Not expecting any events to the broadcast processor");
+
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Broadcast Processor closing. Nothing to do");
+  }
+
+  @Override
+  public void run(Map<String, LogicalInput> inputs,
+      Map<String, LogicalOutput> outputs) throws Exception {
+
+    if (inputs.size() != 1) {
+      throw new IllegalStateException("TestBroadcast processor can only work with a single input");
+    }
+
+    if (outputs.size() != 1) {
+      throw new IllegalStateException("TestBroadcast processor can only work with a single output");
+    }
+
+    LogicalInput li = inputs.values().iterator().next();
+    if (! (li instanceof MRInput)) {
+      throw new IllegalStateException("TestBroadcast processor can only work with MRInput");
+    }
+
+    LogicalOutput lo = outputs.values().iterator().next();
+    if (! (lo instanceof OnFileUnorderedKVOutput)) {
+      throw new IllegalStateException("TestBroadcast processor can only work with OnFileUnorderedKVOutput");
+    }
+
+    
+    
+    MRInput mrInput = (MRInput) li;
+    OnFileUnorderedKVOutput kvOutput = (OnFileUnorderedKVOutput) lo;
+
+    Configuration updatedConf = mrInput.getConfigUpdates();
+    String fileName = updatedConf.get(MRJobConfig.MAP_INPUT_FILE);
+    LOG.info("Processing file: " + fileName);
+    Text srcFile = new Text();
+    if (fileName == null) {
+      srcFile.set("UNKNOWN_FILENAME_IN_PROCESSOR");
+    } else {
+      srcFile.set(fileName);
+    }
+
+    KVReader kvReader = mrInput.getReader();
+    KVWriter kvWriter = kvOutput.getWriter();
+
+    while (kvReader.next()) {
+      KVRecord kvRecord = kvReader.getCurrentKV();
+      Object key = kvRecord.getKey();
+      Object val = kvRecord.getValues().iterator().next();
+
+      Text valText = (Text) val;
+      String readVal = valText.toString();
+      if (readVal.contains(filterWord)) {
+        LongWritable lineNum = (LongWritable) key;
+        TextLongPair outVal = new TextLongPair(srcFile, lineNum);
+        kvWriter.write(valText, outVal);
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java
new file mode 100644
index 0000000..1cb5bad
--- /dev/null
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.processor;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.mapreduce.processor.map.MapProcessor;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalIOProcessor;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.library.api.KVReader;
+import org.apache.tez.runtime.library.api.KVReader.KVRecord;
+import org.apache.tez.runtime.library.api.KVWriter;
+import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
+
+
+public class FilterByWordOutputProcessor implements LogicalIOProcessor {
+
+  private static final Log LOG = LogFactory.getLog(MapProcessor.class);
+  private TezProcessorContext processorContext;
+
+  public FilterByWordOutputProcessor() {
+  }
+
+  @Override
+  public void initialize(TezProcessorContext processorContext) throws Exception {
+    this.processorContext = processorContext;
+  }
+
+  @Override
+  public void handleEvents(List<Event> processorEvents) {
+    throw new UnsupportedOperationException("Not expecting any events to the broadcast output processor");
+  }
+
+  @Override
+  public void close() throws Exception {
+    LOG.info("Broadcast Output Processor closing. Nothing to do");
+  }
+
+  @Override
+  public void run(Map<String, LogicalInput> inputs,
+      Map<String, LogicalOutput> outputs) throws Exception {
+
+    if (inputs.size() != 1) {
+      throw new IllegalStateException("TestBroadcast processor can only work with a single input");
+    }
+
+    if (outputs.size() != 1) {
+      throw new IllegalStateException("TestBroadcast processor can only work with a single output");
+    }
+
+    LogicalInput li = inputs.values().iterator().next();
+    if (! (li instanceof ShuffledUnorderedKVInput)) {
+      throw new IllegalStateException("TestBroadcast processor can only work with ShuffledUnorderedKVInput");
+    }
+
+    LogicalOutput lo = outputs.values().iterator().next();
+    if (! (lo instanceof MROutput)) {
+      throw new IllegalStateException("TestBroadcast processor can only work with MROutput");
+    }
+
+    ShuffledUnorderedKVInput kvInput = (ShuffledUnorderedKVInput) li;
+    MROutput mrOutput = (MROutput) lo;
+
+    KVReader kvReader = kvInput.getReader();
+    KVWriter kvWriter = mrOutput.getWriter();
+    while (kvReader.next()) {
+      KVRecord kvRecord = kvReader.getCurrentKV();
+      Object key = kvRecord.getKey();
+      Object value = kvRecord.getValues().iterator().next();
+
+      kvWriter.write(key, value);
+    }
+    if (processorContext.canCommit()) {
+      mrOutput.commit();
+    } else {
+      mrOutput.abort();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 5c8ec2b..6beec91 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -230,7 +230,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
         inputSpec.getInputDescriptor().getUserPayload() == null ? taskSpec
             .getProcessorDescriptor().getUserPayload() : inputSpec
             .getInputDescriptor().getUserPayload(), this,
-        serviceConsumerMetadata);
+        serviceConsumerMetadata, System.getenv());
     return inputContext;
   }
 
@@ -242,7 +242,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
         outputSpec.getOutputDescriptor().getUserPayload() == null ? taskSpec
             .getProcessorDescriptor().getUserPayload() : outputSpec
             .getOutputDescriptor().getUserPayload(), this,
-        serviceConsumerMetadata);
+        serviceConsumerMetadata, System.getenv());
     return outputContext;
   }
 
@@ -250,7 +250,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     TezProcessorContext processorContext = new TezProcessorContextImpl(tezConf,
         appAttemptNumber, tezUmbilical, taskSpec.getVertexName(), taskSpec.getTaskAttemptID(),
         tezCounters, processorDescriptor.getUserPayload(), this,
-        serviceConsumerMetadata);
+        serviceConsumerMetadata, System.getenv());
     return processorContext;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
index 9169895..f9d1b1a 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
@@ -44,9 +44,11 @@ public class TezInputContextImpl extends TezTaskContextImpl
       TezUmbilical tezUmbilical, String taskVertexName,
       String sourceVertexName, TezTaskAttemptID taskAttemptID,
       TezCounters counters, byte[] userPayload,
-      RuntimeTask runtimeTask, Map<String, ByteBuffer> serviceConsumerMetadata) {
+      RuntimeTask runtimeTask, Map<String, ByteBuffer> serviceConsumerMetadata,
+      Map<String, String> auxServiceEnv) {
     super(conf, appAttemptNumber, taskVertexName, taskAttemptID,
-        counters, runtimeTask, tezUmbilical, serviceConsumerMetadata);
+        counters, runtimeTask, tezUmbilical, serviceConsumerMetadata,
+        auxServiceEnv);
     this.userPayload = userPayload;
     this.sourceVertexName = sourceVertexName;
     this.sourceInfo = new EventMetaData(

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
index fd4c3a3..e97f1db 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
@@ -45,9 +45,11 @@ public class TezOutputContextImpl extends TezTaskContextImpl
       String destinationVertexName,
       TezTaskAttemptID taskAttemptID, TezCounters counters,
       byte[] userPayload, RuntimeTask runtimeTask,
-      Map<String, ByteBuffer> serviceConsumerMetadata) {
+      Map<String, ByteBuffer> serviceConsumerMetadata,
+      Map<String, String> auxServiceEnv) {
     super(conf, appAttemptNumber, taskVertexName, taskAttemptID,
-        counters, runtimeTask, tezUmbilical, serviceConsumerMetadata);
+        counters, runtimeTask, tezUmbilical, serviceConsumerMetadata,
+        auxServiceEnv);
     this.userPayload = userPayload;
     this.destinationVertexName = destinationVertexName;
     this.sourceInfo = new EventMetaData(EventProducerConsumerType.OUTPUT,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
index e73baf4..752376f 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
@@ -42,9 +42,11 @@ public class TezProcessorContextImpl extends TezTaskContextImpl
       TezUmbilical tezUmbilical, String vertexName,
       TezTaskAttemptID taskAttemptID, TezCounters counters,
       byte[] userPayload, RuntimeTask runtimeTask,
-      Map<String, ByteBuffer> serviceConsumerMetadata) {
+      Map<String, ByteBuffer> serviceConsumerMetadata,
+      Map<String, String> auxServiceEnv) {
     super(conf, appAttemptNumber, vertexName, taskAttemptID,
-        counters, runtimeTask, tezUmbilical, serviceConsumerMetadata);
+        counters, runtimeTask, tezUmbilical, serviceConsumerMetadata,
+        auxServiceEnv);
     this.userPayload = userPayload;
     this.sourceInfo = new EventMetaData(EventProducerConsumerType.PROCESSOR,
         taskVertexName, "", taskAttemptID);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
index ee9e96d..b3dae4e 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
@@ -45,12 +45,14 @@ public abstract class TezTaskContextImpl implements TezTaskContext {
   protected final TezUmbilical tezUmbilical;
   private final Map<String, ByteBuffer> serviceConsumerMetadata;
   private final int appAttemptNumber;
+  private final Map<String, String> auxServiceEnv;
 
   @Private
   public TezTaskContextImpl(Configuration conf, int appAttemptNumber,
       String taskVertexName, TezTaskAttemptID taskAttemptID,
       TezCounters counters, RuntimeTask runtimeTask,
-      TezUmbilical tezUmbilical, Map<String, ByteBuffer> serviceConsumerMetadata) {
+      TezUmbilical tezUmbilical, Map<String, ByteBuffer> serviceConsumerMetadata,
+      Map<String, String> auxServiceEnv) {
     this.conf = conf;
     this.taskVertexName = taskVertexName;
     this.taskAttemptID = taskAttemptID;
@@ -63,6 +65,7 @@ public abstract class TezTaskContextImpl implements TezTaskContext {
     this.serviceConsumerMetadata = serviceConsumerMetadata;
     // TODO NEWTEZ at some point dag attempt should not map to app attempt
     this.appAttemptNumber = appAttemptNumber;
+    this.auxServiceEnv = auxServiceEnv;
   }
 
   @Override
@@ -123,7 +126,7 @@ public abstract class TezTaskContextImpl implements TezTaskContext {
   @Override
   public ByteBuffer getServiceProviderMetaData(String serviceName) {
     return AuxiliaryServiceHelper.getServiceDataFromEnv(
-        serviceName, System.getenv());
+        serviceName, auxServiceEnv);
   }
 
   protected void signalFatalError(Throwable t, String message,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-runtime-library/pom.xml
----------------------------------------------------------------------
diff --git a/tez-runtime-library/pom.xml b/tez-runtime-library/pom.xml
index dcdabe1..625ef29 100644
--- a/tez-runtime-library/pom.xml
+++ b/tez-runtime-library/pom.xml
@@ -34,6 +34,11 @@
       <artifactId>tez-api</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-runtime-internals</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
index cda52da..add7371 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
@@ -20,11 +20,12 @@ package org.apache.tez.runtime.library.broadcast.input;
 
 import java.io.IOException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.library.common.Constants;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
@@ -37,6 +38,8 @@ import org.apache.tez.runtime.library.shuffle.common.MemoryFetchedInput;
 public class BroadcastInputManager implements FetchedInputAllocator,
     FetchedInputCallback {
 
+  private static final Log LOG = LogFactory.getLog(BroadcastInputManager.class);
+  
   private final Configuration conf;
 
   private final TezTaskOutputFiles fileNameAllocator;
@@ -46,13 +49,13 @@ public class BroadcastInputManager implements FetchedInputAllocator,
   private final long memoryLimit;
   private final long maxSingleShuffleLimit;
 
-  private long usedMemory = 0;
+  private volatile long usedMemory = 0;
 
-  public BroadcastInputManager(TezInputContext inputContext, Configuration conf) {
+  public BroadcastInputManager(String uniqueIdentifier, Configuration conf) {
     this.conf = conf;
 
     this.fileNameAllocator = new TezTaskOutputFiles(conf,
-        inputContext.getUniqueIdentifier());
+        uniqueIdentifier);
     this.localDirAllocator = new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
 
     // Setup configuration
@@ -80,6 +83,8 @@ public class BroadcastInputManager implements FetchedInputAllocator,
     }
 
     this.maxSingleShuffleLimit = (long) (memoryLimit * singleShuffleMemoryLimitPercent);
+    
+    LOG.info("BroadcastInputManager -> " + "MemoryLimit: " + this.memoryLimit + ", maxSingleMemLimit: " + this.maxSingleShuffleLimit);
   }
 
   @Override
@@ -91,12 +96,13 @@ public class BroadcastInputManager implements FetchedInputAllocator,
           localDirAllocator, fileNameAllocator);
     } else {
       this.usedMemory += size;
+      LOG.info("Used memory after allocating " + size  + " : " + usedMemory);
       return new MemoryFetchedInput(size, inputAttemptIdentifier, this);
     }
   }
 
   @Override
-  public void fetchComplete(FetchedInput fetchedInput) {
+  public synchronized void fetchComplete(FetchedInput fetchedInput) {
     switch (fetchedInput.getType()) {
     // Not tracking anything here.
     case DISK:
@@ -109,12 +115,12 @@ public class BroadcastInputManager implements FetchedInputAllocator,
   }
 
   @Override
-  public void fetchFailed(FetchedInput fetchedInput) {
+  public synchronized void fetchFailed(FetchedInput fetchedInput) {
     cleanup(fetchedInput);
   }
 
   @Override
-  public void freeResources(FetchedInput fetchedInput) {
+  public synchronized void freeResources(FetchedInput fetchedInput) {
     cleanup(fetchedInput);
   }
 
@@ -133,6 +139,7 @@ public class BroadcastInputManager implements FetchedInputAllocator,
 
   private synchronized void unreserve(long size) {
     this.usedMemory -= size;
+    LOG.info("Used memory after freeing " + size  + " : " + usedMemory);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
index 16e9645..675d90d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
@@ -62,9 +62,10 @@ public class BroadcastKVReader<K, V> implements KVReader {
   private FetchedInput currentFetchedInput;
   private IFile.Reader currentReader;
   
+  private int numRecordsRead = 0;
   
   public BroadcastKVReader(BroadcastShuffleManager shuffleManager,
-      Configuration conf) {
+      Configuration conf) throws IOException {
     this.shuffleManager = shuffleManager;
     this.conf = conf;
 
@@ -77,15 +78,17 @@ public class BroadcastKVReader<K, V> implements KVReader {
     }
 
     this.keyClass = ConfigUtils.getIntermediateInputKeyClass(conf);
-    this.valClass = ConfigUtils.getIntermediateInputKeyClass(conf);
+    this.valClass = ConfigUtils.getIntermediateInputValueClass(conf);
 
     this.keyIn = new DataInputBuffer();
     this.valIn = new DataInputBuffer();
 
     SerializationFactory serializationFactory = new SerializationFactory(conf);
 
-    this.keyDeserializer = serializationFactory.getDeserializer(keyClass); 
+    this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
+    this.keyDeserializer.open(keyIn);
     this.valDeserializer = serializationFactory.getDeserializer(valClass);
+    this.valDeserializer.open(valIn);
     
     this.valueIterator = new SimpleValueIterator();
     this.valueIterable = new SimpleIterable(this.valueIterator);
@@ -104,15 +107,18 @@ public class BroadcastKVReader<K, V> implements KVReader {
   @Override  
   public boolean next() throws IOException {
     if (readNextFromCurrentReader()) {
+      numRecordsRead++;
       return true;
     } else {
       boolean nextInputExists = moveToNextInput();
       while (nextInputExists) {
         if(readNextFromCurrentReader()) {
+          numRecordsRead++;
           return true;
         }
         nextInputExists = moveToNextInput();
       }
+      LOG.info("Num Records read: " + numRecordsRead);
       return false;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
index c64379a..38f6e6c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
@@ -34,6 +34,7 @@ import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovem
 
 import com.google.common.base.Preconditions;
 import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.TextFormat;
 
 public class BroadcastShuffleInputEventHandler {
 
@@ -72,6 +73,10 @@ public class BroadcastShuffleInputEventHandler {
     } catch (InvalidProtocolBufferException e) {
       throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
     }
+    LOG.info("Processing data moveement event with srcIndex: "
+        + dme.getSourceIndex() + ", targetIndex: " + dme.getTargetIndex()
+        + ", attemptNum: " + dme.getVersion() + ", payload: "
+        + TextFormat.shortDebugString(shufflePayload));
     if (shufflePayload.getOutputGenerated()) {
       InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(dme.getTargetIndex(), dme.getVersion(), shufflePayload.getPathComponent());
       shuffleManager.addKnownInput(shufflePayload.getHost(), shufflePayload.getPort(), srcAttemptIdentifier, 0);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
index 2a5c22f..fd47757 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
@@ -90,7 +90,7 @@ public class BroadcastShuffleManager implements FetcherCallback {
 
   private final BlockingQueue<FetchedInput> completedInputs;
   private final Set<InputIdentifier> completedInputSet;
-  private final Set<InputIdentifier> pendingInputs;
+//  private final Set<InputIdentifier> pendingInputs;
   private final ConcurrentMap<String, InputHost> knownSrcHosts;
   private final Set<InputHost> pendingHosts;
   private final Set<InputAttemptIdentifier> obsoletedInputs;
@@ -128,9 +128,8 @@ public class BroadcastShuffleManager implements FetcherCallback {
     this.numInputs = numInputs;
     
     this.inputEventHandler = new BroadcastShuffleInputEventHandler(inputContext, this);
-    this.inputManager = new BroadcastInputManager(inputContext, conf);
+    this.inputManager = new BroadcastInputManager(inputContext.getUniqueIdentifier(), conf);
 
-    pendingInputs = Collections.newSetFromMap(new ConcurrentHashMap<InputIdentifier, Boolean>(numInputs));
     completedInputSet = Collections.newSetFromMap(new ConcurrentHashMap<InputIdentifier, Boolean>(numInputs));
     completedInputs = new LinkedBlockingQueue<FetchedInput>(numInputs);
     knownSrcHosts = new ConcurrentHashMap<String, InputHost>();
@@ -172,12 +171,17 @@ public class BroadcastShuffleManager implements FetcherCallback {
       codec = null;
       decompressor = null;
     }
+    LOG.info("BroadcastShuffleManager -> numInputs: " + numInputs
+        + " compressionCodec: " + (codec == null ? null : codec.getClass()
+        .getName()) + ", numFetchers: " + numFetchers);
   }
   
   public void run() {
     RunBroadcastShuffleCallable callable = new RunBroadcastShuffleCallable();
     runShuffleFuture = new FutureTask<Void>(callable);
-    new Thread(runShuffleFuture, "ShuffleRunner");
+    Thread runThread = new Thread(runShuffleFuture, "ShuffleRunner");
+    runThread.setDaemon(true);
+    runThread.start();
   }
   
   private class RunBroadcastShuffleCallable implements Callable<Void> {
@@ -186,8 +190,11 @@ public class BroadcastShuffleManager implements FetcherCallback {
     public Void call() throws Exception {
       while (numCompletedInputs.get() < numInputs) {
         if (numRunningFetchers.get() >= numFetchers || pendingHosts.size() == 0) {
-          synchronized(lock) {
+          lock.lock();
+          try {
             wakeLoop.await();
+          } finally {
+            lock.unlock();
           }
           if (shuffleError != null) {
             // InputContext has already been informed of a fatal error.
@@ -195,8 +202,12 @@ public class BroadcastShuffleManager implements FetcherCallback {
             break;
           }
           
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("NumCompletedInputs: " + numCompletedInputs);
+          }
           if (numCompletedInputs.get() < numInputs) {
-            synchronized (lock) {
+            lock.lock();
+            try {
               int numFetchersToRun = Math.min(pendingHosts.size(), numFetchers - numRunningFetchers.get());
               int count = 0;
               for (Iterator<InputHost> inputHostIter = pendingHosts.iterator() ; inputHostIter.hasNext() ; ) {
@@ -204,6 +215,7 @@ public class BroadcastShuffleManager implements FetcherCallback {
                 inputHostIter.remove();
                 if (inputHost.getNumPendingInputs() > 0) {
                   Fetcher fetcher = constructFetcherForHost(inputHost);
+                  LOG.info("Scheduling fetch for inputHost: " + inputHost);
                   numRunningFetchers.incrementAndGet();
                   ListenableFuture<FetchResult> future = fetcherExecutor
                       .submit(fetcher);
@@ -213,6 +225,8 @@ public class BroadcastShuffleManager implements FetcherCallback {
                   }
                 }
               }
+            } finally {
+              lock.unlock();
             }
           }
         }
@@ -252,7 +266,7 @@ public class BroadcastShuffleManager implements FetcherCallback {
     // TODO NEWTEZ Maybe limit the number of inputs being given to a single
     // fetcher, especially in the case where #hosts < #fetchers
     fetcherBuilder.assignWork(inputHost.getHost(), inputHost.getPort(), 0,
-        inputHost.clearAndGetPendingInputs());
+        pendingInputsForHost);
     return fetcherBuilder.build();
   }
   
@@ -269,9 +283,12 @@ public class BroadcastShuffleManager implements FetcherCallback {
       }
     }
     host.addKnownInput(srcAttemptIdentifier);
-    synchronized(lock) {
+    lock.lock();
+    try {
       pendingHosts.add(host);
       wakeLoop.signal();
+    } finally {
+      lock.unlock();
     }
   }
 
@@ -279,16 +296,24 @@ public class BroadcastShuffleManager implements FetcherCallback {
       InputAttemptIdentifier srcAttemptIdentifier) {
     InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
     LOG.info("No input data exists for SrcTask: " + inputIdentifier + ". Marking as complete.");
-    if (pendingInputs.remove(inputIdentifier)) {
-      completedInputSet.add(inputIdentifier);
-      completedInputs.add(new NullFetchedInput(srcAttemptIdentifier));
-      numCompletedInputs.incrementAndGet();
+    
+    if (!completedInputSet.contains(inputIdentifier)) {
+      synchronized (completedInputSet) {
+        if (!completedInputSet.contains(inputIdentifier)) {
+          completedInputSet.add(inputIdentifier);
+          completedInputs.add(new NullFetchedInput(srcAttemptIdentifier));
+          numCompletedInputs.incrementAndGet();
+        }
+      }
     }
 
     // Awake the loop to check for termination.
-    synchronized (lock) {
+    lock.lock();
+    try {
       wakeLoop.signal();
-    } 
+    } finally {
+      lock.unlock();
+    }
   }
 
   public synchronized void obsoleteKnownInput(InputAttemptIdentifier srcAttemptIdentifier) {
@@ -308,14 +333,16 @@ public class BroadcastShuffleManager implements FetcherCallback {
   public void fetchSucceeded(String host,
       InputAttemptIdentifier srcAttemptIdentifier, FetchedInput fetchedInput, long fetchedBytes,
       long copyDuration) throws IOException {
-    InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Complete fetch for attempt: " + srcAttemptIdentifier + " to " + fetchedInput.getType());
-    }
-    
+    InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();    
+
+    LOG.info("Complete fetch for attempt: " + srcAttemptIdentifier + " to " + fetchedInput.getType());
+
     // Count irrespective of whether this is a copy of an already fetched input
-    synchronized(lock) {
+    lock.lock();
+    try {
       lastProgressTime = System.currentTimeMillis();
+    } finally {
+      lock.unlock();
     }
     
     boolean committed = false;
@@ -324,7 +351,6 @@ public class BroadcastShuffleManager implements FetcherCallback {
         if (!completedInputSet.contains(inputIdentifier)) {
           fetchedInput.commit();
           committed = true;
-          pendingInputs.remove(inputIdentifier);
           completedInputSet.add(inputIdentifier);
           completedInputs.add(fetchedInput);
           numCompletedInputs.incrementAndGet();
@@ -334,9 +360,12 @@ public class BroadcastShuffleManager implements FetcherCallback {
     if (!committed) {
       fetchedInput.abort(); // If this fails, the fetcher may attempt another abort.
     } else {
-      synchronized(lock) {
+      lock.lock();
+      try {
         // Signal the wakeLoop to check for termination.
         wakeLoop.signal();
+      } finally {
+        lock.unlock();
       }
     }
     // TODO NEWTEZ Maybe inform fetchers, in case they have an alternate attempt of the same task in their queue.
@@ -347,6 +376,7 @@ public class BroadcastShuffleManager implements FetcherCallback {
       InputAttemptIdentifier srcAttemptIdentifier, boolean connectFailed) {
     // TODO NEWTEZ. Implement logic to report fetch failures after a threshold.
     // For now, reporting immediately.
+    LOG.info("Fetch failed for src: " + srcAttemptIdentifier + ", connectFailed: " + connectFailed);
     InputReadErrorEvent readError = new InputReadErrorEvent(
         "Fetch failure while fetching from "
             + TezRuntimeUtils.getTaskAttemptIdentifier(
@@ -459,8 +489,11 @@ public class BroadcastShuffleManager implements FetcherCallback {
 
     private void doBookKeepingForFetcherComplete() {
       numRunningFetchers.decrementAndGet();
-      synchronized(lock) {
+      lock.lock();
+      try {
         wakeLoop.signal();
+      } finally {
+        lock.unlock();
       }
     }
     
@@ -482,7 +515,7 @@ public class BroadcastShuffleManager implements FetcherCallback {
     public void onFailure(Throwable t) {
       LOG.error("Fetcher failed with error: " + t);
       shuffleError = t;
-      inputContext.fatalError(t, "Fetched failed");
+      inputContext.fatalError(t, "Fetch failed");
       doBookKeepingForFetcherComplete();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
index 8b19ce0..7d33e63 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
@@ -20,6 +20,8 @@ package org.apache.tez.runtime.library.broadcast.output;
 
 import java.io.IOException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
@@ -40,6 +42,8 @@ import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
 
 public class FileBasedKVWriter implements KVWriter {
 
+  private static final Log LOG = LogFactory.getLog(FileBasedKVWriter.class);
+  
   public static final int INDEX_RECORD_LENGTH = 24;
 
   private final Configuration conf;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
index a682a09..dcf8b6d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
@@ -64,7 +64,7 @@ class ShuffleScheduler {
   private final Map<Integer, MutableInt> finishedMaps;
   private final int numInputs;
   private int remainingMaps;
-  private Map<InputAttemptIdentifier, MapHost> mapLocations = new HashMap<InputAttemptIdentifier, MapHost>();
+  private Map<String, MapHost> mapLocations = new HashMap<String, MapHost>();
   //TODO NEWTEZ Clean this and other maps at some point
   private ConcurrentMap<String, InputAttemptIdentifier> pathToIdentifierMap = new ConcurrentHashMap<String, InputAttemptIdentifier>(); 
   private Set<MapHost> pendingHosts = new HashSet<MapHost>();
@@ -307,7 +307,7 @@ class ShuffleScheduler {
     if (host == null) {
       host = new MapHost(partitionId, hostName, hostUrl);
       assert identifier.equals(host.getIdentifier());
-      mapLocations.put(srcAttempt, host);
+      mapLocations.put(identifier, host);
     }
     host.addKnownMap(srcAttempt);
     pathToIdentifierMap.put(srcAttempt.getPathComponent(), srcAttempt);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
index 4ce82d5..18583a5 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
@@ -30,7 +30,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -222,7 +221,7 @@ public class IFile {
                               " for " + key);
       }     
       
-      if(keyLength == previous.getLength()) {
+      if(rle && keyLength == previous.getLength()) {
         sameKey = (BufferUtils.compare(previous, buffer) == 0);       
       }
       
@@ -238,7 +237,7 @@ public class IFile {
                               valueLength + " for " + value);
       }
       
-      if(sameKey) {        
+      if(rle && sameKey) {        
         WritableUtils.writeVInt(out, RLE_MARKER);                   // Same key as previous
         WritableUtils.writeVInt(out, valueLength);                  // value length
         out.write(buffer.getData(), keyLength, buffer.getLength()); // only the value

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
index 42b2e00..40eff70 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
@@ -20,28 +20,36 @@ package org.apache.tez.runtime.library.input;
 
 import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalInput;
-import org.apache.tez.runtime.api.Reader;
 import org.apache.tez.runtime.api.TezInputContext;
+import org.apache.tez.runtime.library.api.KVReader;
+import org.apache.tez.runtime.library.broadcast.input.BroadcastKVReader;
 import org.apache.tez.runtime.library.broadcast.input.BroadcastShuffleManager;
 
 import com.google.common.base.Preconditions;
 
 public class ShuffledUnorderedKVInput implements LogicalInput {
 
+  private static final Log LOG = LogFactory.getLog(ShuffledUnorderedKVInput.class);
+  
   private Configuration conf;
   private int numInputs = -1;
   private BroadcastShuffleManager shuffleManager;
+  @SuppressWarnings("rawtypes")
+  private BroadcastKVReader kvReader;
   
   
   
   public ShuffledUnorderedKVInput() {
   }
 
+  @SuppressWarnings("rawtypes")
   @Override
   public List<Event> initialize(TezInputContext inputContext) throws Exception {
     Preconditions.checkArgument(numInputs != -1, "Number of Inputs has not been set");
@@ -49,13 +57,14 @@ public class ShuffledUnorderedKVInput implements LogicalInput {
     this.conf.setStrings(TezJobConfig.LOCAL_DIRS, inputContext.getWorkDirs());
     
     this.shuffleManager = new BroadcastShuffleManager(inputContext, conf, numInputs);
+    this.shuffleManager.run();
+    this.kvReader = new BroadcastKVReader(shuffleManager, conf);
     return null;
   }
 
   @Override
-  public Reader getReader() throws Exception {
-    // TODO Auto-generated method stub
-    return null;
+  public KVReader getReader() throws Exception {
+    return this.kvReader;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
index dd18149..93c00d3 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.api.Event;
@@ -33,6 +34,7 @@ import org.apache.tez.runtime.library.broadcast.output.FileBasedKVWriter;
 import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
@@ -68,8 +70,7 @@ public class OnFileUnorderedKVOutput implements LogicalOutput {
     DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto
         .newBuilder();
 
-    String host = System.getenv(ApplicationConstants.Environment.NM_HOST
-        .toString());
+    String host = getHost();
     ByteBuffer shuffleMetadata = outputContext
         .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
     int shufflePort = ShuffleUtils
@@ -94,5 +95,11 @@ public class OnFileUnorderedKVOutput implements LogicalOutput {
     Preconditions.checkArgument(numOutputs == 1,
         "Number of outputs can only be 1 for " + this.getClass().getName());
   }
+  
+  @VisibleForTesting
+  @Private
+  String getHost() {
+    return System.getenv(ApplicationConstants.Environment.NM_HOST.toString());
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
index ab7e5ba..1b5a8b2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java
@@ -28,6 +28,7 @@ import java.net.URL;
 import java.net.URLConnection;
 import java.security.GeneralSecurityException;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -93,7 +94,7 @@ public class Fetcher implements Callable<FetchResult> {
   private int partition;
 
   // Maps from the pathComponents (unique per srcTaskId) to the specific taskId
-  private Map<String, InputAttemptIdentifier> pathToAttemptMap;
+  private final Map<String, InputAttemptIdentifier> pathToAttemptMap;
   private Set<InputAttemptIdentifier> remaining;
 
   private URL url;
@@ -108,6 +109,7 @@ public class Fetcher implements Callable<FetchResult> {
     this.shuffleSecret = shuffleSecret;
     this.appId = appId;
     this.conf = conf;
+    this.pathToAttemptMap = new HashMap<String, InputAttemptIdentifier>();
 
     this.fetcherIdentifier = fetcherIdGen.getAndIncrement();
     

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/InputHost.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/InputHost.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/InputHost.java
index 66605dd..4759a8b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/InputHost.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/InputHost.java
@@ -87,4 +87,11 @@ public class InputHost {
       return false;
     return true;
   }
+
+  @Override
+  public String toString() {
+    return "InputHost [host=" + host + ", port=" + port + ", inputs=" + inputs
+        + "]";
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/MemoryFetchedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/MemoryFetchedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/MemoryFetchedInput.java
index f56877f..e34301e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/MemoryFetchedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/MemoryFetchedInput.java
@@ -73,7 +73,7 @@ public class MemoryFetchedInput extends FetchedInput {
     Preconditions.checkState(
         state == State.COMMITTED || state == State.ABORTED,
         "FetchedInput can only be freed after it is committed or aborted");
-    if (state == State.COMMITTED) {
+    if (state == State.COMMITTED) { // ABORTED would have already called cleanup
       state = State.FREED;
       this.byteStream = null;
       notifyFreedResource();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/broadcast/input/TestBroadcastInputManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/broadcast/input/TestBroadcastInputManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/broadcast/input/TestBroadcastInputManager.java
new file mode 100644
index 0000000..e6603c4
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/broadcast/input/TestBroadcastInputManager.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.broadcast.input;
+
+import static org.junit.Assert.*;
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
+import org.junit.Test;
+
+public class TestBroadcastInputManager {
+
+  private static final Log LOG = LogFactory.getLog(TestBroadcastInputManager.class);
+  
+  @Test
+  public void testInMemAllocation() throws IOException {
+    String localDirs = "/tmp/" + this.getClass().getName();
+    Configuration conf = new Configuration();
+    
+    long jvmMax = Runtime.getRuntime().maxMemory();
+    LOG.info("jvmMax: " + jvmMax);
+    
+    float bufferPercent = 0.1f;
+    conf.setFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT, bufferPercent);
+    conf.setFloat(TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, 1.0f);
+    conf.setStrings(TezJobConfig.LOCAL_DIRS, localDirs);
+    
+    long inMemThreshold = (long) (bufferPercent * jvmMax);
+    LOG.info("InMemThreshold: " + inMemThreshold);
+    
+    BroadcastInputManager inputManager = new BroadcastInputManager(UUID.randomUUID().toString(), conf);
+    
+    long requestSize = (long) (0.4f * inMemThreshold);
+    LOG.info("RequestSize: " + requestSize);
+    
+    FetchedInput fi1 = inputManager.allocate(requestSize, new InputAttemptIdentifier(1, 1));
+    assertEquals(FetchedInput.Type.MEMORY, fi1.getType());
+    
+    
+    FetchedInput fi2 = inputManager.allocate(requestSize, new InputAttemptIdentifier(2, 1));
+    assertEquals(FetchedInput.Type.MEMORY, fi2.getType());
+    
+    
+    // Over limit by this point. Next reserve should give back a DISK allocation
+    FetchedInput fi3 = inputManager.allocate(requestSize, new InputAttemptIdentifier(3, 1));
+    assertEquals(FetchedInput.Type.DISK, fi3.getType());
+    
+    
+    // Freed one memory allocation. Next should be mem again.
+    fi1.abort();
+    fi1.free();
+    FetchedInput fi4 = inputManager.allocate(requestSize, new InputAttemptIdentifier(4, 1));
+    assertEquals(FetchedInput.Type.MEMORY, fi4.getType());
+    
+    // Freed one disk allocation. Next sould be disk again (no mem freed)
+    fi3.abort();
+    fi3.free();
+    FetchedInput fi5 = inputManager.allocate(requestSize, new InputAttemptIdentifier(4, 1));
+    assertEquals(FetchedInput.Type.DISK, fi5.getType());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
new file mode 100644
index 0000000..019fd0e
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.sort.impl;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.shuffle.impl.InMemoryReader;
+import org.apache.tez.runtime.library.common.sort.impl.IFile.Reader;
+import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
+import org.apache.tez.runtime.library.testutils.KVDataGen;
+import org.apache.tez.runtime.library.testutils.KVDataGen.KVPair;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class TestIFile {
+
+  private static final Log LOG = LogFactory.getLog(TestIFile.class);
+
+  private static Configuration defaultConf = new Configuration();
+  private static FileSystem localFs = null;
+  private static Path workDir = null;
+
+  static {
+    defaultConf.set("fs.defaultFS", "file:///");
+    try {
+      localFs = FileSystem.getLocal(defaultConf);
+      workDir = new Path(
+          new Path(System.getProperty("test.build.data", "/tmp")), TestIFile.class.getName())
+          .makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
+      LOG.info("Using workDir: " + workDir);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Before
+  @After
+  public void cleanup() throws Exception {
+    localFs.delete(workDir, true);
+  }
+
+  @Test
+  public void testRepeatedKeysInMemReaderNoRLE() throws IOException {
+    String outputFileName = "ifile.out";
+    Path outputPath = new Path(workDir, outputFileName);
+    List<KVPair> data = KVDataGen.generateTestData(true);
+    Writer writer = writeTestFile(outputPath, false, data);
+
+    FSDataInputStream inStream =  localFs.open(outputPath);
+    byte[] bytes = new byte[(int)writer.getRawLength()];
+
+    readDataToMem(inStream, bytes);
+    inStream.close();
+
+    InMemoryReader inMemReader = new InMemoryReader(null, new InputAttemptIdentifier(0, 0), bytes, 0, bytes.length);
+    readAndVerify(inMemReader, data);
+  }
+
+  @Test
+  public void testRepeatedKeysFileReaderNoRLE() throws IOException {
+    String outputFileName = "ifile.out";
+    Path outputPath = new Path(workDir, outputFileName);
+    List<KVPair> data = KVDataGen.generateTestData(true);
+    writeTestFile(outputPath, false, data);
+
+    IFile.Reader reader = new IFile.Reader(defaultConf, localFs, outputPath, null, null);
+
+    readAndVerify(reader, data);
+    reader.close();
+  }
+
+  @Ignore // TEZ-500
+  @Test
+  public void testRepeatedKeysInMemReaderRLE() throws IOException {
+    String outputFileName = "ifile.out";
+    Path outputPath = new Path(workDir, outputFileName);
+    List<KVPair> data = KVDataGen.generateTestData(true);
+    Writer writer = writeTestFile(outputPath, true, data);
+
+    FSDataInputStream inStream =  localFs.open(outputPath);
+    byte[] bytes = new byte[(int)writer.getRawLength()];
+
+    readDataToMem(inStream, bytes);
+    inStream.close();
+
+
+    InMemoryReader inMemReader = new InMemoryReader(null, new InputAttemptIdentifier(0, 0), bytes, 0, bytes.length);
+    readAndVerify(inMemReader, data);
+  }
+
+  @Ignore // TEZ-500
+  @Test
+  public void testRepeatedKeysFileReaderRLE() throws IOException {
+    String outputFileName = "ifile.out";
+    Path outputPath = new Path(workDir, outputFileName);
+    List<KVPair> data = KVDataGen.generateTestData(true);
+    writeTestFile(outputPath, true, data);
+
+    IFile.Reader reader = new IFile.Reader(defaultConf, localFs, outputPath, null, null);
+
+    readAndVerify(reader, data);
+    reader.close();
+  }
+
+  private void readDataToMem(FSDataInputStream inStream, byte[] bytes) throws IOException {
+    int toRead = bytes.length;
+    int offset = 0;
+    while (toRead > 0) {
+      int ret = inStream.read(bytes, offset, toRead);
+      if (ret < 0) {
+        throw new IOException("Premature EOF from inputStream");
+      }
+      toRead -= ret;
+      offset += ret;
+    }
+    LOG.info("Read: " + bytes.length + " bytes");
+  }
+
+  private void readAndVerify(Reader reader, List<KVPair> data)
+      throws IOException {
+    Text readKey = new Text();
+    IntWritable readValue = new IntWritable();
+    DataInputBuffer keyIn = new DataInputBuffer();
+    DataInputBuffer valIn = new DataInputBuffer();
+    Deserializer<Text> keyDeserializer;
+    Deserializer<IntWritable> valDeserializer;
+    SerializationFactory serializationFactory = new SerializationFactory(
+        defaultConf);
+    keyDeserializer = serializationFactory.getDeserializer(Text.class);
+    valDeserializer = serializationFactory.getDeserializer(IntWritable.class);
+    keyDeserializer.open(keyIn);
+    valDeserializer.open(valIn);
+
+    int numRecordsRead = 0;
+
+    while (reader.nextRawKey(keyIn)) {
+      reader.nextRawValue(valIn);
+      readKey = keyDeserializer.deserialize(readKey);
+      readValue = valDeserializer.deserialize(readValue);
+
+      KVPair expected = data.get(numRecordsRead);
+      assertEquals("Key does not match: Expected: " + expected.getKey()
+          + ", Read: " + readKey, expected.getKey(), readKey);
+      assertEquals("Value does not match: Expected: " + expected.getvalue()
+          + ", Read: " + readValue, expected.getvalue(), readValue);
+
+      numRecordsRead++;
+    }
+    assertEquals("Expected: " + data.size() + " records, but found: "
+        + numRecordsRead, data.size(), numRecordsRead);
+    LOG.info("Found: " + numRecordsRead + " records");
+  }
+
+  private Writer writeTestFile(Path outputPath, boolean useRle, List<KVPair> data)
+      throws IOException {
+
+    IFile.Writer writer = new IFile.Writer(defaultConf, localFs, outputPath,
+        Text.class, IntWritable.class, null, null);
+    writer.setRLE(useRle);
+
+    for (KVPair kvp : data) {
+      writer.append(kvp.getKey(), kvp.getvalue());
+    }
+
+    writer.close();
+
+    LOG.info("Uncompressed: " + writer.getRawLength());
+    LOG.info("CompressedSize: " + writer.getCompressedLength());
+
+    return writer;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
new file mode 100644
index 0000000..ff9afbd
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.output;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.RuntimeTask;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TezOutputContext;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.impl.TezOutputContextImpl;
+import org.apache.tez.runtime.api.impl.TezUmbilical;
+import org.apache.tez.runtime.library.api.KVWriter;
+import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
+import org.apache.tez.runtime.library.testutils.KVDataGen;
+import org.apache.tez.runtime.library.testutils.KVDataGen.KVPair;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestOnFileUnorderedKVOutput {
+
+  private static final Log LOG = LogFactory.getLog(TestOnFileUnorderedKVOutput.class);
+
+  private static Configuration defaultConf = new Configuration();
+  private static FileSystem localFs = null;
+  private static Path workDir = null;
+
+  static {
+    defaultConf.set("fs.defaultFS", "file:///");
+    try {
+      localFs = FileSystem.getLocal(defaultConf);
+      workDir = new Path(
+          new Path(System.getProperty("test.build.data", "/tmp")), TestOnFileUnorderedKVOutput.class.getName())
+          .makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
+      LOG.info("Using workDir: " + workDir);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Before
+  @After
+  public void cleanup() throws Exception {
+    localFs.delete(workDir, true);
+  }
+
+  @Test
+  public void testGeneratedDataMovementEvent() throws Exception {
+
+    OnFileUnorderedKVOutput kvOutput = new OnFileUnorderedKVOutputForTest();
+
+    Configuration conf = new Configuration();
+    conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS, Text.class.getName());
+    conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS, IntWritable.class.getName());
+
+    conf.setStrings(TezJobConfig.LOCAL_DIRS, workDir.toString());
+
+    int appAttemptNumber = 1;
+    TezUmbilical tezUmbilical = null; // ZZZ TestUmbilical from mapreduce
+    String taskVertexName = "currentVertex";
+    String destinationVertexName = "destinationVertex";
+    TezDAGID dagID = new TezDAGID("2000", 1, 1);
+    TezVertexID vertexID = new TezVertexID(dagID, 1);
+    TezTaskID taskID = new TezTaskID(vertexID, 1);
+    TezTaskAttemptID taskAttemptID = new TezTaskAttemptID(taskID, 1);
+    TezCounters counters = new TezCounters();
+    byte[] userPayload = TezUtils.createUserPayloadFromConf(conf);
+    RuntimeTask runtimeTask = null;
+    
+    int shufflePort = 2112;
+    Map<String, String> auxEnv = new HashMap<String, String>();
+    ByteBuffer bb = ByteBuffer.allocate(4);
+    bb.putInt(shufflePort);
+    bb.position(0);
+    AuxiliaryServiceHelper.setServiceDataIntoEnv(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, bb, auxEnv);
+
+    
+    TezOutputContext outputContext = new TezOutputContextImpl(conf,
+        appAttemptNumber, tezUmbilical, taskVertexName, destinationVertexName,
+        taskAttemptID, counters, userPayload, runtimeTask,
+        null, auxEnv);
+
+    List<Event> events = null;
+
+    events = kvOutput.initialize(outputContext);
+    assertTrue(events != null && events.size() == 0);
+
+    KVWriter kvWriter = kvOutput.getWriter();
+    List<KVPair> data = KVDataGen.generateTestData(true);
+    for (KVPair kvp : data) {
+      kvWriter.write(kvp.getKey(), kvp.getvalue());
+    }
+
+    events = kvOutput.close();
+    assertTrue(events != null && events.size() == 1);
+    DataMovementEvent dmEvent = (DataMovementEvent)events.get(0);
+
+    assertEquals("Invalid source index", 0, dmEvent.getSourceIndex());
+
+    DataMovementEventPayloadProto shufflePayload = DataMovementEventPayloadProto
+        .parseFrom(dmEvent.getUserPayload());
+
+    assertTrue(shufflePayload.getOutputGenerated());
+    assertEquals(outputContext.getUniqueIdentifier(), shufflePayload.getPathComponent());
+    assertEquals(shufflePort, shufflePayload.getPort());
+    assertEquals("host", shufflePayload.getHost());
+  }
+
+  private static class OnFileUnorderedKVOutputForTest extends OnFileUnorderedKVOutput {
+    @Override
+    String getHost() {
+      return "host";
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3334ca14/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/testutils/KVDataGen.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/testutils/KVDataGen.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/testutils/KVDataGen.java
new file mode 100644
index 0000000..90bb0b3
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/testutils/KVDataGen.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.testutils;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+
+public class KVDataGen {
+
+  public static List<KVPair> generateTestData(boolean repeat) {
+    List<KVPair> data = new LinkedList<KVPair>();
+    int repeatCount = 0;
+    for (int i = 0; i < 5; i++) {
+      Text key = new Text("key" + i);
+      IntWritable value = new IntWritable(i + repeatCount);
+      KVPair kvp = new KVPair(key, value);
+      data.add(kvp);
+      if (repeat && i == 2) { // Repeat this key
+        repeatCount++;
+        value.set(i + repeatCount);
+        kvp = new KVPair(key, value);
+        data.add(kvp);
+      }
+    }
+    return data;
+  }
+
+  public static class KVPair {
+    private Text key;
+    private IntWritable value;
+
+    public KVPair(Text key, IntWritable value) {
+      this.key = key;
+      this.value = value;
+    }
+
+    public Text getKey() {
+      return this.key;
+    }
+
+    public IntWritable getvalue() {
+      return this.value;
+    }
+  }
+}
\ No newline at end of file