You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/09/25 00:44:17 UTC

[08/20] Rename tez-engine-api to tez-runtime-api and tez-engine is split into 2: - tez-engine-library for user-visible Input/Output/Processor implementations - tez-engine-internals for framework internals

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezHeartbeatResponse.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezHeartbeatResponse.java
new file mode 100644
index 0000000..10699ac
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezHeartbeatResponse.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Heartbeat response exchanged as a Hadoop {@link Writable}. It carries the
+ * last request id, a should-die flag and an optional list of
+ * {@link TezEvent}s.
+ *
+ * <p>Serialized layout: {@code long lastRequestId}, {@code boolean shouldDie},
+ * a {@code boolean} marking whether an event list is present and, if so, an
+ * {@code int} count followed by each serialized event.
+ */
+public class TezHeartbeatResponse implements Writable {
+
+  private long lastRequestId;
+  private boolean shouldDie = false;
+  // null means "no event list"; it is serialized as a false presence flag.
+  private List<TezEvent> events;
+
+  public TezHeartbeatResponse() {
+  }
+
+  /**
+   * @param events events to carry, or null for no event list. The list is
+   *          wrapped in a read-only view, not copied.
+   */
+  public TezHeartbeatResponse(List<TezEvent> events) {
+    this.events = readOnlyOrNull(events);
+  }
+
+  /** @return the carried events, or null when no event list is set. */
+  public List<TezEvent> getEvents() {
+    return events;
+  }
+
+  public boolean shouldDie() {
+    return shouldDie;
+  }
+
+  public long getLastRequestId() {
+    return lastRequestId;
+  }
+
+  /**
+   * @param events events to carry, or null to clear the event list. The list
+   *          is wrapped in a read-only view, not copied.
+   */
+  public void setEvents(List<TezEvent> events) {
+    this.events = readOnlyOrNull(events);
+  }
+
+  public void setLastRequestId(long lastRequestId ) {
+    this.lastRequestId = lastRequestId;
+  }
+
+  /** Sets the should-die flag; there is no way to clear it again. */
+  public void setShouldDie() {
+    this.shouldDie = true;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeLong(lastRequestId);
+    out.writeBoolean(shouldDie);
+    if(events != null) {
+      out.writeBoolean(true);
+      out.writeInt(events.size());
+      for (TezEvent e : events) {
+        e.write(out);
+      }
+    } else {
+      out.writeBoolean(false);
+    }
+  }
+
+  /**
+   * Restores all fields from {@code in}, fully replacing the previous state.
+   *
+   * @throws IOException if the stream cannot be read or declares a negative
+   *           event count
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    lastRequestId = in.readLong();
+    shouldDie = in.readBoolean();
+    if(in.readBoolean()) {
+      int eventCount = in.readInt();
+      if (eventCount < 0) {
+        throw new IOException("Invalid event count: " + eventCount);
+      }
+      List<TezEvent> decoded = new ArrayList<TezEvent>(eventCount);
+      for (int i = 0; i < eventCount; ++i) {
+        TezEvent e = new TezEvent();
+        e.readFields(in);
+        decoded.add(e);
+      }
+      events = decoded;
+    } else {
+      // A Writable instance may be reused for several reads; drop any list
+      // left over from a previous call so it does not leak into this one.
+      events = null;
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "{ "
+        + " lastRequestId=" + lastRequestId
+        + ", shouldDie=" + shouldDie
+        + ", eventCount=" + (events != null ? events.size() : 0)
+        + " }";
+  }
+
+  /** Wraps a non-null list in an unmodifiable view; passes null through. */
+  private static List<TezEvent> readOnlyOrNull(List<TezEvent> events) {
+    return events == null ? null : Collections.unmodifiableList(events);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
new file mode 100644
index 0000000..9169895
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api.impl;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.RuntimeTask;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TezInputContext;
+import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
+
+/**
+ * {@link TezInputContext} implementation. Events sent through this context
+ * are tagged with {@code INPUT} producer metadata built from the task vertex,
+ * the source vertex and the task attempt id.
+ */
+public class TezInputContextImpl extends TezTaskContextImpl
+    implements TezInputContext {
+
+  private final byte[] userPayload;
+  private final String sourceVertexName;
+  private final EventMetaData sourceInfo;
+
+  @Private
+  public TezInputContextImpl(Configuration conf, int appAttemptNumber,
+      TezUmbilical tezUmbilical, String taskVertexName,
+      String sourceVertexName, TezTaskAttemptID taskAttemptID,
+      TezCounters counters, byte[] userPayload,
+      RuntimeTask runtimeTask, Map<String, ByteBuffer> serviceConsumerMetadata) {
+    super(conf, appAttemptNumber, taskVertexName, taskAttemptID,
+        counters, runtimeTask, tezUmbilical, serviceConsumerMetadata);
+    this.sourceVertexName = sourceVertexName;
+    this.userPayload = userPayload;
+    this.sourceInfo = new EventMetaData(EventProducerConsumerType.INPUT,
+        taskVertexName, sourceVertexName, taskAttemptID);
+    // Identifier layout: <dagId>_<taskVertex>_<taskIndex>_<attempt>_<sourceVertex>
+    String dagId =
+        taskAttemptID.getTaskID().getVertexID().getDAGId().toString();
+    this.uniqueIdentifier = String.format("%s_%s_%06d_%02d_%s", dagId,
+        taskVertexName, getTaskIndex(), getTaskAttemptNumber(),
+        sourceVertexName);
+  }
+
+  /** Wraps each event with this input's metadata and passes the batch to the umbilical. */
+  @Override
+  public void sendEvents(List<Event> events) {
+    List<TezEvent> wrapped = new ArrayList<TezEvent>(events.size());
+    for (Event event : events) {
+      wrapped.add(new TezEvent(event, sourceInfo));
+    }
+    tezUmbilical.addEvents(wrapped);
+  }
+
+  /** Returns the payload array passed to the constructor (not a copy). */
+  @Override
+  public byte[] getUserPayload() {
+    return userPayload;
+  }
+
+  @Override
+  public String getSourceVertexName() {
+    return sourceVertexName;
+  }
+
+  /** Reports a fatal error attributed to this input. */
+  @Override
+  public void fatalError(Throwable exception, String message) {
+    super.signalFatalError(exception, message, sourceInfo);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
new file mode 100644
index 0000000..fd4c3a3
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api.impl;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.RuntimeTask;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TezOutputContext;
+import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
+
+/**
+ * {@link TezOutputContext} implementation. Events sent through this context
+ * are tagged with {@code OUTPUT} producer metadata built from the task
+ * vertex, the destination vertex and the task attempt id.
+ */
+public class TezOutputContextImpl extends TezTaskContextImpl
+    implements TezOutputContext {
+
+  private final byte[] userPayload;
+  private final String destinationVertexName;
+  private final EventMetaData sourceInfo;
+
+  @Private
+  public TezOutputContextImpl(Configuration conf, int appAttemptNumber,
+      TezUmbilical tezUmbilical, String taskVertexName,
+      String destinationVertexName,
+      TezTaskAttemptID taskAttemptID, TezCounters counters,
+      byte[] userPayload, RuntimeTask runtimeTask,
+      Map<String, ByteBuffer> serviceConsumerMetadata) {
+    super(conf, appAttemptNumber, taskVertexName, taskAttemptID,
+        counters, runtimeTask, tezUmbilical, serviceConsumerMetadata);
+    this.destinationVertexName = destinationVertexName;
+    this.userPayload = userPayload;
+    this.sourceInfo = new EventMetaData(
+        EventProducerConsumerType.OUTPUT, taskVertexName,
+        destinationVertexName, taskAttemptID);
+    // Identifier layout: <dagId>_<taskVertex>_<taskIndex>_<attempt>_<destVertex>
+    String dagId =
+        taskAttemptID.getTaskID().getVertexID().getDAGId().toString();
+    this.uniqueIdentifier = String.format("%s_%s_%06d_%02d_%s", dagId,
+        taskVertexName, getTaskIndex(), getTaskAttemptNumber(),
+        destinationVertexName);
+  }
+
+  /** Wraps each event with this output's metadata and passes the batch to the umbilical. */
+  @Override
+  public void sendEvents(List<Event> events) {
+    List<TezEvent> wrapped = new ArrayList<TezEvent>(events.size());
+    for (Event event : events) {
+      wrapped.add(new TezEvent(event, sourceInfo));
+    }
+    tezUmbilical.addEvents(wrapped);
+  }
+
+  /** Returns the payload array passed to the constructor (not a copy). */
+  @Override
+  public byte[] getUserPayload() {
+    return userPayload;
+  }
+
+  @Override
+  public String getDestinationVertexName() {
+    return destinationVertexName;
+  }
+
+  /** Reports a fatal error attributed to this output. */
+  @Override
+  public void fatalError(Throwable exception, String message) {
+    super.signalFatalError(exception, message, sourceInfo);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
new file mode 100644
index 0000000..e73baf4
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api.impl;
+
+import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.RuntimeTask;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
+
+/**
+ * {@link TezProcessorContext} implementation. Events sent through this
+ * context are tagged with {@code PROCESSOR} producer metadata; the edge
+ * vertex name in that metadata is the empty string.
+ */
+public class TezProcessorContextImpl extends TezTaskContextImpl
+  implements TezProcessorContext {
+
+  private final byte[] userPayload;
+  private final EventMetaData sourceInfo;
+
+  public TezProcessorContextImpl(Configuration conf, int appAttemptNumber,
+      TezUmbilical tezUmbilical, String vertexName,
+      TezTaskAttemptID taskAttemptID, TezCounters counters,
+      byte[] userPayload, RuntimeTask runtimeTask,
+      Map<String, ByteBuffer> serviceConsumerMetadata) {
+    super(conf, appAttemptNumber, vertexName, taskAttemptID,
+        counters, runtimeTask, tezUmbilical, serviceConsumerMetadata);
+    this.userPayload = userPayload;
+    // taskVertexName is the inherited field, set by super(...) from vertexName.
+    this.sourceInfo = new EventMetaData(
+        EventProducerConsumerType.PROCESSOR, taskVertexName, "",
+        taskAttemptID);
+    // Identifier layout: <dagId>_<taskVertex>_<taskIndex>_<attempt>
+    String dagId =
+        taskAttemptID.getTaskID().getVertexID().getDAGId().toString();
+    this.uniqueIdentifier = String.format("%s_%s_%06d_%02d", dagId,
+        taskVertexName, getTaskIndex(), getTaskAttemptNumber());
+  }
+
+  /** Wraps each event with this processor's metadata and passes the batch to the umbilical. */
+  @Override
+  public void sendEvents(List<Event> events) {
+    List<TezEvent> wrapped = new ArrayList<TezEvent>(events.size());
+    for (Event event : events) {
+      wrapped.add(new TezEvent(event, sourceInfo));
+    }
+    tezUmbilical.addEvents(wrapped);
+  }
+
+  /** Returns the payload array passed to the constructor (not a copy). */
+  @Override
+  public byte[] getUserPayload() {
+    return userPayload;
+  }
+
+  /** Forwards the progress value to the owning runtime task. */
+  @Override
+  public void setProgress(float progress) {
+    runtimeTask.setProgress(progress);
+  }
+
+  /** Reports a fatal error attributed to this processor. */
+  @Override
+  public void fatalError(Throwable exception, String message) {
+    super.signalFatalError(exception, message, sourceInfo);
+  }
+
+  /** Asks the umbilical whether this task attempt may commit. */
+  @Override
+  public boolean canCommit() throws IOException {
+    return tezUmbilical.canCommit(this.taskAttemptID);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
new file mode 100644
index 0000000..ee9e96d
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api.impl;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.RuntimeTask;
+import org.apache.tez.runtime.api.TezTaskContext;
+
+public abstract class TezTaskContextImpl implements TezTaskContext {
+
+  private final Configuration conf;
+  protected final String taskVertexName;
+  protected final TezTaskAttemptID taskAttemptID;
+  private final TezCounters counters;
+  private String[] workDirs;
+  protected String uniqueIdentifier;
+  protected final RuntimeTask runtimeTask;
+  protected final TezUmbilical tezUmbilical;
+  private final Map<String, ByteBuffer> serviceConsumerMetadata;
+  private final int appAttemptNumber;
+
+  @Private
+  public TezTaskContextImpl(Configuration conf, int appAttemptNumber,
+      String taskVertexName, TezTaskAttemptID taskAttemptID,
+      TezCounters counters, RuntimeTask runtimeTask,
+      TezUmbilical tezUmbilical, Map<String, ByteBuffer> serviceConsumerMetadata) {
+    this.conf = conf;
+    this.taskVertexName = taskVertexName;
+    this.taskAttemptID = taskAttemptID;
+    this.counters = counters;
+    // TODO Maybe change this to be task id specific at some point. For now
+    // Shuffle code relies on this being a path specified by YARN
+    this.workDirs = this.conf.getStrings(TezJobConfig.LOCAL_DIRS);
+    this.runtimeTask = runtimeTask;
+    this.tezUmbilical = tezUmbilical;
+    this.serviceConsumerMetadata = serviceConsumerMetadata;
+    // TODO NEWTEZ at some point dag attempt should not map to app attempt
+    this.appAttemptNumber = appAttemptNumber;
+  }
+
+  @Override
+  public ApplicationId getApplicationId() {
+    return taskAttemptID.getTaskID().getVertexID().getDAGId()
+        .getApplicationId();
+  }
+
+  @Override
+  public int getTaskIndex() {
+    return taskAttemptID.getTaskID().getId();
+  }
+
+  @Override
+  public int getDAGAttemptNumber() {
+    return appAttemptNumber;
+  }
+
+  @Override
+  public int getTaskAttemptNumber() {
+    return taskAttemptID.getId();
+  }
+
+  @Override
+  public String getDAGName() {
+    // TODO NEWTEZ Change to some form of the DAG name, for now using dagId as
+    // the unique identifier.
+    return taskAttemptID.getTaskID().getVertexID().getDAGId().toString();
+  }
+
+  @Override
+  public String getTaskVertexName() {
+    return taskVertexName;
+  }
+
+
+  @Override
+  public TezCounters getCounters() {
+    return counters;
+  }
+
+  @Override
+  public String[] getWorkDirs() {
+    return Arrays.copyOf(workDirs, workDirs.length);
+  }
+
+  @Override
+  public String getUniqueIdentifier() {
+    return uniqueIdentifier;
+  }
+
+  @Override
+  public ByteBuffer getServiceConsumerMetaData(String serviceName) {
+    return (ByteBuffer) serviceConsumerMetadata.get(serviceName)
+        .asReadOnlyBuffer().rewind();
+  }
+
+  @Override
+  public ByteBuffer getServiceProviderMetaData(String serviceName) {
+    return AuxiliaryServiceHelper.getServiceDataFromEnv(
+        serviceName, System.getenv());
+  }
+
+  protected void signalFatalError(Throwable t, String message,
+      EventMetaData sourceInfo) {
+    runtimeTask.setFatalError(t, message);
+    String diagnostics;
+    if (t != null && message != null) {
+      diagnostics = "exceptionThrown=" + StringUtils.stringifyException(t)
+          + ", errorMessage=" + message;
+    } else if (t == null && message == null) {
+      diagnostics = "Unknown error";
+    } else {
+      diagnostics = t != null ?
+          "exceptionThrown=" + StringUtils.stringifyException(t)
+          : " errorMessage=" + message;
+    }
+    tezUmbilical.signalFatalError(taskAttemptID, diagnostics, sourceInfo);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezUmbilical.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezUmbilical.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezUmbilical.java
new file mode 100644
index 0000000..addccda
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezUmbilical.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api.impl;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+/**
+ * Channel through which task contexts hand events, fatal errors and commit
+ * queries to the framework.
+ */
+public interface TezUmbilical {
+
+  /** Accepts a batch of events produced by a task component. */
+  void addEvents(Collection<TezEvent> events);
+
+  /**
+   * Reports a fatal error for a task attempt.
+   *
+   * @param taskAttemptID attempt that failed
+   * @param diagnostics description of the failure
+   * @param sourceInfo metadata of the component reporting the error
+   */
+  void signalFatalError(TezTaskAttemptID taskAttemptID, String diagnostics,
+      EventMetaData sourceInfo);
+
+  /**
+   * Asks whether the given task attempt may commit.
+   *
+   * @throws IOException if the query fails
+   */
+  boolean canCommit(TezTaskAttemptID taskAttemptID) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/objectregistry/ObjectRegistryImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/objectregistry/ObjectRegistryImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/objectregistry/ObjectRegistryImpl.java
new file mode 100644
index 0000000..a47526b
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/objectregistry/ObjectRegistryImpl.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.common.objectregistry;
+
+import java.util.AbstractMap;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.tez.runtime.common.objectregistry.ObjectLifeCycle;
+import org.apache.tez.runtime.common.objectregistry.ObjectRegistry;
+
+import com.google.inject.Singleton;
+
+/**
+ * {@link ObjectRegistry} backed by an in-memory {@link HashMap}. Each key
+ * maps to the cached object paired with the {@link ObjectLifeCycle} it was
+ * added under. Every method is {@code synchronized} on this instance.
+ */
+@Singleton
+public class ObjectRegistryImpl implements ObjectRegistry {
+
+  // key -> (cached object, life cycle); Map.Entry is used as a simple pair.
+  private final Map<String, Map.Entry<Object, ObjectLifeCycle>> objectCache =
+      new HashMap<String, Map.Entry<Object, ObjectLifeCycle>>();
+
+  /**
+   * Stores {@code value} under {@code key}, replacing any existing entry.
+   *
+   * @return the object previously stored under {@code key}, or null if none
+   */
+  @Override
+  public synchronized Object add(ObjectLifeCycle lifeCycle,
+      String key, Object value) {
+    Map.Entry<Object, ObjectLifeCycle> oldEntry =
+        objectCache.put(key,
+            new AbstractMap.SimpleImmutableEntry<Object, ObjectLifeCycle>(
+                value, lifeCycle));
+    return oldEntry != null ? oldEntry.getKey() : null;
+  }
+
+  /** @return the object stored under {@code key}, or null if none */
+  @Override
+  public synchronized Object get(String key) {
+    Map.Entry<Object, ObjectLifeCycle> entry =
+        objectCache.get(key);
+    return entry != null ? entry.getKey() : null;
+  }
+
+  /** @return true if an entry existed for {@code key} and was removed */
+  @Override
+  public synchronized boolean delete(String key) {
+    return (null != objectCache.remove(key));
+  }
+
+  /**
+   * Removes every cached object that was added with the given life cycle.
+   */
+  public synchronized void clearCache(ObjectLifeCycle lifeCycle) {
+    // Remove through the iterator: calling objectCache.remove() while
+    // iterating over the map throws ConcurrentModificationException.
+    Iterator<Entry<Object, ObjectLifeCycle>> it =
+        objectCache.values().iterator();
+    while (it.hasNext()) {
+      if (it.next().getValue().equals(lifeCycle)) {
+        it.remove();
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/objectregistry/ObjectRegistryModule.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/objectregistry/ObjectRegistryModule.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/objectregistry/ObjectRegistryModule.java
new file mode 100644
index 0000000..97ccf7c
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/objectregistry/ObjectRegistryModule.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.common.objectregistry;
+
+import org.apache.tez.runtime.common.objectregistry.ObjectRegistry;
+import org.apache.tez.runtime.common.objectregistry.ObjectRegistryFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.inject.AbstractModule;
+
+/**
+ * Guice module that binds {@link ObjectRegistry} to a fixed instance and
+ * requests static injection into {@link ObjectRegistryFactory}.
+ */
+public class ObjectRegistryModule extends AbstractModule {
+
+  private final ObjectRegistry objectRegistry;
+
+  /** Creates a module that binds the supplied registry instance. */
+  public ObjectRegistryModule(ObjectRegistry objectRegistry) {
+    this.objectRegistry = objectRegistry;
+  }
+
+  /** Creates a module backed by a fresh {@link ObjectRegistryImpl}. */
+  @VisibleForTesting
+  public ObjectRegistryModule() {
+    this(new ObjectRegistryImpl());
+  }
+
+  @Override
+  protected void configure() {
+    bind(ObjectRegistry.class).toInstance(objectRegistry);
+    requestStaticInjection(ObjectRegistryFactory.class);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/main/proto/Events.proto
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/proto/Events.proto b/tez-runtime-internals/src/main/proto/Events.proto
new file mode 100644
index 0000000..558a2b3
--- /dev/null
+++ b/tez-runtime-internals/src/main/proto/Events.proto
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Generated Java classes are nested inside the SystemEventProtos outer class
+// in the package below, with equals()/hashCode() generated.
+option java_package = "org.apache.tez.runtime.internals.api.events";
+option java_outer_classname = "SystemEventProtos";
+option java_generate_equals_and_hash = true;
+
+// Payload for a task-attempt-failed event; optionally carries a diagnostics
+// string.
+message TaskAttemptFailedEventProto {
+  optional string diagnostics = 1;
+}
+
+// Payload for a task-attempt-completed event; currently has no fields.
+message TaskAttemptCompletedEventProto {
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-internals/src/test/java/org/apache/tez/runtime/common/objectregistry/TestObjectRegistry.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/common/objectregistry/TestObjectRegistry.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/common/objectregistry/TestObjectRegistry.java
new file mode 100644
index 0000000..35192e7
--- /dev/null
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/common/objectregistry/TestObjectRegistry.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.common.objectregistry;
+
+import org.apache.tez.runtime.common.objectregistry.ObjectLifeCycle;
+import org.apache.tez.runtime.common.objectregistry.ObjectRegistry;
+import org.apache.tez.runtime.common.objectregistry.ObjectRegistryFactory;
+import org.apache.tez.runtime.common.objectregistry.ObjectRegistryModule;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+/**
+ * Exercises add/get/delete on the registry obtained from
+ * {@link ObjectRegistryFactory} after the Guice module has been installed.
+ */
+public class TestObjectRegistry {
+
+  @Before
+  public void setup() {
+    // Creating the injector is enough: the module statically injects the
+    // registry into ObjectRegistryFactory. The injector itself is not needed.
+    Guice.createInjector(new ObjectRegistryModule());
+  }
+
+  @Test
+  public void testBasicCRUD() {
+    ObjectRegistry registry = ObjectRegistryFactory.getObjectRegistry();
+    Assert.assertNotNull(registry);
+
+    // Unknown keys: nothing to read, nothing to delete.
+    Assert.assertNull(registry.get("foo"));
+    Assert.assertFalse(registry.delete("foo"));
+
+    Integer first = Integer.valueOf(1);
+    Integer secondOriginal = Integer.valueOf(2);
+    Integer secondReplacement = Integer.valueOf(3);
+
+    // First insertion under a key returns null and is readable afterwards.
+    Assert.assertNull(registry.add(ObjectLifeCycle.DAG, "one", first));
+    Assert.assertEquals(first, registry.get("one"));
+
+    // Re-adding under the same key returns the old value and replaces it.
+    Assert.assertNull(
+        registry.add(ObjectLifeCycle.DAG, "two", secondOriginal));
+    Assert.assertNotNull(
+        registry.add(ObjectLifeCycle.SESSION, "two", secondReplacement));
+    Assert.assertNotEquals(secondOriginal, registry.get("two"));
+    Assert.assertEquals(secondReplacement, registry.get("two"));
+
+    // Delete succeeds once, then reports that the key is gone.
+    Assert.assertTrue(registry.delete("one"));
+    Assert.assertFalse(registry.delete("one"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/pom.xml
----------------------------------------------------------------------
diff --git a/tez-runtime-library/pom.xml b/tez-runtime-library/pom.xml
new file mode 100644
index 0000000..dcdabe1
--- /dev/null
+++ b/tez-runtime-library/pom.xml
@@ -0,0 +1,79 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.tez</groupId>
+    <artifactId>tez</artifactId>
+    <version>0.2.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>tez-runtime-library</artifactId>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-api</artifactId>
+    </dependency>
+    <dependency>
+     <groupId>com.google.protobuf</groupId>
+     <artifactId>protobuf-java</artifactId>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <configuration>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-maven-plugins</artifactId>
+        <executions>
+          <execution>
+            <id>compile-protoc</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>protoc</goal>
+            </goals>
+            <configuration>
+              <protocVersion>${protobuf.version}</protocVersion>
+              <protocCommand>${protoc.path}</protocCommand>
+              <imports>
+                <param>${basedir}/src/main/proto</param>
+              </imports>
+              <source>
+                <directory>${basedir}/src/main/proto</directory>
+                <includes>
+                  <include>ShufflePayloads.proto</include>
+                </includes>
+              </source>
+              <output>${project.build.directory}/generated-sources/java</output>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/hadoop/io/BufferUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/hadoop/io/BufferUtils.java b/tez-runtime-library/src/main/java/org/apache/hadoop/io/BufferUtils.java
new file mode 100644
index 0000000..16f7a8f
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/hadoop/io/BufferUtils.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Static helpers for comparing and copying the bytes held by
+ * {@link DataInputBuffer} and {@link DataOutputBuffer} instances.
+ *
+ * For a {@link DataInputBuffer} the bytes considered are those from
+ * {@code getPosition()} up to (but excluding) {@code getLength()}, i.e.
+ * {@code getLength()} is treated as an end offset into {@code getData()}.
+ * For a {@link DataOutputBuffer} they are the first {@code getLength()}
+ * bytes of {@code getData()}.
+ */
+@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
+@InterfaceStability.Unstable
+public class BufferUtils {
+
+  /**
+   * Compares the unread bytes of two input buffers.
+   *
+   * @param buf1 left-hand buffer
+   * @param buf2 right-hand buffer
+   * @return the result of {@code FastByteComparisons.compareTo} on the two
+   *         byte ranges
+   */
+  public static int compare(DataInputBuffer buf1, DataInputBuffer buf2) {
+    int s1 = buf1.getPosition();
+    int s2 = buf2.getPosition();
+    // getLength() is an end offset, so the byte count is (end - position);
+    // this matches how copy(DataInputBuffer, DataOutputBuffer) sizes its write.
+    return FastByteComparisons.compareTo(
+        buf1.getData(), s1, buf1.getLength() - s1,
+        buf2.getData(), s2, buf2.getLength() - s2);
+  }
+
+  /**
+   * Compares the written bytes of two output buffers.
+   *
+   * @param buf1 left-hand buffer
+   * @param buf2 right-hand buffer
+   * @return the result of {@code FastByteComparisons.compareTo} on the two
+   *         byte ranges
+   */
+  public static int compare(DataOutputBuffer buf1, DataOutputBuffer buf2) {
+    return FastByteComparisons.compareTo(
+        buf1.getData(), 0, buf1.getLength(),
+        buf2.getData(), 0, buf2.getLength());
+  }
+
+  /**
+   * Compares the unread bytes of an input buffer with the written bytes of an
+   * output buffer.
+   *
+   * @param buf1 left-hand (input) buffer
+   * @param buf2 right-hand (output) buffer
+   * @return the result of {@code FastByteComparisons.compareTo} on the two
+   *         byte ranges
+   */
+  public static int compare(DataInputBuffer buf1, DataOutputBuffer buf2) {
+    int s1 = buf1.getPosition();
+    return FastByteComparisons.compareTo(
+        buf1.getData(), s1, buf1.getLength() - s1,
+        buf2.getData(), 0, buf2.getLength());
+  }
+
+  /**
+   * Compares the written bytes of an output buffer with the unread bytes of
+   * an input buffer.
+   *
+   * @param buf1 left-hand (output) buffer
+   * @param buf2 right-hand (input) buffer
+   * @return the result of {@code FastByteComparisons.compareTo} on the two
+   *         byte ranges
+   */
+  public static int compare(DataOutputBuffer buf1, DataInputBuffer buf2) {
+    // Compare in argument order rather than delegating with the operands
+    // swapped, which would flip the sign of the result.
+    int s2 = buf2.getPosition();
+    return FastByteComparisons.compareTo(
+        buf1.getData(), 0, buf1.getLength(),
+        buf2.getData(), s2, buf2.getLength() - s2);
+  }
+
+  /**
+   * Resets {@code dst} and writes the unread bytes of {@code src} into it.
+   *
+   * @param src buffer to copy from
+   * @param dst buffer that is reset and then written to
+   * @throws IOException if writing to {@code dst} fails
+   */
+  public static void copy(DataInputBuffer src, DataOutputBuffer dst) 
+                              throws IOException {
+    byte[] b1 = src.getData();
+    int s1 = src.getPosition();    
+    int l1 = src.getLength();
+    dst.reset();
+    dst.write(b1, s1, l1 - s1);
+  }
+
+  /**
+   * Resets {@code dst} and writes the written bytes of {@code src} into it.
+   *
+   * @param src buffer to copy from
+   * @param dst buffer that is reset and then written to
+   * @throws IOException if writing to {@code dst} fails
+   */
+  public static void copy(DataOutputBuffer src, DataOutputBuffer dst) 
+                              throws IOException {
+    byte[] b1 = src.getData();
+    int s1 = 0;
+    int l1 = src.getLength();
+    dst.reset();
+    dst.write(b1, s1, l1);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/hadoop/io/HashComparator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/hadoop/io/HashComparator.java b/tez-runtime-library/src/main/java/org/apache/hadoop/io/HashComparator.java
new file mode 100644
index 0000000..a372e01
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/hadoop/io/HashComparator.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io;
+
+/**
+ * Supplies a hash code for keys of type {@code KEY}.
+ *
+ * NOTE(review): the name suggests this is meant to be implemented alongside
+ * a key comparator so that hashing agrees with comparison; nothing in this
+ * file establishes that, so confirm against the implementers.
+ *
+ * @param <KEY> the type of key to hash
+ */
+public interface HashComparator<KEY> {
+
+  /**
+   * Returns the hash code for the given key.
+   *
+   * @param key the key to hash
+   * @return the hash code computed for {@code key}
+   */
+  int getHashCode(KEY key);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KVReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KVReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KVReader.java
new file mode 100644
index 0000000..9c6b380
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KVReader.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.api;
+
+import java.io.IOException;
+
+import org.apache.tez.runtime.api.Reader;
+
+/**
+ * A key/value(s) pair based {@link Reader}.
+ * 
+ * Example usage
+ * <pre>
+ * while (kvReader.next()) {
+ *   KVRecord kvRecord = kvReader.getCurrentKV();
+ *   Object key = kvRecord.getKey();
+ *   Iterable&lt;Object&gt; values = kvRecord.getValues();
+ * }
+ * </pre>
+ */
+public interface KVReader extends Reader {
+
+  /**
+   * Moves to the next key/values(s) pair
+   * 
+   * @return true if another key/value(s) pair exists, false if there are no more.
+   * @throws IOException
+   *           if an error occurs
+   */
+  public boolean next() throws IOException;
+
+  /**
+   * Return the current key/value(s) pair. Use {@link #next()} to advance.
+   *
+   * @return the key/value(s) pair the reader is currently positioned on
+   * @throws IOException
+   *           if an error occurs
+   */
+  public KVRecord getCurrentKV() throws IOException;
+  
+  // TODO NEWTEZ Move this to getCurrentKey and getCurrentValue independently. Otherwise usage pattern seems to be getCurrentKV.getKey, getCurrentKV.getValue
+  
+  // TODO NEWTEZ KVRecord which does not need to return a list!
+  // TODO NEWTEZ Parameterize this
+  /**
+   * Represents a key and an associated set of values. The key and values
+   * references are fixed at construction time.
+   */
+  public static class KVRecord {
+
+    private final Object key;
+    private final Iterable<Object> values;
+
+    /**
+     * @param key the key of this record
+     * @param values the values associated with {@code key}
+     */
+    public KVRecord(Object key, Iterable<Object> values) {
+      this.key = key;
+      this.values = values;
+    }
+
+    /**
+     * @return the key passed to the constructor
+     */
+    public Object getKey() {
+      return this.key;
+    }
+
+    /**
+     * @return the values passed to the constructor
+     */
+    public Iterable<Object> getValues() {
+      return this.values;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KVWriter.java
new file mode 100644
index 0000000..ff952ed
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KVWriter.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.api;
+
+import java.io.IOException;
+
+import org.apache.tez.runtime.api.Writer;
+
+/**
+ * A key/value pair based {@link Writer}. Each call to
+ * {@link #write(Object, Object)} hands over exactly one key and one value.
+ */
+public interface KVWriter extends Writer {
+  /**
+   * Writes a key/value pair.
+   * 
+   * @param key
+   *          the key to write
+   * @param value
+   *          the value to write
+   * @throws IOException
+   *           if an error occurs
+   */
+  public void write(Object key, Object value) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/Partitioner.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/Partitioner.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/Partitioner.java
new file mode 100644
index 0000000..680c9b8
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/Partitioner.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.api;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezJobConfig;
+
+/**
+ * {@link Partitioner} is used by the TEZ framework to partition output
+ * key/value pairs.
+ * 
+ * <p><b>Partitioner Initialization</b></p> The Partitioner class is picked up
+ * using the TEZ_RUNTIME_PARTITIONER_CLASS attribute in {@link TezJobConfig}
+ * 
+ * TODO NEWTEZ Change construction to first check for a Constructor with a byte[] payload
+ * 
+ * Partitioners need to provide a single argument ({@link Configuration})
+ * constructor or a 0 argument constructor. If both exist, preference is given
+ * to the single argument constructor. This is primarily for MR support.
+ * 
+ * If using the configuration constructor, TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS
+ * will be set in the configuration, to indicate the max number of expected
+ * partitions.
+ * 
+ */
+public interface Partitioner {
+  
+  /**
+   * Get partition for given key/value.
+   * @param key key
+   * @param value value
+   * @param numPartitions number of partitions
+   * @return the partition chosen for the given key/value; presumably an index
+   *         in the range [0, numPartitions) (TODO confirm against callers)
+   */
+  int getPartition(Object key, Object value, int numPartitions);
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
new file mode 100644
index 0000000..cda52da
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.broadcast.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.runtime.api.TezInputContext;
+import org.apache.tez.runtime.library.common.Constants;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
+import org.apache.tez.runtime.library.shuffle.common.DiskFetchedInput;
+import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
+import org.apache.tez.runtime.library.shuffle.common.FetchedInputAllocator;
+import org.apache.tez.runtime.library.shuffle.common.FetchedInputCallback;
+import org.apache.tez.runtime.library.shuffle.common.MemoryFetchedInput;
+
+public class BroadcastInputManager implements FetchedInputAllocator,
+    FetchedInputCallback {
+
+  private final Configuration conf;
+
+  private final TezTaskOutputFiles fileNameAllocator;
+  private final LocalDirAllocator localDirAllocator;
+
+  // Configuration parameters
+  private final long memoryLimit;
+  private final long maxSingleShuffleLimit;
+
+  private long usedMemory = 0;
+
+  public BroadcastInputManager(TezInputContext inputContext, Configuration conf) {
+    this.conf = conf;
+
+    this.fileNameAllocator = new TezTaskOutputFiles(conf,
+        inputContext.getUniqueIdentifier());
+    this.localDirAllocator = new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
+
+    // Setup configuration
+    final float maxInMemCopyUse = conf.getFloat(
+        TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT,
+        TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT);
+    if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
+      throw new IllegalArgumentException("Invalid value for "
+          + TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT + ": "
+          + maxInMemCopyUse);
+    }
+
+    // Allow unit tests to fix Runtime memory
+    this.memoryLimit = (long) (conf.getLong(Constants.TEZ_RUNTIME_TASK_MEMORY,
+        Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE)) * maxInMemCopyUse);
+
+    final float singleShuffleMemoryLimitPercent = conf.getFloat(
+        TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT,
+        TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT);
+    if (singleShuffleMemoryLimitPercent <= 0.0f
+        || singleShuffleMemoryLimitPercent > 1.0f) {
+      throw new IllegalArgumentException("Invalid value for "
+          + TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT + ": "
+          + singleShuffleMemoryLimitPercent);
+    }
+
+    this.maxSingleShuffleLimit = (long) (memoryLimit * singleShuffleMemoryLimitPercent);
+  }
+
+  @Override
+  public synchronized FetchedInput allocate(long size,
+      InputAttemptIdentifier inputAttemptIdentifier) throws IOException {
+    if (size > maxSingleShuffleLimit
+        || this.usedMemory + size > this.memoryLimit) {
+      return new DiskFetchedInput(size, inputAttemptIdentifier, this, conf,
+          localDirAllocator, fileNameAllocator);
+    } else {
+      this.usedMemory += size;
+      return new MemoryFetchedInput(size, inputAttemptIdentifier, this);
+    }
+  }
+
+  @Override
+  public void fetchComplete(FetchedInput fetchedInput) {
+    switch (fetchedInput.getType()) {
+    // Not tracking anything here.
+    case DISK:
+    case MEMORY:
+      break;
+    default:
+      throw new TezUncheckedException("InputType: " + fetchedInput.getType()
+          + " not expected for Broadcast fetch");
+    }
+  }
+
+  @Override
+  public void fetchFailed(FetchedInput fetchedInput) {
+    cleanup(fetchedInput);
+  }
+
+  @Override
+  public void freeResources(FetchedInput fetchedInput) {
+    cleanup(fetchedInput);
+  }
+
+  private void cleanup(FetchedInput fetchedInput) {
+    switch (fetchedInput.getType()) {
+    case DISK:
+      break;
+    case MEMORY:
+      unreserve(fetchedInput.getSize());
+      break;
+    default:
+      throw new TezUncheckedException("InputType: " + fetchedInput.getType()
+          + " not expected for Broadcast fetch");
+    }
+  }
+
+  private synchronized void unreserve(long size) {
+    this.usedMemory -= size;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
new file mode 100644
index 0000000..16e9645
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.broadcast.input;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.runtime.library.api.KVReader;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.shuffle.impl.InMemoryReader;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
+import org.apache.tez.runtime.library.shuffle.common.MemoryFetchedInput;
+import org.apache.tez.runtime.library.shuffle.common.FetchedInput.Type;
+
+/**
+ * A {@link KVReader} over the inputs handed out by a
+ * {@link BroadcastShuffleManager}. Inputs are read one after another through
+ * an {@link IFile.Reader}; every record is exposed as a key with a
+ * single-element value iterable.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class BroadcastKVReader<K, V> implements KVReader {
+
+  private static final Log LOG = LogFactory.getLog(BroadcastKVReader.class);
+  
+  private final BroadcastShuffleManager shuffleManager;
+  private final Configuration conf;
+  private final CompressionCodec codec;
+  
+  private final Class<K> keyClass;
+  private final Class<V> valClass;
+  private final Deserializer<K> keyDeserializer;
+  private final Deserializer<V> valDeserializer;
+  private final DataInputBuffer keyIn;
+  private final DataInputBuffer valIn;
+
+  private final SimpleValueIterator valueIterator;
+  private final SimpleIterable valueIterable;
+  
+  private K key;
+  private V value;
+  
+  private FetchedInput currentFetchedInput;
+  private IFile.Reader currentReader;
+
+  // Whether the deserializers have been attached to keyIn / valIn yet.
+  private boolean deserializersOpened = false;
+  
+  /**
+   * @param shuffleManager source of the fetched inputs to read
+   * @param conf configuration used to resolve the compression codec, the
+   *          key/value classes and their serialization
+   */
+  public BroadcastKVReader(BroadcastShuffleManager shuffleManager,
+      Configuration conf) {
+    this.shuffleManager = shuffleManager;
+    this.conf = conf;
+
+    if (ConfigUtils.isIntermediateInputCompressed(this.conf)) {
+      Class<? extends CompressionCodec> codecClass = ConfigUtils
+          .getIntermediateInputCompressorClass(conf, DefaultCodec.class);
+      codec = ReflectionUtils.newInstance(codecClass, conf);
+    } else {
+      codec = null;
+    }
+
+    this.keyClass = ConfigUtils.getIntermediateInputKeyClass(conf);
+    // The value class must come from the value setting, not the key setting.
+    this.valClass = ConfigUtils.getIntermediateInputValueClass(conf);
+
+    this.keyIn = new DataInputBuffer();
+    this.valIn = new DataInputBuffer();
+
+    SerializationFactory serializationFactory = new SerializationFactory(conf);
+
+    this.keyDeserializer = serializationFactory.getDeserializer(keyClass); 
+    this.valDeserializer = serializationFactory.getDeserializer(valClass);
+    
+    this.valueIterator = new SimpleValueIterator();
+    this.valueIterable = new SimpleIterable(this.valueIterator);
+  }
+
+  // TODO NEWTEZ Maybe add an interface to check whether next will block.
+  
+  /**
+   * Moves to the next key/values(s) pair
+   * 
+   * @return true if another key/value(s) pair exists, false if there are no
+   *         more.
+   * @throws IOException
+   *           if an error occurs
+   */
+  @Override  
+  public boolean next() throws IOException {
+    if (readNextFromCurrentReader()) {
+      return true;
+    } else {
+      boolean nextInputExists = moveToNextInput();
+      while (nextInputExists) {
+        if(readNextFromCurrentReader()) {
+          return true;
+        }
+        nextInputExists = moveToNextInput();
+      }
+      return false;
+    }
+  }
+
+  /**
+   * Returns the most recently read key together with an iterable over its
+   * single value. The same iterator instance is re-armed on every call, so
+   * the returned record is only valid until the next call.
+   *
+   * @return the current key/value(s) pair
+   * @throws IOException declared by {@link KVReader}
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public KVRecord getCurrentKV() throws IOException {
+    this.valueIterator.setValue(value);
+    return new KVRecord((Object)key, (Iterable<Object>)this.valueIterable);
+  }
+
+  /**
+   * Tries reading the next key and value from the current reader.
+   * @return true if a record was read, false if there is no current reader
+   *         or it has no more records
+   * @throws IOException if reading or deserializing fails
+   */
+  private boolean readNextFromCurrentReader() throws IOException {
+    // Initial reader.
+    if (this.currentReader == null) {
+      return false;
+    }
+    if (!this.currentReader.nextRawKey(keyIn)) {
+      return false;
+    }
+    this.currentReader.nextRawValue(valIn);
+    openDeserializersIfNeeded();
+    this.key = keyDeserializer.deserialize(this.key);
+    this.value = valDeserializer.deserialize(this.value);
+    return true;
+  }
+
+  /**
+   * Attaches the key and value deserializers to their input buffers the
+   * first time a record is read. This is done lazily rather than in the
+   * constructor because {@code Deserializer.open} declares
+   * {@link IOException}, which the constructor does not.
+   *
+   * @throws IOException if a deserializer cannot be opened
+   */
+  private void openDeserializersIfNeeded() throws IOException {
+    if (!deserializersOpened) {
+      keyDeserializer.open(keyIn);
+      valDeserializer.open(valIn);
+      deserializersOpened = true;
+    }
+  }
+  
+  /**
+   * Moves to the next available input. This method may block if the input is not ready yet.
+   * Also takes care of closing the previous input.
+   * 
+   * @return true if the next input exists, false otherwise
+   * @throws IOException if closing or opening an input fails, or if the
+   *           thread is interrupted while waiting for the next input
+   */
+  private boolean moveToNextInput() throws IOException {
+    if (currentReader != null) { // Close the current reader.
+      currentReader.close();
+      currentFetchedInput.free();
+      // Forget the finished input so that a later call neither reads from a
+      // closed reader nor closes/frees it a second time.
+      currentReader = null;
+      currentFetchedInput = null;
+    }
+    try {
+      currentFetchedInput = shuffleManager.getNextInput();
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted while waiting for next available input", e);
+      // Keep the interrupt visible to callers further up the stack.
+      Thread.currentThread().interrupt();
+      throw new IOException(e);
+    }
+    if (currentFetchedInput == null) {
+      return false; // No more inputs
+    } else {
+      currentReader = openIFileReader(currentFetchedInput);
+      return true;
+    }
+  }
+
+  /**
+   * Opens a reader over a fetched input: an {@link InMemoryReader} for
+   * inputs of type {@code MEMORY}, otherwise an {@link IFile.Reader} over
+   * the input's stream using the configured codec.
+   *
+   * @param fetchedInput the input to read
+   * @return a reader positioned at the start of the input
+   * @throws IOException if the reader cannot be created
+   */
+  public IFile.Reader openIFileReader(FetchedInput fetchedInput)
+      throws IOException {
+    if (fetchedInput.getType() == Type.MEMORY) {
+      MemoryFetchedInput mfi = (MemoryFetchedInput) fetchedInput;
+
+      return new InMemoryReader(null, mfi.getInputAttemptIdentifier(),
+          mfi.getBytes(), 0, (int) mfi.getSize());
+    } else {
+      return new IFile.Reader(conf, fetchedInput.getInputStream(),
+          fetchedInput.getSize(), codec, null);
+    }
+  }
+
+  
+  
+  // TODO NEWTEZ Move this into a common class. Also used in MRInput
+  /**
+   * Iterator over at most one value: it yields the value last passed to
+   * {@link #setValue(Object)} once and is then exhausted until the next
+   * {@code setValue} call.
+   */
+  private class SimpleValueIterator implements Iterator<V> {
+
+    private V value;
+
+    public void setValue(V value) {
+      this.value = value;
+    }
+
+    public boolean hasNext() {
+      return value != null;
+    }
+
+    public V next() {
+      V value = this.value;
+      this.value = null;
+      return value;
+    }
+
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  /** Iterable that always hands back the one iterator it was built with. */
+  private class SimpleIterable implements Iterable<V> {
+    private final Iterator<V> iterator;
+    public SimpleIterable(Iterator<V> iterator) {
+      this.iterator = iterator;
+    }
+
+    @Override
+    public Iterator<V> iterator() {
+      return iterator;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
new file mode 100644
index 0000000..c64379a
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.tez.runtime.library.broadcast.input;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TezInputContext;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.InputFailedEvent;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleInputEventHandler;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * Translates input events into calls on a {@link BroadcastShuffleManager}:
+ * {@link DataMovementEvent}s announce inputs (with or without data) and
+ * {@link InputFailedEvent}s mark previously announced inputs as obsolete.
+ */
+public class BroadcastShuffleInputEventHandler {
+
+  // Log under this class's own name rather than ShuffleInputEventHandler's.
+  private static final Log LOG = LogFactory.getLog(BroadcastShuffleInputEventHandler.class);
+  
+  private final BroadcastShuffleManager shuffleManager;
+  
+  /**
+   * @param inputContext currently unused by this handler
+   * @param shuffleManager receiver of the translated events
+   */
+  public BroadcastShuffleInputEventHandler(TezInputContext inputContext, BroadcastShuffleManager shuffleManager) {
+    this.shuffleManager = shuffleManager;
+  }
+
+  /**
+   * Handles the given events one by one, in list order.
+   *
+   * @param events the events to process
+   * @throws TezUncheckedException if an event is neither a
+   *           {@link DataMovementEvent} nor an {@link InputFailedEvent}, or
+   *           if a data movement payload cannot be parsed
+   */
+  public void handleEvents(List<Event> events) {
+    for (Event event : events) {
+      handleEvent(event);
+    }
+  }
+  
+  /** Dispatches a single event on its concrete type. */
+  private void handleEvent(Event event) {
+    if (event instanceof DataMovementEvent) {
+      processDataMovementEvent((DataMovementEvent)event);
+    } else if (event instanceof InputFailedEvent) {
+      processInputFailedEvent((InputFailedEvent)event);
+    } else {
+      throw new TezUncheckedException("Unexpected event type: " + event.getClass().getName());
+    }
+  }
+  
+  
+  /**
+   * Parses the event's user payload and registers the source attempt with
+   * the shuffle manager: as a known input at the payload's host/port when
+   * output was generated, otherwise as a completed input with no data.
+   */
+  private void processDataMovementEvent(DataMovementEvent dme) {
+    Preconditions.checkArgument(dme.getSourceIndex() == 0,
+        "Unexpected srcIndex: " + dme.getSourceIndex()
+            + " on DataMovementEvent. Can only be 0");
+    DataMovementEventPayloadProto shufflePayload;
+    try {
+      shufflePayload = DataMovementEventPayloadProto.parseFrom(dme.getUserPayload());
+    } catch (InvalidProtocolBufferException e) {
+      throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
+    }
+    if (shufflePayload.getOutputGenerated()) {
+      InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(dme.getTargetIndex(), dme.getVersion(), shufflePayload.getPathComponent());
+      // NOTE(review): the trailing 0 is passed through as-is; its meaning is
+      // defined by BroadcastShuffleManager.addKnownInput - confirm there.
+      shuffleManager.addKnownInput(shufflePayload.getHost(), shufflePayload.getPort(), srcAttemptIdentifier, 0);
+    } else {
+      shuffleManager.addCompletedInputWithNoData(new InputAttemptIdentifier(dme.getTargetIndex(), dme.getVersion()));
+    }
+  }
+  
+  /** Tells the shuffle manager that the failed source attempt is obsolete. */
+  private void processInputFailedEvent(InputFailedEvent ife) {
+    InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(ife.getTargetIndex(), ife.getVersion());
+    shuffleManager.obsoleteKnownInput(srcAttemptIdentifier);
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
new file mode 100644
index 0000000..2a5c22f
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
@@ -0,0 +1,489 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.broadcast.input;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.crypto.SecretKey;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TezInputContext;
+import org.apache.tez.runtime.api.events.InputReadErrorEvent;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.InputIdentifier;
+import org.apache.tez.runtime.library.common.TezRuntimeUtils;
+import org.apache.tez.runtime.library.common.shuffle.server.ShuffleHandler;
+import org.apache.tez.runtime.library.shuffle.common.FetchResult;
+import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
+import org.apache.tez.runtime.library.shuffle.common.FetchedInputAllocator;
+import org.apache.tez.runtime.library.shuffle.common.Fetcher;
+import org.apache.tez.runtime.library.shuffle.common.FetcherCallback;
+import org.apache.tez.runtime.library.shuffle.common.InputHost;
+import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
+import org.apache.tez.runtime.library.shuffle.common.Fetcher.FetcherBuilder;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Drives the fetch of broadcast inputs: it tracks the source hosts that have
+ * inputs available, hands them to a bounded pool of {@link Fetcher}s, and
+ * exposes fetched inputs to the consumer through a blocking queue.
+ *
+ * <p>Threading: a single "ShuffleRunner" thread (started by {@link #run()})
+ * schedules fetchers; fetcher threads report back through the
+ * {@link FetcherCallback} methods and {@link FetchFutureCallback}. The
+ * runner sleeps on {@code wakeLoop} and is woken whenever a host becomes
+ * pending, a fetcher finishes, or an input completes. {@code wakeLoop} is a
+ * {@link Condition} of {@code lock}, so every await/signal is performed while
+ * holding {@code lock} via {@code lock.lock()}.
+ */
+public class BroadcastShuffleManager implements FetcherCallback {
+
+  private static final Log LOG = LogFactory.getLog(BroadcastShuffleManager.class);
+
+  private final TezInputContext inputContext;
+  private final int numInputs;
+  private final Configuration conf;
+
+  private final BroadcastShuffleInputEventHandler inputEventHandler;
+  private final FetchedInputAllocator inputManager;
+
+  private final ExecutorService fetcherRawExecutor;
+  private final ListeningExecutorService fetcherExecutor;
+
+  // Inputs ready for consumption, in completion order.
+  private final BlockingQueue<FetchedInput> completedInputs;
+  // Identifiers of inputs already counted as complete. Also used as the
+  // monitor guarding the "register a completed input exactly once" sequence.
+  private final Set<InputIdentifier> completedInputSet;
+  private final Set<InputIdentifier> pendingInputs;
+  // NOTE(review): keyed by host name only; two sources on the same host with
+  // different ports would share one InputHost - confirm this is intended.
+  private final ConcurrentMap<String, InputHost> knownSrcHosts;
+  private final Set<InputHost> pendingHosts;
+  private final Set<InputAttemptIdentifier> obsoletedInputs;
+
+  private final AtomicInteger numCompletedInputs = new AtomicInteger(0);
+
+  private final long startTime;
+  // Written by fetcher threads; volatile so that any reader sees the update.
+  private volatile long lastProgressTime;
+
+  private FutureTask<Void> runShuffleFuture;
+
+  // Required to be held when manipulating pendingHosts and when awaiting or
+  // signalling wakeLoop.
+  private final ReentrantLock lock = new ReentrantLock();
+  private final Condition wakeLoop = lock.newCondition();
+
+  private final int numFetchers;
+  private final AtomicInteger numRunningFetchers = new AtomicInteger(0);
+
+  // Parameters required by Fetchers
+  private final SecretKey shuffleSecret;
+  private final int connectionTimeout;
+  private final int readTimeout;
+  private final CompressionCodec codec;
+  private final Decompressor decompressor;
+
+  private final FetchFutureCallback fetchFutureCallback = new FetchFutureCallback();
+
+  private volatile Throwable shuffleError;
+
+  // TODO NEWTEZ Add counters.
+
+  /**
+   * Creates the manager and its fetcher thread pool. No fetching starts until
+   * {@link #run()} is called.
+   *
+   * @param inputContext context of the input this manager serves
+   * @param conf configuration supplying fetcher parallelism, timeouts and
+   *          compression settings
+   * @param numInputs number of inputs that must complete
+   * @throws IOException declared for callers; may be raised while resolving
+   *           the shuffle secret
+   */
+  public BroadcastShuffleManager(TezInputContext inputContext, Configuration conf, int numInputs) throws IOException {
+    this.inputContext = inputContext;
+    this.conf = conf;
+    this.numInputs = numInputs;
+
+    this.inputEventHandler = new BroadcastShuffleInputEventHandler(inputContext, this);
+    this.inputManager = new BroadcastInputManager(inputContext, conf);
+
+    // LinkedBlockingQueue and the fixed thread pool both reject a size below
+    // 1, so clamp the sizes derived from numInputs.
+    int sizeHint = Math.max(1, numInputs);
+
+    pendingInputs = Collections.newSetFromMap(new ConcurrentHashMap<InputIdentifier, Boolean>(sizeHint));
+    completedInputSet = Collections.newSetFromMap(new ConcurrentHashMap<InputIdentifier, Boolean>(sizeHint));
+    completedInputs = new LinkedBlockingQueue<FetchedInput>(sizeHint);
+    knownSrcHosts = new ConcurrentHashMap<String, InputHost>();
+    pendingHosts = Collections.newSetFromMap(new ConcurrentHashMap<InputHost, Boolean>());
+    obsoletedInputs = Collections.newSetFromMap(new ConcurrentHashMap<InputAttemptIdentifier, Boolean>());
+
+    int maxConfiguredFetchers =
+        conf.getInt(
+            TezJobConfig.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES,
+            TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES);
+
+    // Never more fetchers than inputs, never fewer than one.
+    this.numFetchers = Math.max(1, Math.min(maxConfiguredFetchers, numInputs));
+
+    this.fetcherRawExecutor = Executors.newFixedThreadPool(numFetchers,
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Fetcher #%d")
+            .build());
+    this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor);
+
+    this.startTime = System.currentTimeMillis();
+    this.lastProgressTime = startTime;
+
+    this.shuffleSecret = ShuffleUtils
+        .getJobTokenSecretFromTokenBytes(inputContext
+            .getServiceConsumerMetaData(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID));
+
+    // NOTE(review): the default used for the connect timeout is the
+    // STALLED_COPY constant - confirm that this pairing is intended.
+    this.connectionTimeout = conf.getInt(
+        TezJobConfig.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT,
+        TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_STALLED_COPY_TIMEOUT);
+    this.readTimeout = conf.getInt(
+        TezJobConfig.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT,
+        TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT);
+
+    if (ConfigUtils.isIntermediateInputCompressed(conf)) {
+      Class<? extends CompressionCodec> codecClass = ConfigUtils
+          .getIntermediateInputCompressorClass(conf, DefaultCodec.class);
+      codec = ReflectionUtils.newInstance(codecClass, conf);
+      decompressor = CodecPool.getDecompressor(codec);
+    } else {
+      codec = null;
+      decompressor = null;
+    }
+  }
+
+  /**
+   * Starts the scheduling loop on a new thread named "ShuffleRunner" and
+   * returns immediately.
+   */
+  public void run() {
+    RunBroadcastShuffleCallable callable = new RunBroadcastShuffleCallable();
+    runShuffleFuture = new FutureTask<Void>(callable);
+    // The thread must actually be started, otherwise no fetcher is ever
+    // scheduled.
+    new Thread(runShuffleFuture, "ShuffleRunner").start();
+  }
+
+  /**
+   * Scheduling loop: until every input is complete (or a fetcher reports a
+   * fatal error) it waits for work and a free fetcher slot, then submits
+   * fetchers for pending hosts.
+   */
+  private class RunBroadcastShuffleCallable implements Callable<Void> {
+
+    @Override
+    public Void call() throws Exception {
+      while (numCompletedInputs.get() < numInputs) {
+        lock.lock();
+        try {
+          boolean nothingToSchedule =
+              numRunningFetchers.get() >= numFetchers || pendingHosts.isEmpty();
+          // Every condition is re-checked while holding the lock: signallers
+          // update state first and then signal under the same lock, so a
+          // wake-up cannot be lost between the check and the await.
+          if (nothingToSchedule && shuffleError == null
+              && numCompletedInputs.get() < numInputs) {
+            wakeLoop.await();
+          }
+
+          if (shuffleError != null) {
+            // InputContext has already been informed of a fatal error.
+            // Initiate shutdown.
+            break;
+          }
+
+          if (numCompletedInputs.get() < numInputs) {
+            scheduleFetchers();
+          }
+        } finally {
+          lock.unlock();
+        }
+      }
+      // TODO NEWTEZ Maybe clean up inputs.
+      if (!fetcherExecutor.isShutdown()) {
+        fetcherExecutor.shutdownNow();
+      }
+      return null;
+    }
+
+    /**
+     * Submits fetchers for pending hosts, limited by the number of free
+     * fetcher slots. Must be called with {@code lock} held.
+     */
+    private void scheduleFetchers() {
+      int numFetchersToRun =
+          Math.min(pendingHosts.size(), numFetchers - numRunningFetchers.get());
+      int launched = 0;
+      for (Iterator<InputHost> inputHostIter = pendingHosts.iterator();
+          launched < numFetchersToRun && inputHostIter.hasNext();) {
+        InputHost inputHost = inputHostIter.next();
+        inputHostIter.remove();
+        // Hosts without pending inputs are dropped without using a slot.
+        if (inputHost.getNumPendingInputs() > 0) {
+          Fetcher fetcher = constructFetcherForHost(inputHost);
+          numRunningFetchers.incrementAndGet();
+          ListenableFuture<FetchResult> future = fetcherExecutor.submit(fetcher);
+          Futures.addCallback(future, fetchFutureCallback);
+          launched++;
+        }
+      }
+    }
+  }
+
+  /**
+   * Builds a fetcher for the given host, assigning it the host's pending
+   * inputs minus those already completed or marked obsolete.
+   *
+   * @param inputHost host whose pending inputs are drained
+   * @return a fetcher configured for that host
+   */
+  private Fetcher constructFetcherForHost(InputHost inputHost) {
+    FetcherBuilder fetcherBuilder = new FetcherBuilder(
+        BroadcastShuffleManager.this, inputManager,
+        inputContext.getApplicationId(), shuffleSecret, conf);
+    fetcherBuilder.setConnectionParameters(connectionTimeout, readTimeout);
+    fetcherBuilder.setCompressionParameters(codec, decompressor);
+
+    // Remove obsolete inputs from the list being given to the fetcher. Also
+    // remove from the obsolete list.
+    List<InputAttemptIdentifier> pendingInputsForHost = inputHost
+        .clearAndGetPendingInputs();
+    for (Iterator<InputAttemptIdentifier> inputIter = pendingInputsForHost
+        .iterator(); inputIter.hasNext();) {
+      InputAttemptIdentifier input = inputIter.next();
+      boolean alreadyCompleted = completedInputSet.contains(input.getInputIdentifier());
+      boolean obsolete = obsoletedInputs.remove(input);
+      // A single remove() per element: an attempt may be both completed and
+      // obsolete, and Iterator.remove() may not be called twice in a row.
+      if (alreadyCompleted || obsolete) {
+        inputIter.remove();
+      }
+    }
+    // TODO NEWTEZ Maybe limit the number of inputs being given to a single
+    // fetcher, especially in the case where #hosts < #fetchers
+    // Hand over the filtered list; the host's pending list was already
+    // drained above, so draining it again would lose this work.
+    fetcherBuilder.assignWork(inputHost.getHost(), inputHost.getPort(), 0,
+        pendingInputsForHost);
+    return fetcherBuilder.build();
+  }
+
+  /** Wakes the scheduling loop so that it re-evaluates its state. */
+  private void signalWakeLoop() {
+    lock.lock();
+    try {
+      wakeLoop.signal();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /////////////////// Methods for InputEventHandler
+
+  /**
+   * Registers an input attempt that can be fetched from the given host and
+   * marks that host as pending.
+   *
+   * @param hostName host serving the input
+   * @param port port used when the host is first seen
+   * @param srcAttemptIdentifier the attempt to fetch
+   * @param partition not used by this implementation
+   */
+  public void addKnownInput(String hostName, int port,
+      InputAttemptIdentifier srcAttemptIdentifier, int partition) {
+    InputHost host = knownSrcHosts.get(hostName);
+    if (host == null) {
+      host = new InputHost(hostName, port, inputContext.getApplicationId());
+      InputHost old = knownSrcHosts.putIfAbsent(hostName, host);
+      if (old != null) {
+        host = old;
+      }
+    }
+    host.addKnownInput(srcAttemptIdentifier);
+    lock.lock();
+    try {
+      pendingHosts.add(host);
+      wakeLoop.signal();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Records an input that produced no data as complete (at most once per
+   * input) by queueing a placeholder, then wakes the scheduling loop.
+   *
+   * @param srcAttemptIdentifier attempt that generated no output
+   */
+  public void addCompletedInputWithNoData(
+      InputAttemptIdentifier srcAttemptIdentifier) {
+    InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
+    LOG.info("No input data exists for SrcTask: " + inputIdentifier + ". Marking as complete.");
+    // Gate on completedInputSet (as fetchSucceeded does): nothing ever adds
+    // to pendingInputs, so gating on it would never register the input.
+    synchronized (completedInputSet) {
+      if (completedInputSet.add(inputIdentifier)) {
+        pendingInputs.remove(inputIdentifier);
+        completedInputs.add(new NullFetchedInput(srcAttemptIdentifier));
+        numCompletedInputs.incrementAndGet();
+      }
+    }
+
+    // Awake the loop to check for termination.
+    signalWakeLoop();
+  }
+
+  /**
+   * Marks an attempt as obsolete so that it is skipped when the next fetch
+   * list for its host is built.
+   *
+   * @param srcAttemptIdentifier attempt to ignore
+   */
+  public synchronized void obsoleteKnownInput(InputAttemptIdentifier srcAttemptIdentifier) {
+    obsoletedInputs.add(srcAttemptIdentifier);
+    // TODO NEWTEZ Maybe inform the fetcher about this. For now, this is used during the initial fetch list construction.
+  }
+
+  /**
+   * Forwards events to the input event handler.
+   *
+   * @param events events to process
+   */
+  public void handleEvents(List<Event> events) {
+    inputEventHandler.handleEvents(events);
+  }
+
+  /////////////////// End of Methods for InputEventHandler
+  /////////////////// Methods from FetcherCallbackHandler
+
+  /**
+   * Commits and queues a fetched input the first time its input identifier
+   * completes; any later copy of the same input is aborted.
+   *
+   * @throws IOException if committing or aborting the fetched input fails
+   */
+  @Override
+  public void fetchSucceeded(String host,
+      InputAttemptIdentifier srcAttemptIdentifier, FetchedInput fetchedInput, long fetchedBytes,
+      long copyDuration) throws IOException {
+    InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Complete fetch for attempt: " + srcAttemptIdentifier + " to " + fetchedInput.getType());
+    }
+
+    // Count irrespective of whether this is a copy of an already fetched input
+    lastProgressTime = System.currentTimeMillis();
+
+    boolean committed = false;
+    if (!completedInputSet.contains(inputIdentifier)) {
+      synchronized (completedInputSet) {
+        if (!completedInputSet.contains(inputIdentifier)) {
+          // Commit first so that a failed commit does not mark the input
+          // as complete.
+          fetchedInput.commit();
+          committed = true;
+          pendingInputs.remove(inputIdentifier);
+          completedInputSet.add(inputIdentifier);
+          completedInputs.add(fetchedInput);
+          numCompletedInputs.incrementAndGet();
+        }
+      }
+    }
+    if (!committed) {
+      fetchedInput.abort(); // If this fails, the fetcher may attempt another abort.
+    } else {
+      // Signal the wakeLoop to check for termination.
+      signalWakeLoop();
+    }
+    // TODO NEWTEZ Maybe inform fetchers, in case they have an alternate attempt of the same task in their queue.
+  }
+
+  /**
+   * Reports a failed fetch to the framework by sending an
+   * {@link InputReadErrorEvent} for the attempt.
+   */
+  @Override
+  public void fetchFailed(String host,
+      InputAttemptIdentifier srcAttemptIdentifier, boolean connectFailed) {
+    // TODO NEWTEZ. Implement logic to report fetch failures after a threshold.
+    // For now, reporting immediately.
+    InputReadErrorEvent readError = new InputReadErrorEvent(
+        "Fetch failure while fetching from "
+            + TezRuntimeUtils.getTaskAttemptIdentifier(
+                inputContext.getSourceVertexName(),
+                srcAttemptIdentifier.getInputIdentifier().getSrcTaskIndex(),
+                srcAttemptIdentifier.getAttemptNumber()),
+        srcAttemptIdentifier.getInputIdentifier().getSrcTaskIndex(),
+        srcAttemptIdentifier.getAttemptNumber());
+
+    List<Event> failedEvents = Lists.newArrayListWithCapacity(1);
+    failedEvents.add(readError);
+    inputContext.sendEvents(failedEvents);
+  }
+  /////////////////// End of Methods from FetcherCallbackHandler
+
+  /**
+   * Shuts the fetcher pool down, waiting up to two seconds for running
+   * fetchers before interrupting them.
+   *
+   * @throws InterruptedException if interrupted while waiting
+   */
+  public void shutdown() throws InterruptedException {
+    if (!this.fetcherExecutor.isShutdown()) {
+      this.fetcherExecutor.shutdown();
+      // isShutdown() is always true after shutdown(); the result of
+      // awaitTermination says whether the tasks actually finished.
+      if (!this.fetcherExecutor.awaitTermination(2000L, TimeUnit.MILLISECONDS)) {
+        this.fetcherExecutor.shutdownNow();
+      }
+    }
+  }
+
+  /////////////////// Methods for walking the available inputs
+
+  /**
+   * @return true if there is another input ready for consumption.
+   */
+  public boolean newInputAvailable() {
+    FetchedInput head = completedInputs.peek();
+    return head != null && !(head instanceof NullFetchedInput);
+  }
+
+  /**
+   * @return true if all of the required inputs have been fetched.
+   */
+  public boolean allInputsFetched() {
+    return numCompletedInputs.get() == numInputs;
+  }
+
+  /**
+   * @return the next available input, or null if there are no available inputs.
+   *         This method will block if there are currently no available inputs,
+   *         but more may become available.
+   */
+  public FetchedInput getNextInput() throws InterruptedException {
+    FetchedInput input = null;
+    do {
+      input = completedInputs.peek();
+      if (input == null) {
+        if (allInputsFetched()) {
+          break;
+        } else {
+          input = completedInputs.take(); // block
+        }
+      } else {
+        input = completedInputs.poll();
+      }
+      // Placeholders for inputs without data are skipped.
+    } while (input instanceof NullFetchedInput);
+    return input;
+  }
+
+  /////////////////// End of methods for walking the available inputs
+
+
+  /**
+   * Fake input that is added to the completed input list in case an input does not have any data.
+   * Every operation on it is unsupported.
+   */
+  private static class NullFetchedInput extends FetchedInput {
+
+    public NullFetchedInput(InputAttemptIdentifier inputAttemptIdentifier) {
+      super(Type.MEMORY, -1, inputAttemptIdentifier, null);
+    }
+
+    @Override
+    public OutputStream getOutputStream() throws IOException {
+      throw new UnsupportedOperationException("Not supported for NullFetchedInput");
+    }
+
+    @Override
+    public InputStream getInputStream() throws IOException {
+      throw new UnsupportedOperationException("Not supported for NullFetchedInput");
+    }
+
+    @Override
+    public void commit() throws IOException {
+      throw new UnsupportedOperationException("Not supported for NullFetchedInput");
+    }
+
+    @Override
+    public void abort() throws IOException {
+      throw new UnsupportedOperationException("Not supported for NullFetchedInput");
+    }
+
+    @Override
+    public void free() {
+      throw new UnsupportedOperationException("Not supported for NullFetchedInput");
+    }
+  }
+
+
+  /**
+   * Completion hook attached to every submitted fetcher: re-queues inputs the
+   * fetcher left pending, records fatal failures, and frees the fetcher slot.
+   */
+  private class FetchFutureCallback implements FutureCallback<FetchResult> {
+
+    /** Releases the fetcher slot and wakes the scheduling loop. */
+    private void doBookKeepingForFetcherComplete() {
+      numRunningFetchers.decrementAndGet();
+      signalWakeLoop();
+    }
+
+    @Override
+    public void onSuccess(FetchResult result) {
+      Iterable<InputAttemptIdentifier> leftover = result.getPendingInputs();
+      if (leftover != null && leftover.iterator().hasNext()) {
+        InputHost inputHost = knownSrcHosts.get(result.getHost());
+        assert inputHost != null;
+        for (InputAttemptIdentifier input : leftover) {
+          inputHost.addKnownInput(input);
+        }
+        // pendingHosts is only manipulated with the lock held.
+        lock.lock();
+        try {
+          pendingHosts.add(inputHost);
+        } finally {
+          lock.unlock();
+        }
+      }
+      doBookKeepingForFetcherComplete();
+    }
+
+    @Override
+    public void onFailure(Throwable t) {
+      // Pass the throwable as well so that the stack trace is logged.
+      LOG.error("Fetcher failed with error: " + t, t);
+      shuffleError = t;
+      inputContext.fatalError(t, "Fetch failed");
+      doBookKeepingForFetcherComplete();
+    }
+  }
+}