You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/09/25 00:44:26 UTC

[17/20] Rename tez-engine-api to tez-runtime-api and tez-engine is split into 2: - tez-engine-library for user-visible Input/Output/Processor implementations - tez-engine-internals for framework internals

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezHeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezHeartbeatRequest.java b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezHeartbeatRequest.java
deleted file mode 100644
index dc1a447..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezHeartbeatRequest.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.api.impl;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-
-
-public class TezHeartbeatRequest implements Writable {
-
-  private String containerIdentifier;
-  private List<TezEvent> events;
-  private TezTaskAttemptID currentTaskAttemptID;
-  private int startIndex;
-  private int maxEvents;
-  private long requestId;
-
-  public TezHeartbeatRequest() {
-  }
-
-  public TezHeartbeatRequest(long requestId, List<TezEvent> events,
-      String containerIdentifier, TezTaskAttemptID taskAttemptID,
-      int startIndex, int maxEvents) {
-    this.containerIdentifier = containerIdentifier;
-    this.requestId = requestId;
-    this.events = Collections.unmodifiableList(events);
-    this.startIndex = startIndex;
-    this.maxEvents = maxEvents;
-    this.currentTaskAttemptID = taskAttemptID;
-  }
-
-  public String getContainerIdentifier() {
-    return containerIdentifier;
-  }
-
-  public List<TezEvent> getEvents() {
-    return events;
-  }
-
-  public int getStartIndex() {
-    return startIndex;
-  }
-
-  public int getMaxEvents() {
-    return maxEvents;
-  }
-
-  public long getRequestId() {
-    return requestId;
-  }
-
-  public TezTaskAttemptID getCurrentTaskAttemptID() {
-    return currentTaskAttemptID;
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    if (events != null) {
-      out.writeBoolean(true);
-      out.writeInt(events.size());
-      for (TezEvent e : events) {
-        e.write(out);
-      }
-    } else {
-      out.writeBoolean(false);
-    }
-    if (currentTaskAttemptID != null) {
-      out.writeBoolean(true);
-      currentTaskAttemptID.write(out);
-    } else {
-      out.writeBoolean(false);
-    }
-    out.writeInt(startIndex);
-    out.writeInt(maxEvents);
-    out.writeLong(requestId);
-    Text.writeString(out, containerIdentifier);
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    if (in.readBoolean()) {
-      int eventsCount = in.readInt();
-      events = new ArrayList<TezEvent>(eventsCount);
-      for (int i = 0; i < eventsCount; ++i) {
-        TezEvent e = new TezEvent();
-        e.readFields(in);
-        events.add(e);
-      }
-    }
-    if (in.readBoolean()) {
-      currentTaskAttemptID = new TezTaskAttemptID();
-      currentTaskAttemptID.readFields(in);
-    } else {
-      currentTaskAttemptID = null;
-    }
-    startIndex = in.readInt();
-    maxEvents = in.readInt();
-    requestId = in.readLong();
-    containerIdentifier = Text.readString(in);
-  }
-
-  @Override
-  public String toString() {
-    return "{ "
-        + " containerId=" + containerIdentifier
-        + ", requestId=" + requestId
-        + ", startIndex=" + startIndex
-        + ", maxEventsToGet=" + maxEvents
-        + ", taskAttemptId" + currentTaskAttemptID
-        + ", eventCount=" + (events != null ? events.size() : 0)
-        + " }";
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezHeartbeatResponse.java b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezHeartbeatResponse.java
deleted file mode 100644
index 22ae7eb..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezHeartbeatResponse.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.api.impl;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.hadoop.io.Writable;
-
-public class TezHeartbeatResponse implements Writable {
-
-  private long lastRequestId;
-  private boolean shouldDie = false;
-  private List<TezEvent> events;
-
-  public TezHeartbeatResponse() {
-  }
-
-  public TezHeartbeatResponse(List<TezEvent> events) {
-    this.events = Collections.unmodifiableList(events);
-  }
-
-  public List<TezEvent> getEvents() {
-    return events;
-  }
-
-  public boolean shouldDie() {
-    return shouldDie;
-  }
-
-  public long getLastRequestId() {
-    return lastRequestId;
-  }
-
-  public void setEvents(List<TezEvent> events) {
-    this.events = Collections.unmodifiableList(events);
-  }
-
-  public void setLastRequestId(long lastRequestId ) {
-    this.lastRequestId = lastRequestId;
-  }
-
-  public void setShouldDie() {
-    this.shouldDie = true;
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeLong(lastRequestId);
-    out.writeBoolean(shouldDie);
-    if(events != null) {
-      out.writeBoolean(true);
-      out.writeInt(events.size());
-      for (TezEvent e : events) {
-        e.write(out);
-      }
-    } else {
-      out.writeBoolean(false);
-    }
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    lastRequestId = in.readLong();
-    shouldDie = in.readBoolean();
-    if(in.readBoolean()) {
-      int eventCount = in.readInt();
-      events = new ArrayList<TezEvent>(eventCount);
-      for (int i = 0; i < eventCount; ++i) {
-        TezEvent e = new TezEvent();
-        e.readFields(in);
-        events.add(e);
-      }
-    }
-  }
-
-  @Override
-  public String toString() {
-    return "{ "
-        + " lastRequestId=" + lastRequestId
-        + ", shouldDie=" + shouldDie
-        + ", eventCount=" + (events != null ? events.size() : 0)
-        + " }";
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezInputContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezInputContextImpl.java
deleted file mode 100644
index 245cd3b..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezInputContextImpl.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.api.impl;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.api.Event;
-import org.apache.tez.engine.api.TezInputContext;
-import org.apache.tez.engine.api.impl.EventMetaData.EventProducerConsumerType;
-import org.apache.tez.engine.newruntime.RuntimeTask;
-
-public class TezInputContextImpl extends TezTaskContextImpl
-    implements TezInputContext {
-
-  private final byte[] userPayload;
-  private final String sourceVertexName;
-  private final EventMetaData sourceInfo;
-
-  @Private
-  public TezInputContextImpl(Configuration conf, int appAttemptNumber,
-      TezUmbilical tezUmbilical, String taskVertexName,
-      String sourceVertexName, TezTaskAttemptID taskAttemptID,
-      TezCounters counters, byte[] userPayload,
-      RuntimeTask runtimeTask, Map<String, ByteBuffer> serviceConsumerMetadata) {
-    super(conf, appAttemptNumber, taskVertexName, taskAttemptID,
-        counters, runtimeTask, tezUmbilical, serviceConsumerMetadata);
-    this.userPayload = userPayload;
-    this.sourceVertexName = sourceVertexName;
-    this.sourceInfo = new EventMetaData(
-        EventProducerConsumerType.INPUT, taskVertexName, sourceVertexName,
-        taskAttemptID);
-    this.uniqueIdentifier = String.format("%s_%s_%06d_%02d_%s", taskAttemptID
-        .getTaskID().getVertexID().getDAGId().toString(), taskVertexName,
-        getTaskIndex(), getTaskAttemptNumber(), sourceVertexName);
-  }
-
-  @Override
-  public void sendEvents(List<Event> events) {
-    List<TezEvent> tezEvents = new ArrayList<TezEvent>(events.size());
-    for (Event e : events) {
-      TezEvent tEvt = new TezEvent(e, sourceInfo);
-      tezEvents.add(tEvt);
-    }
-    tezUmbilical.addEvents(tezEvents);
-  }
-
-  @Override
-  public byte[] getUserPayload() {
-    return userPayload;
-  }
-
-  @Override
-  public String getSourceVertexName() {
-    return sourceVertexName;
-  }
-
-  @Override
-  public void fatalError(Throwable exception, String message) {
-    super.signalFatalError(exception, message, sourceInfo);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezOutputContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezOutputContextImpl.java
deleted file mode 100644
index 6b42e13..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezOutputContextImpl.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.api.impl;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.api.Event;
-import org.apache.tez.engine.api.TezOutputContext;
-import org.apache.tez.engine.api.impl.EventMetaData.EventProducerConsumerType;
-import org.apache.tez.engine.newruntime.RuntimeTask;
-
-public class TezOutputContextImpl extends TezTaskContextImpl
-    implements TezOutputContext {
-
-  private final byte[] userPayload;
-  private final String destinationVertexName;
-  private final EventMetaData sourceInfo;
-
-  @Private
-  public TezOutputContextImpl(Configuration conf, int appAttemptNumber,
-      TezUmbilical tezUmbilical, String taskVertexName,
-      String destinationVertexName,
-      TezTaskAttemptID taskAttemptID, TezCounters counters,
-      byte[] userPayload, RuntimeTask runtimeTask,
-      Map<String, ByteBuffer> serviceConsumerMetadata) {
-    super(conf, appAttemptNumber, taskVertexName, taskAttemptID,
-        counters, runtimeTask, tezUmbilical, serviceConsumerMetadata);
-    this.userPayload = userPayload;
-    this.destinationVertexName = destinationVertexName;
-    this.sourceInfo = new EventMetaData(EventProducerConsumerType.OUTPUT,
-        taskVertexName, destinationVertexName, taskAttemptID);
-    this.uniqueIdentifier = String.format("%s_%s_%06d_%02d_%s", taskAttemptID
-        .getTaskID().getVertexID().getDAGId().toString(), taskVertexName,
-        getTaskIndex(), getTaskAttemptNumber(), destinationVertexName);
-  }
-
-  @Override
-  public void sendEvents(List<Event> events) {
-    List<TezEvent> tezEvents = new ArrayList<TezEvent>(events.size());
-    for (Event e : events) {
-      TezEvent tEvt = new TezEvent(e, sourceInfo);
-      tezEvents.add(tEvt);
-    }
-    tezUmbilical.addEvents(tezEvents);
-  }
-
-  @Override
-  public byte[] getUserPayload() {
-    return userPayload;
-  }
-
-  @Override
-  public String getDestinationVertexName() {
-    return destinationVertexName;
-  }
-
-  @Override
-  public void fatalError(Throwable exception, String message) {
-    super.signalFatalError(exception, message, sourceInfo);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezProcessorContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezProcessorContextImpl.java
deleted file mode 100644
index 7ffcfd6..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezProcessorContextImpl.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.api.impl;
-
-import java.nio.ByteBuffer;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.api.Event;
-import org.apache.tez.engine.api.TezProcessorContext;
-import org.apache.tez.engine.api.impl.EventMetaData.EventProducerConsumerType;
-import org.apache.tez.engine.newruntime.RuntimeTask;
-
-public class TezProcessorContextImpl extends TezTaskContextImpl
-  implements TezProcessorContext {
-
-  private final byte[] userPayload;
-  private final EventMetaData sourceInfo;
-
-  public TezProcessorContextImpl(Configuration conf, int appAttemptNumber,
-      TezUmbilical tezUmbilical, String vertexName,
-      TezTaskAttemptID taskAttemptID, TezCounters counters,
-      byte[] userPayload, RuntimeTask runtimeTask,
-      Map<String, ByteBuffer> serviceConsumerMetadata) {
-    super(conf, appAttemptNumber, vertexName, taskAttemptID,
-        counters, runtimeTask, tezUmbilical, serviceConsumerMetadata);
-    this.userPayload = userPayload;
-    this.sourceInfo = new EventMetaData(EventProducerConsumerType.PROCESSOR,
-        taskVertexName, "", taskAttemptID);
-    this.uniqueIdentifier = String.format("%s_%s_%06d_%02d", taskAttemptID
-        .getTaskID().getVertexID().getDAGId().toString(), taskVertexName,
-        getTaskIndex(), getTaskAttemptNumber());
-  }
-
-  @Override
-  public void sendEvents(List<Event> events) {
-    List<TezEvent> tezEvents = new ArrayList<TezEvent>(events.size());
-    for (Event e : events) {
-      TezEvent tEvt = new TezEvent(e, sourceInfo);
-      tezEvents.add(tEvt);
-    }
-    tezUmbilical.addEvents(tezEvents);
-  }
-
-  @Override
-  public byte[] getUserPayload() {
-    return userPayload;
-  }
-
-  @Override
-  public void setProgress(float progress) {
-    runtimeTask.setProgress(progress);
-  }
-
-  @Override
-  public void fatalError(Throwable exception, String message) {
-    super.signalFatalError(exception, message, sourceInfo);
-  }
-
-  @Override
-  public boolean canCommit() throws IOException {
-    return tezUmbilical.canCommit(this.taskAttemptID);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezTaskContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezTaskContextImpl.java
deleted file mode 100644
index d5a4037..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezTaskContextImpl.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.api.impl;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Map;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.api.TezTaskContext;
-import org.apache.tez.engine.newruntime.RuntimeTask;
-
-public abstract class TezTaskContextImpl implements TezTaskContext {
-
-  private final Configuration conf;
-  protected final String taskVertexName;
-  protected final TezTaskAttemptID taskAttemptID;
-  private final TezCounters counters;
-  private String[] workDirs;
-  protected String uniqueIdentifier;
-  protected final RuntimeTask runtimeTask;
-  protected final TezUmbilical tezUmbilical;
-  private final Map<String, ByteBuffer> serviceConsumerMetadata;
-  private final int appAttemptNumber;
-
-  @Private
-  public TezTaskContextImpl(Configuration conf, int appAttemptNumber,
-      String taskVertexName, TezTaskAttemptID taskAttemptID,
-      TezCounters counters, RuntimeTask runtimeTask,
-      TezUmbilical tezUmbilical, Map<String, ByteBuffer> serviceConsumerMetadata) {
-    this.conf = conf;
-    this.taskVertexName = taskVertexName;
-    this.taskAttemptID = taskAttemptID;
-    this.counters = counters;
-    // TODO Maybe change this to be task id specific at some point. For now
-    // Shuffle code relies on this being a path specified by YARN
-    this.workDirs = this.conf.getStrings(TezJobConfig.LOCAL_DIRS);
-    this.runtimeTask = runtimeTask;
-    this.tezUmbilical = tezUmbilical;
-    this.serviceConsumerMetadata = serviceConsumerMetadata;
-    // TODO NEWTEZ at some point dag attempt should not map to app attempt
-    this.appAttemptNumber = appAttemptNumber;
-  }
-
-  @Override
-  public ApplicationId getApplicationId() {
-    return taskAttemptID.getTaskID().getVertexID().getDAGId()
-        .getApplicationId();
-  }
-
-  @Override
-  public int getTaskIndex() {
-    return taskAttemptID.getTaskID().getId();
-  }
-
-  @Override
-  public int getDAGAttemptNumber() {
-    return appAttemptNumber;
-  }
-
-  @Override
-  public int getTaskAttemptNumber() {
-    return taskAttemptID.getId();
-  }
-
-  @Override
-  public String getDAGName() {
-    // TODO NEWTEZ Change to some form of the DAG name, for now using dagId as
-    // the unique identifier.
-    return taskAttemptID.getTaskID().getVertexID().getDAGId().toString();
-  }
-
-  @Override
-  public String getTaskVertexName() {
-    return taskVertexName;
-  }
-
-
-  @Override
-  public TezCounters getCounters() {
-    return counters;
-  }
-
-  @Override
-  public String[] getWorkDirs() {
-    return Arrays.copyOf(workDirs, workDirs.length);
-  }
-
-  @Override
-  public String getUniqueIdentifier() {
-    return uniqueIdentifier;
-  }
-
-  @Override
-  public ByteBuffer getServiceConsumerMetaData(String serviceName) {
-    return (ByteBuffer) serviceConsumerMetadata.get(serviceName)
-        .asReadOnlyBuffer().rewind();
-  }
-
-  @Override
-  public ByteBuffer getServiceProviderMetaData(String serviceName) {
-    return AuxiliaryServiceHelper.getServiceDataFromEnv(
-        serviceName, System.getenv());
-  }
-
-  protected void signalFatalError(Throwable t, String message,
-      EventMetaData sourceInfo) {
-    runtimeTask.setFatalError(t, message);
-    String diagnostics;
-    if (t != null && message != null) {
-      diagnostics = "exceptionThrown=" + StringUtils.stringifyException(t)
-          + ", errorMessage=" + message;
-    } else if (t == null && message == null) {
-      diagnostics = "Unknown error";
-    } else {
-      diagnostics = t != null ?
-          "exceptionThrown=" + StringUtils.stringifyException(t)
-          : " errorMessage=" + message;
-    }
-    tezUmbilical.signalFatalError(taskAttemptID, diagnostics, sourceInfo);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezUmbilical.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezUmbilical.java b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezUmbilical.java
deleted file mode 100644
index 925d87b..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezUmbilical.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.api.impl;
-
-import java.io.IOException;
-import java.util.Collection;
-
-import org.apache.tez.dag.records.TezTaskAttemptID;
-
-public interface TezUmbilical {
-
-  public void addEvents(Collection<TezEvent> events);
-
-  public void signalFatalError(TezTaskAttemptID taskAttemptID,
-      String diagnostics,
-      EventMetaData sourceInfo);
-
-  public boolean canCommit(TezTaskAttemptID taskAttemptID) throws IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastInputManager.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastInputManager.java b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastInputManager.java
deleted file mode 100644
index 1211598..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastInputManager.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.broadcast.input;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.tez.common.Constants;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.engine.api.TezInputContext;
-import org.apache.tez.engine.common.InputAttemptIdentifier;
-import org.apache.tez.engine.common.task.local.output.TezTaskOutputFiles;
-import org.apache.tez.engine.shuffle.common.DiskFetchedInput;
-import org.apache.tez.engine.shuffle.common.FetchedInput;
-import org.apache.tez.engine.shuffle.common.FetchedInputAllocator;
-import org.apache.tez.engine.shuffle.common.FetchedInputCallback;
-import org.apache.tez.engine.shuffle.common.MemoryFetchedInput;
-
-public class BroadcastInputManager implements FetchedInputAllocator,
-    FetchedInputCallback {
-
-  private final Configuration conf;
-
-  private final TezTaskOutputFiles fileNameAllocator;
-  private final LocalDirAllocator localDirAllocator;
-
-  // Configuration parameters
-  private final long memoryLimit;
-  private final long maxSingleShuffleLimit;
-
-  private long usedMemory = 0;
-
-  public BroadcastInputManager(TezInputContext inputContext, Configuration conf) {
-    this.conf = conf;
-
-    this.fileNameAllocator = new TezTaskOutputFiles(conf,
-        inputContext.getUniqueIdentifier());
-    this.localDirAllocator = new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
-
-    // Setup configuration
-    final float maxInMemCopyUse = conf.getFloat(
-        TezJobConfig.TEZ_ENGINE_SHUFFLE_INPUT_BUFFER_PERCENT,
-        TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_INPUT_BUFFER_PERCENT);
-    if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
-      throw new IllegalArgumentException("Invalid value for "
-          + TezJobConfig.TEZ_ENGINE_SHUFFLE_INPUT_BUFFER_PERCENT + ": "
-          + maxInMemCopyUse);
-    }
-
-    // Allow unit tests to fix Runtime memory
-    this.memoryLimit = (long) (conf.getLong(Constants.TEZ_ENGINE_TASK_MEMORY,
-        Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE)) * maxInMemCopyUse);
-
-    final float singleShuffleMemoryLimitPercent = conf.getFloat(
-        TezJobConfig.TEZ_ENGINE_SHUFFLE_MEMORY_LIMIT_PERCENT,
-        TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_MEMORY_LIMIT_PERCENT);
-    if (singleShuffleMemoryLimitPercent <= 0.0f
-        || singleShuffleMemoryLimitPercent > 1.0f) {
-      throw new IllegalArgumentException("Invalid value for "
-          + TezJobConfig.TEZ_ENGINE_SHUFFLE_MEMORY_LIMIT_PERCENT + ": "
-          + singleShuffleMemoryLimitPercent);
-    }
-
-    this.maxSingleShuffleLimit = (long) (memoryLimit * singleShuffleMemoryLimitPercent);
-  }
-
-  @Override
-  public synchronized FetchedInput allocate(long size,
-      InputAttemptIdentifier inputAttemptIdentifier) throws IOException {
-    if (size > maxSingleShuffleLimit
-        || this.usedMemory + size > this.memoryLimit) {
-      return new DiskFetchedInput(size, inputAttemptIdentifier, this, conf,
-          localDirAllocator, fileNameAllocator);
-    } else {
-      this.usedMemory += size;
-      return new MemoryFetchedInput(size, inputAttemptIdentifier, this);
-    }
-  }
-
-  @Override
-  public void fetchComplete(FetchedInput fetchedInput) {
-    switch (fetchedInput.getType()) {
-    // Not tracking anything here.
-    case DISK:
-    case MEMORY:
-      break;
-    default:
-      throw new TezUncheckedException("InputType: " + fetchedInput.getType()
-          + " not expected for Broadcast fetch");
-    }
-  }
-
-  @Override
-  public void fetchFailed(FetchedInput fetchedInput) {
-    cleanup(fetchedInput);
-  }
-
-  @Override
-  public void freeResources(FetchedInput fetchedInput) {
-    cleanup(fetchedInput);
-  }
-
-  private void cleanup(FetchedInput fetchedInput) {
-    switch (fetchedInput.getType()) {
-    case DISK:
-      break;
-    case MEMORY:
-      unreserve(fetchedInput.getSize());
-      break;
-    default:
-      throw new TezUncheckedException("InputType: " + fetchedInput.getType()
-          + " not expected for Broadcast fetch");
-    }
-  }
-
-  private synchronized void unreserve(long size) {
-    this.usedMemory -= size;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastKVReader.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastKVReader.java b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastKVReader.java
deleted file mode 100644
index 2c53e75..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastKVReader.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.broadcast.input;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.engine.api.KVReader;
-import org.apache.tez.engine.common.ConfigUtils;
-import org.apache.tez.engine.common.shuffle.impl.InMemoryReader;
-import org.apache.tez.engine.common.sort.impl.IFile;
-import org.apache.tez.engine.shuffle.common.FetchedInput;
-import org.apache.tez.engine.shuffle.common.FetchedInput.Type;
-import org.apache.tez.engine.shuffle.common.MemoryFetchedInput;
-
-public class BroadcastKVReader<K, V> implements KVReader {
-
-  private static final Log LOG = LogFactory.getLog(BroadcastKVReader.class);
-  
-  private final BroadcastShuffleManager shuffleManager;
-  private final Configuration conf;
-  private final CompressionCodec codec;
-  
-  private final Class<K> keyClass;
-  private final Class<V> valClass;
-  private final Deserializer<K> keyDeserializer;
-  private final Deserializer<V> valDeserializer;
-  private final DataInputBuffer keyIn;
-  private final DataInputBuffer valIn;
-
-  private final SimpleValueIterator valueIterator;
-  private final SimpleIterable valueIterable;
-  
-  private K key;
-  private V value;
-  
-  private FetchedInput currentFetchedInput;
-  private IFile.Reader currentReader;
-  
-  
-  public BroadcastKVReader(BroadcastShuffleManager shuffleManager,
-      Configuration conf) {
-    this.shuffleManager = shuffleManager;
-    this.conf = conf;
-
-    if (ConfigUtils.isIntermediateInputCompressed(this.conf)) {
-      Class<? extends CompressionCodec> codecClass = ConfigUtils
-          .getIntermediateInputCompressorClass(conf, DefaultCodec.class);
-      codec = ReflectionUtils.newInstance(codecClass, conf);
-    } else {
-      codec = null;
-    }
-
-    this.keyClass = ConfigUtils.getIntermediateInputKeyClass(conf);
-    this.valClass = ConfigUtils.getIntermediateInputKeyClass(conf);
-
-    this.keyIn = new DataInputBuffer();
-    this.valIn = new DataInputBuffer();
-
-    SerializationFactory serializationFactory = new SerializationFactory(conf);
-
-    this.keyDeserializer = serializationFactory.getDeserializer(keyClass); 
-    this.valDeserializer = serializationFactory.getDeserializer(valClass);
-    
-    this.valueIterator = new SimpleValueIterator();
-    this.valueIterable = new SimpleIterable(this.valueIterator);
-  }
-
-  // TODO NEWTEZ Maybe add an interface to check whether next will block.
-  
-  /**
-   * Moves to the next key/values(s) pair
-   * 
-   * @return true if another key/value(s) pair exists, false if there are no
-   *         more.
-   * @throws IOException
-   *           if an error occurs
-   */
-  @Override  
-  public boolean next() throws IOException {
-    if (readNextFromCurrentReader()) {
-      return true;
-    } else {
-      boolean nextInputExists = moveToNextInput();
-      while (nextInputExists) {
-        if(readNextFromCurrentReader()) {
-          return true;
-        }
-        nextInputExists = moveToNextInput();
-      }
-      return false;
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public KVRecord getCurrentKV() throws IOException {
-    this.valueIterator.setValue(value);
-    return new KVRecord((Object)key, (Iterable<Object>)this.valueIterable);
-  }
-
-  /**
-   * Tries reading the next key and value from the current reader.
-   * @return true if the current reader has more records
-   * @throws IOException
-   */
-  private boolean readNextFromCurrentReader() throws IOException {
-    // Initial reader.
-    if (this.currentReader == null) {
-      return false;
-    } else {
-      boolean hasMore = this.currentReader.nextRawKey(keyIn);
-      if (hasMore) {
-        this.currentReader.nextRawValue(valIn);
-        this.key = keyDeserializer.deserialize(this.key);
-        this.value = valDeserializer.deserialize(this.value);
-        return true;
-      }
-      return false;
-    }
-  }
-  
-  /**
-   * Moves to the next available input. This method may block if the input is not ready yet.
-   * Also takes care of closing the previous input.
-   * 
-   * @return true if the next input exists, false otherwise
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  private boolean moveToNextInput() throws IOException {
-    if (currentReader != null) { // Close the current reader.
-      currentReader.close();
-      currentFetchedInput.free();
-    }
-    try {
-      currentFetchedInput = shuffleManager.getNextInput();
-    } catch (InterruptedException e) {
-      LOG.warn("Interrupted while waiting for next available input", e);
-      throw new IOException(e);
-    }
-    if (currentFetchedInput == null) {
-      return false; // No more inputs
-    } else {
-      currentReader = openIFileReader(currentFetchedInput);
-      return true;
-    }
-  }
-
-  public IFile.Reader openIFileReader(FetchedInput fetchedInput)
-      throws IOException {
-    if (fetchedInput.getType() == Type.MEMORY) {
-      MemoryFetchedInput mfi = (MemoryFetchedInput) fetchedInput;
-
-      return new InMemoryReader(null, mfi.getInputAttemptIdentifier(),
-          mfi.getBytes(), 0, (int) mfi.getSize());
-    } else {
-      return new IFile.Reader(conf, fetchedInput.getInputStream(),
-          fetchedInput.getSize(), codec, null);
-    }
-  }
-
-  
-  
-  // TODO NEWTEZ Move this into a common class. Also used in MRInput
-  private class SimpleValueIterator implements Iterator<V> {
-
-    private V value;
-
-    public void setValue(V value) {
-      this.value = value;
-    }
-
-    public boolean hasNext() {
-      return value != null;
-    }
-
-    public V next() {
-      V value = this.value;
-      this.value = null;
-      return value;
-    }
-
-    public void remove() {
-      throw new UnsupportedOperationException();
-    }
-  }
-
-  private class SimpleIterable implements Iterable<V> {
-    private final Iterator<V> iterator;
-    public SimpleIterable(Iterator<V> iterator) {
-      this.iterator = iterator;
-    }
-
-    @Override
-    public Iterator<V> iterator() {
-      return iterator;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastShuffleInputEventHandler.java b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastShuffleInputEventHandler.java
deleted file mode 100644
index e89e892..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastShuffleInputEventHandler.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.tez.engine.broadcast.input;
-
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.engine.api.Event;
-import org.apache.tez.engine.api.TezInputContext;
-import org.apache.tez.engine.api.events.DataMovementEvent;
-import org.apache.tez.engine.api.events.InputFailedEvent;
-import org.apache.tez.engine.common.InputAttemptIdentifier;
-import org.apache.tez.engine.common.shuffle.impl.ShuffleInputEventHandler;
-import org.apache.tez.engine.common.shuffle.newimpl.ShuffleUserPayloads.DataMovementEventPayloadProto;
-
-import com.google.common.base.Preconditions;
-import com.google.protobuf.InvalidProtocolBufferException;
-
-public class BroadcastShuffleInputEventHandler {
-
-  private static final Log LOG = LogFactory.getLog(ShuffleInputEventHandler.class);
-  
-  private final BroadcastShuffleManager shuffleManager;
-  
-  public BroadcastShuffleInputEventHandler(TezInputContext inputContext, BroadcastShuffleManager shuffleManager) {
-    this.shuffleManager = shuffleManager;
-  }
-
-  public void handleEvents(List<Event> events) {
-    for (Event event : events) {
-      handleEvent(event);
-    }
-  }
-  
-  private void handleEvent(Event event) {
-    if (event instanceof DataMovementEvent) {
-      processDataMovementEvent((DataMovementEvent)event);
-    } else if (event instanceof InputFailedEvent) {
-      processInputFailedEvent((InputFailedEvent)event);
-    } else {
-      throw new TezUncheckedException("Unexpected event type: " + event.getClass().getName());
-    }
-  }
-  
-  
-  private void processDataMovementEvent(DataMovementEvent dme) {
-    Preconditions.checkArgument(dme.getSourceIndex() == 0,
-        "Unexpected srcIndex: " + dme.getSourceIndex()
-            + " on DataMovementEvent. Can only be 0");
-    DataMovementEventPayloadProto shufflePayload;
-    try {
-      shufflePayload = DataMovementEventPayloadProto.parseFrom(dme.getUserPayload());
-    } catch (InvalidProtocolBufferException e) {
-      throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
-    }
-    if (shufflePayload.getOutputGenerated()) {
-      InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(dme.getTargetIndex(), dme.getVersion(), shufflePayload.getPathComponent());
-      shuffleManager.addKnownInput(shufflePayload.getHost(), shufflePayload.getPort(), srcAttemptIdentifier, 0);
-    } else {
-      shuffleManager.addCompletedInputWithNoData(new InputAttemptIdentifier(dme.getTargetIndex(), dme.getVersion()));
-    }
-  }
-  
-  private void processInputFailedEvent(InputFailedEvent ife) {
-    InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(ife.getTargetIndex(), ife.getVersion());
-    shuffleManager.obsoleteKnownInput(srcAttemptIdentifier);
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastShuffleManager.java b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastShuffleManager.java
deleted file mode 100644
index 7b205fa..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastShuffleManager.java
+++ /dev/null
@@ -1,489 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.broadcast.input;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
-import javax.crypto.SecretKey;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.compress.CodecPool;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.engine.api.Event;
-import org.apache.tez.engine.api.TezInputContext;
-import org.apache.tez.engine.api.events.InputReadErrorEvent;
-import org.apache.tez.engine.common.ConfigUtils;
-import org.apache.tez.engine.common.InputAttemptIdentifier;
-import org.apache.tez.engine.common.InputIdentifier;
-import org.apache.tez.engine.common.TezEngineUtils;
-import org.apache.tez.engine.common.shuffle.server.ShuffleHandler;
-import org.apache.tez.engine.shuffle.common.FetchResult;
-import org.apache.tez.engine.shuffle.common.FetchedInput;
-import org.apache.tez.engine.shuffle.common.Fetcher;
-import org.apache.tez.engine.shuffle.common.FetcherCallback;
-import org.apache.tez.engine.shuffle.common.InputHost;
-import org.apache.tez.engine.shuffle.common.ShuffleUtils;
-import org.apache.tez.engine.shuffle.common.Fetcher.FetcherBuilder;
-import org.apache.tez.engine.shuffle.common.FetchedInputAllocator;
-
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-public class BroadcastShuffleManager implements FetcherCallback {
-
-  private static final Log LOG = LogFactory.getLog(BroadcastShuffleManager.class);
-  
-  private TezInputContext inputContext;
-  private int numInputs;
-  private Configuration conf;
-  
-  private final BroadcastShuffleInputEventHandler inputEventHandler;
-  private final FetchedInputAllocator inputManager;
-  
-  private final ExecutorService fetcherRawExecutor;
-  private final ListeningExecutorService fetcherExecutor;
-
-  private final BlockingQueue<FetchedInput> completedInputs;
-  private final Set<InputIdentifier> completedInputSet;
-  private final Set<InputIdentifier> pendingInputs;
-  private final ConcurrentMap<String, InputHost> knownSrcHosts;
-  private final Set<InputHost> pendingHosts;
-  private final Set<InputAttemptIdentifier> obsoletedInputs;
-  
-  private final AtomicInteger numCompletedInputs = new AtomicInteger(0);
-  
-  private final long startTime;
-  private long lastProgressTime;
-  
-  private FutureTask<Void> runShuffleFuture;
-  
-  // Required to be held when manipulating pendingHosts
-  private ReentrantLock lock = new ReentrantLock();
-  private Condition wakeLoop = lock.newCondition();
-  
-  private final int numFetchers;
-  private final AtomicInteger numRunningFetchers = new AtomicInteger(0);
-  
-  // Parameters required by Fetchers
-  private final SecretKey shuffleSecret;
-  private final int connectionTimeout;
-  private final int readTimeout;
-  private final CompressionCodec codec;
-  private final Decompressor decompressor;
-  
-  private final FetchFutureCallback fetchFutureCallback = new FetchFutureCallback();
-  
-  private volatile Throwable shuffleError;
-  
-  // TODO NEWTEZ Add counters.
-  
-  public BroadcastShuffleManager(TezInputContext inputContext, Configuration conf, int numInputs) throws IOException {
-    this.inputContext = inputContext;
-    this.conf = conf;
-    this.numInputs = numInputs;
-    
-    this.inputEventHandler = new BroadcastShuffleInputEventHandler(inputContext, this);
-    this.inputManager = new BroadcastInputManager(inputContext, conf);
-
-    pendingInputs = Collections.newSetFromMap(new ConcurrentHashMap<InputIdentifier, Boolean>(numInputs));
-    completedInputSet = Collections.newSetFromMap(new ConcurrentHashMap<InputIdentifier, Boolean>(numInputs));
-    completedInputs = new LinkedBlockingQueue<FetchedInput>(numInputs);
-    knownSrcHosts = new ConcurrentHashMap<String, InputHost>();
-    pendingHosts = Collections.newSetFromMap(new ConcurrentHashMap<InputHost, Boolean>());
-    obsoletedInputs = Collections.newSetFromMap(new ConcurrentHashMap<InputAttemptIdentifier, Boolean>());
-    
-    int maxConfiguredFetchers = 
-        conf.getInt(
-            TezJobConfig.TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES, 
-            TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES);
-    
-    this.numFetchers = Math.min(maxConfiguredFetchers, numInputs);
-    
-    this.fetcherRawExecutor = Executors.newFixedThreadPool(numFetchers,
-        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Fetcher #%d")
-            .build());
-    this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor);
-    
-    this.startTime = System.currentTimeMillis();
-    this.lastProgressTime = startTime;
-    
-    this.shuffleSecret = ShuffleUtils
-        .getJobTokenSecretFromTokenBytes(inputContext
-            .getServiceConsumerMetaData(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID));
-    
-    this.connectionTimeout = conf.getInt(
-        TezJobConfig.TEZ_ENGINE_SHUFFLE_CONNECT_TIMEOUT,
-        TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_STALLED_COPY_TIMEOUT);
-    this.readTimeout = conf.getInt(
-        TezJobConfig.TEZ_ENGINE_SHUFFLE_READ_TIMEOUT,
-        TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_READ_TIMEOUT);
-    
-    if (ConfigUtils.isIntermediateInputCompressed(conf)) {
-      Class<? extends CompressionCodec> codecClass = ConfigUtils
-          .getIntermediateInputCompressorClass(conf, DefaultCodec.class);
-      codec = ReflectionUtils.newInstance(codecClass, conf);
-      decompressor = CodecPool.getDecompressor(codec);
-    } else {
-      codec = null;
-      decompressor = null;
-    }
-  }
-  
-  public void run() {
-    RunBroadcastShuffleCallable callable = new RunBroadcastShuffleCallable();
-    runShuffleFuture = new FutureTask<Void>(callable);
-    new Thread(runShuffleFuture, "ShuffleRunner");
-  }
-  
-  private class RunBroadcastShuffleCallable implements Callable<Void> {
-
-    @Override
-    public Void call() throws Exception {
-      while (numCompletedInputs.get() < numInputs) {
-        if (numRunningFetchers.get() >= numFetchers || pendingHosts.size() == 0) {
-          synchronized(lock) {
-            wakeLoop.await();
-          }
-          if (shuffleError != null) {
-            // InputContext has already been informed of a fatal error.
-            // Initiate shutdown.
-            break;
-          }
-          
-          if (numCompletedInputs.get() < numInputs) {
-            synchronized (lock) {
-              int numFetchersToRun = Math.min(pendingHosts.size(), numFetchers - numRunningFetchers.get());
-              int count = 0;
-              for (Iterator<InputHost> inputHostIter = pendingHosts.iterator() ; inputHostIter.hasNext() ; ) {
-                InputHost inputHost = inputHostIter.next();
-                inputHostIter.remove();
-                if (inputHost.getNumPendingInputs() > 0) {
-                  Fetcher fetcher = constructFetcherForHost(inputHost);
-                  numRunningFetchers.incrementAndGet();
-                  ListenableFuture<FetchResult> future = fetcherExecutor
-                      .submit(fetcher);
-                  Futures.addCallback(future, fetchFutureCallback);
-                  if (++count >= numFetchersToRun) {
-                    break;
-                  }
-                }
-              }
-            }
-          }
-        }
-      }
-      // TODO NEWTEZ Maybe clean up inputs.
-      if (!fetcherExecutor.isShutdown()) {
-        fetcherExecutor.shutdownNow();
-      }
-      return null;
-    }
-  }
-  
-  private Fetcher constructFetcherForHost(InputHost inputHost) {
-    FetcherBuilder fetcherBuilder = new FetcherBuilder(
-        BroadcastShuffleManager.this, inputManager,
-        inputContext.getApplicationId(), shuffleSecret, conf);
-    fetcherBuilder.setConnectionParameters(connectionTimeout, readTimeout);
-    fetcherBuilder.setCompressionParameters(codec, decompressor);
-
-    // Remove obsolete inputs from the list being given to the fetcher. Also
-    // remove from the obsolete list.
-    List<InputAttemptIdentifier> pendingInputsForHost = inputHost
-        .clearAndGetPendingInputs();
-    for (Iterator<InputAttemptIdentifier> inputIter = pendingInputsForHost
-        .iterator(); inputIter.hasNext();) {
-      InputAttemptIdentifier input = inputIter.next();
-      // Avoid adding attempts which have already completed.
-      if (completedInputSet.contains(input.getInputIdentifier())) {
-        inputIter.remove();
-      }
-      // Avoid adding attempts which have been marked as OBSOLETE 
-      if (obsoletedInputs.contains(input)) {
-        inputIter.remove();
-        obsoletedInputs.remove(input);
-      }
-    }
-    // TODO NEWTEZ Maybe limit the number of inputs being given to a single
-    // fetcher, especially in the case where #hosts < #fetchers
-    fetcherBuilder.assignWork(inputHost.getHost(), inputHost.getPort(), 0,
-        inputHost.clearAndGetPendingInputs());
-    return fetcherBuilder.build();
-  }
-  
-  /////////////////// Methods for InputEventHandler
-  
-  public void addKnownInput(String hostName, int port,
-      InputAttemptIdentifier srcAttemptIdentifier, int partition) {
-    InputHost host = knownSrcHosts.get(hostName);
-    if (host == null) {
-      host = new InputHost(hostName, port, inputContext.getApplicationId());
-      InputHost old = knownSrcHosts.putIfAbsent(hostName, host);
-      if (old != null) {
-        host = old;
-      }
-    }
-    host.addKnownInput(srcAttemptIdentifier);
-    synchronized(lock) {
-      pendingHosts.add(host);
-      wakeLoop.signal();
-    }
-  }
-
-  public void addCompletedInputWithNoData(
-      InputAttemptIdentifier srcAttemptIdentifier) {
-    InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
-    LOG.info("No input data exists for SrcTask: " + inputIdentifier + ". Marking as complete.");
-    if (pendingInputs.remove(inputIdentifier)) {
-      completedInputSet.add(inputIdentifier);
-      completedInputs.add(new NullFetchedInput(srcAttemptIdentifier));
-      numCompletedInputs.incrementAndGet();
-    }
-
-    // Awake the loop to check for termination.
-    synchronized (lock) {
-      wakeLoop.signal();
-    } 
-  }
-
-  public synchronized void obsoleteKnownInput(InputAttemptIdentifier srcAttemptIdentifier) {
-    obsoletedInputs.add(srcAttemptIdentifier);
-    // TODO NEWTEZ Maybe inform the fetcher about this. For now, this is used during the initial fetch list construction.
-  }
-  
-  
-  public void handleEvents(List<Event> events) {
-    inputEventHandler.handleEvents(events);
-  }
-
-  /////////////////// End of Methods for InputEventHandler
-  /////////////////// Methods from FetcherCallbackHandler
-  
-  @Override
-  public void fetchSucceeded(String host,
-      InputAttemptIdentifier srcAttemptIdentifier, FetchedInput fetchedInput, long fetchedBytes,
-      long copyDuration) throws IOException {
-    InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Complete fetch for attempt: " + srcAttemptIdentifier + " to " + fetchedInput.getType());
-    }
-    
-    // Count irrespective of whether this is a copy of an already fetched input
-    synchronized(lock) {
-      lastProgressTime = System.currentTimeMillis();
-    }
-    
-    boolean committed = false;
-    if (!completedInputSet.contains(inputIdentifier)) {
-      synchronized (completedInputSet) {
-        if (!completedInputSet.contains(inputIdentifier)) {
-          fetchedInput.commit();
-          committed = true;
-          pendingInputs.remove(inputIdentifier);
-          completedInputSet.add(inputIdentifier);
-          completedInputs.add(fetchedInput);
-          numCompletedInputs.incrementAndGet();
-        }
-      }
-    }
-    if (!committed) {
-      fetchedInput.abort(); // If this fails, the fetcher may attempt another abort.
-    } else {
-      synchronized(lock) {
-        // Signal the wakeLoop to check for termination.
-        wakeLoop.signal();
-      }
-    }
-    // TODO NEWTEZ Maybe inform fetchers, in case they have an alternate attempt of the same task in their queue.
-  }
-
-  @Override
-  public void fetchFailed(String host,
-      InputAttemptIdentifier srcAttemptIdentifier, boolean connectFailed) {
-    // TODO NEWTEZ. Implement logic to report fetch failures after a threshold.
-    // For now, reporting immediately.
-    InputReadErrorEvent readError = new InputReadErrorEvent(
-        "Fetch failure while fetching from "
-            + TezEngineUtils.getTaskAttemptIdentifier(
-                inputContext.getSourceVertexName(),
-                srcAttemptIdentifier.getInputIdentifier().getSrcTaskIndex(),
-                srcAttemptIdentifier.getAttemptNumber()),
-        srcAttemptIdentifier.getInputIdentifier().getSrcTaskIndex(),
-        srcAttemptIdentifier.getAttemptNumber());
-    
-    List<Event> failedEvents = Lists.newArrayListWithCapacity(1);
-    failedEvents.add(readError);
-    inputContext.sendEvents(failedEvents);
-  }
-  /////////////////// End of Methods from FetcherCallbackHandler
-
-  public void shutdown() throws InterruptedException {
-    if (this.fetcherExecutor != null && !this.fetcherExecutor.isShutdown()) {
-      this.fetcherExecutor.shutdown();
-      this.fetcherExecutor.awaitTermination(2000l, TimeUnit.MILLISECONDS);
-      if (!this.fetcherExecutor.isShutdown()) {
-        this.fetcherExecutor.shutdownNow();
-      }
-    }
-  }
-  
-  /////////////////// Methods for walking the available inputs
-  
-  /**
-   * @return true if there is another input ready for consumption.
-   */
-  public boolean newInputAvailable() {
-    FetchedInput head = completedInputs.peek();
-    if (head == null || head instanceof NullFetchedInput) {
-      return false;
-    } else {
-      return true;
-    }
-  }
-
-  /**
-   * @return true if all of the required inputs have been fetched.
-   */
-  public boolean allInputsFetched() {
-    return numCompletedInputs.get() == numInputs;
-  }
-
-  /**
-   * @return the next available input, or null if there are no available inputs.
-   *         This method will block if there are currently no available inputs,
-   *         but more may become available.
-   */
-  public FetchedInput getNextInput() throws InterruptedException {
-    FetchedInput input = null;
-    do {
-      input = completedInputs.peek();
-      if (input == null) {
-        if (allInputsFetched()) {
-          break;
-        } else {
-          input = completedInputs.take(); // block
-        }
-      } else {
-        input = completedInputs.poll();
-      }
-    } while (input instanceof NullFetchedInput);
-    return input;
-  }
-
-  /////////////////// End of methods for walking the available inputs
-  
-  
-  /**
-   * Fake input that is added to the completed input list in case an input does not have any data.
-   *
-   */
-  private class NullFetchedInput extends FetchedInput {
-
-    public NullFetchedInput(InputAttemptIdentifier inputAttemptIdentifier) {
-      super(Type.MEMORY, -1, inputAttemptIdentifier, null);
-    }
-
-    @Override
-    public OutputStream getOutputStream() throws IOException {
-      throw new UnsupportedOperationException("Not supported for NullFetchedInput");
-    }
-
-    @Override
-    public InputStream getInputStream() throws IOException {
-      throw new UnsupportedOperationException("Not supported for NullFetchedInput");
-    }
-
-    @Override
-    public void commit() throws IOException {
-      throw new UnsupportedOperationException("Not supported for NullFetchedInput");
-    }
-
-    @Override
-    public void abort() throws IOException {
-      throw new UnsupportedOperationException("Not supported for NullFetchedInput");
-    }
-
-    @Override
-    public void free() {
-      throw new UnsupportedOperationException("Not supported for NullFetchedInput");
-    }
-  }
-  
-  
-  private class FetchFutureCallback implements FutureCallback<FetchResult> {
-
-    private void doBookKeepingForFetcherComplete() {
-      numRunningFetchers.decrementAndGet();
-      synchronized(lock) {
-        wakeLoop.signal();
-      }
-    }
-    
-    @Override
-    public void onSuccess(FetchResult result) {
-      Iterable<InputAttemptIdentifier> pendingInputs = result.getPendingInputs();
-      if (pendingInputs != null && pendingInputs.iterator().hasNext()) {
-        InputHost inputHost = knownSrcHosts.get(result.getHost());
-        assert inputHost != null;
-        for (InputAttemptIdentifier input : pendingInputs) {
-          inputHost.addKnownInput(input);
-        }
-        pendingHosts.add(inputHost);
-      }
-      doBookKeepingForFetcherComplete();
-    }
-
-    @Override
-    public void onFailure(Throwable t) {
-      LOG.error("Fetcher failed with error: " + t);
-      shuffleError = t;
-      inputContext.fatalError(t, "Fetched failed");
-      doBookKeepingForFetcherComplete();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/broadcast/output/FileBasedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/output/FileBasedKVWriter.java b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/output/FileBasedKVWriter.java
deleted file mode 100644
index 474d1cd..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/output/FileBasedKVWriter.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.engine.broadcast.output;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.engine.api.KVWriter;
-import org.apache.tez.engine.api.TezOutputContext;
-import org.apache.tez.engine.common.ConfigUtils;
-import org.apache.tez.engine.common.TezEngineUtils;
-import org.apache.tez.engine.common.sort.impl.IFile;
-import org.apache.tez.engine.common.sort.impl.TezIndexRecord;
-import org.apache.tez.engine.common.sort.impl.TezSpillRecord;
-import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
-
-public class FileBasedKVWriter implements KVWriter {
-
-  public static final int INDEX_RECORD_LENGTH = 24;
-
-  private final Configuration conf;
-  private int numRecords = 0;
-
-  @SuppressWarnings("rawtypes")
-  private Class keyClass;
-  @SuppressWarnings("rawtypes")
-  private Class valClass;
-  private CompressionCodec codec;
-  private FileSystem rfs;
-  private IFile.Writer writer;
-
-  private TezTaskOutput ouputFileManager;
-
-  // TODO NEWTEZ Define Counters
-  // Number of records
-  // Time waiting for a write to complete, if that's possible.
-  // Size of key-value pairs written.
-
-  public FileBasedKVWriter(TezOutputContext outputContext) throws IOException {
-    this.conf = TezUtils.createConfFromUserPayload(outputContext
-        .getUserPayload());
-    this.conf.setStrings(TezJobConfig.LOCAL_DIRS,
-        outputContext.getWorkDirs());
-
-    this.rfs = ((LocalFileSystem) FileSystem.getLocal(this.conf)).getRaw();
-
-    // Setup serialization
-    keyClass = ConfigUtils.getIntermediateOutputKeyClass(this.conf);
-    valClass = ConfigUtils.getIntermediateOutputValueClass(this.conf);
-
-    // Setup compression
-    if (ConfigUtils.shouldCompressIntermediateOutput(this.conf)) {
-      Class<? extends CompressionCodec> codecClass = ConfigUtils
-          .getIntermediateOutputCompressorClass(this.conf, DefaultCodec.class);
-      codec = ReflectionUtils.newInstance(codecClass, this.conf);
-    } else {
-      codec = null;
-    }
-
-    this.ouputFileManager = TezEngineUtils.instantiateTaskOutputManager(conf,
-        outputContext);
-
-    initWriter();
-  }
-
-  /**
-   * @return true if any output was generated. false otherwise
-   * @throws IOException
-   */
-  public boolean close() throws IOException {
-    this.writer.close();
-    TezIndexRecord rec = new TezIndexRecord(0, writer.getRawLength(),
-        writer.getCompressedLength());
-    TezSpillRecord sr = new TezSpillRecord(1);
-    sr.putIndex(rec, 0);
-
-    Path indexFile = ouputFileManager
-        .getOutputIndexFileForWrite(INDEX_RECORD_LENGTH);
-    sr.writeToFile(indexFile, conf);
-    return numRecords > 0;
-  }
-
-  @Override
-  public void write(Object key, Object value) throws IOException {
-    this.writer.append(key, value);
-    numRecords++;
-  }
-
-  public void initWriter() throws IOException {
-    Path outputFile = ouputFileManager.getOutputFileForWrite();
-
-    // TODO NEWTEZ Consider making the buffer size configurable. Also consider
-    // setting up an in-memory buffer which is occasionally flushed to disk so
-    // that the output does not block.
-
-    // TODO NEWTEZ maybe use appropriate counter
-    this.writer = new IFile.Writer(conf, rfs, outputFile, keyClass, valClass,
-        codec, null);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java b/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java
deleted file mode 100644
index f73adfd..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.engine.common;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.TezJobConfig;
-
-@SuppressWarnings({"unchecked", "rawtypes"})
-public class ConfigUtils {
-
-  public static Class<? extends CompressionCodec> getIntermediateOutputCompressorClass(
-      Configuration conf, Class<DefaultCodec> defaultValue) {
-    Class<? extends CompressionCodec> codecClass = defaultValue;
-    String name = conf
-        .get(TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_COMPRESS_CODEC);
-    if (name != null) {
-      try {
-        codecClass = conf.getClassByName(name).asSubclass(
-            CompressionCodec.class);
-      } catch (ClassNotFoundException e) {
-        throw new IllegalArgumentException("Compression codec " + name
-            + " was not found.", e);
-      }
-    }
-    return codecClass;
-  }
-  
-  public static Class<? extends CompressionCodec> getIntermediateInputCompressorClass(
-      Configuration conf, Class<DefaultCodec> defaultValue) {
-    Class<? extends CompressionCodec> codecClass = defaultValue;
-    String name = conf
-        .get(TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_COMPRESS_CODEC);
-    if (name != null) {
-      try {
-        codecClass = conf.getClassByName(name).asSubclass(
-            CompressionCodec.class);
-      } catch (ClassNotFoundException e) {
-        throw new IllegalArgumentException("Compression codec " + name
-            + " was not found.", e);
-      }
-    }
-    return codecClass;
-  }
-
-
-  // TODO Move defaults over to a constants file.
-  
-  public static boolean shouldCompressIntermediateOutput(Configuration conf) {
-    return conf.getBoolean(
-        TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS, false);
-  }
-
-  public static boolean isIntermediateInputCompressed(Configuration conf) {
-    return conf.getBoolean(
-        TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_IS_COMPRESSED, false);
-  }
-
-  public static <V> Class<V> getIntermediateOutputValueClass(Configuration conf) {
-    Class<V> retv = (Class<V>) conf.getClass(
-        TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_VALUE_CLASS, null,
-        Object.class);
-    return retv;
-  }
-  
-  public static <V> Class<V> getIntermediateInputValueClass(Configuration conf) {
-    Class<V> retv = (Class<V>) conf.getClass(
-        TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_VALUE_CLASS, null,
-        Object.class);
-    return retv;
-  }
-
-  public static <K> Class<K> getIntermediateOutputKeyClass(Configuration conf) {
-    Class<K> retv = (Class<K>) conf.getClass(
-        TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_KEY_CLASS, null,
-        Object.class);
-    return retv;
-  }
-
-  public static <K> Class<K> getIntermediateInputKeyClass(Configuration conf) {
-    Class<K> retv = (Class<K>) conf.getClass(
-        TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_CLASS, null,
-        Object.class);
-    return retv;
-  }
-
-  public static <K> RawComparator<K> getIntermediateOutputKeyComparator(Configuration conf) {
-    Class<? extends RawComparator> theClass = conf.getClass(
-        TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS, null,
-        RawComparator.class);
-    if (theClass != null)
-      return ReflectionUtils.newInstance(theClass, conf);
-    return WritableComparator.get(getIntermediateOutputKeyClass(conf).asSubclass(
-        WritableComparable.class));
-  }
-
-  public static <K> RawComparator<K> getIntermediateInputKeyComparator(Configuration conf) {
-    Class<? extends RawComparator> theClass = conf.getClass(
-        TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS, null,
-        RawComparator.class);
-    if (theClass != null)
-      return ReflectionUtils.newInstance(theClass, conf);
-    return WritableComparator.get(getIntermediateInputKeyClass(conf).asSubclass(
-        WritableComparable.class));
-  }
-
-  
-  
-  // TODO Fix name
-  public static <V> RawComparator<V> getInputKeySecondaryGroupingComparator(
-      Configuration conf) {
-    Class<? extends RawComparator> theClass = conf
-        .getClass(
-            TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_SECONDARY_COMPARATOR_CLASS,
-            null, RawComparator.class);
-    if (theClass == null) {
-      return getIntermediateInputKeyComparator(conf);
-    }
-
-    return ReflectionUtils.newInstance(theClass, conf);
-  }
-  
-  public static boolean useNewApi(Configuration conf) {
-    return conf.getBoolean("mapred.mapper.new-api", false);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/InputAttemptIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/InputAttemptIdentifier.java b/tez-engine/src/main/java/org/apache/tez/engine/common/InputAttemptIdentifier.java
deleted file mode 100644
index 076807e..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/InputAttemptIdentifier.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.common;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-
-/**
- * Container for a task number and an attempt number for the task.
- */
-@Private
-public class InputAttemptIdentifier {
-
-  private final InputIdentifier inputIdentifier;
-  private final int attemptNumber;
-  private String pathComponent;
-  
-  public InputAttemptIdentifier(int taskIndex, int attemptNumber) {
-    this(new InputIdentifier(taskIndex), attemptNumber, null);
-  }
-  
-  public InputAttemptIdentifier(InputIdentifier inputIdentifier, int attemptNumber, String pathComponent) {
-    this.inputIdentifier = inputIdentifier;
-    this.attemptNumber = attemptNumber;
-    this.pathComponent = pathComponent;
-  }
-  
-  public InputAttemptIdentifier(int taskIndex, int attemptNumber, String pathComponent) {
-    this(new InputIdentifier(taskIndex), attemptNumber, pathComponent);
-  }
-
-  public InputIdentifier getInputIdentifier() {
-    return this.inputIdentifier;
-  }
-
-  public int getAttemptNumber() {
-    return attemptNumber;
-  }
-  
-  public String getPathComponent() {
-    return pathComponent;
-  }
-
-  // PathComponent does not need to be part of the hashCode and equals computation.
-  @Override
-  public int hashCode() {
-    final int prime = 31;
-    int result = 1;
-    result = prime * result + attemptNumber;
-    result = prime * result
-        + ((inputIdentifier == null) ? 0 : inputIdentifier.hashCode());
-    return result;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj)
-      return true;
-    if (obj == null)
-      return false;
-    if (getClass() != obj.getClass())
-      return false;
-    InputAttemptIdentifier other = (InputAttemptIdentifier) obj;
-    if (attemptNumber != other.attemptNumber)
-      return false;
-    if (inputIdentifier == null) {
-      if (other.inputIdentifier != null)
-        return false;
-    } else if (!inputIdentifier.equals(other.inputIdentifier))
-      return false;
-    return true;
-  }
-
-  @Override
-  public String toString() {
-    return "InputAttemptIdentifier [inputIdentifier=" + inputIdentifier
-        + ", attemptNumber=" + attemptNumber + ", pathComponent="
-        + pathComponent + "]";
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/InputIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/InputIdentifier.java b/tez-engine/src/main/java/org/apache/tez/engine/common/InputIdentifier.java
deleted file mode 100644
index b694530..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/InputIdentifier.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.common;
-
-public class InputIdentifier {
-
-  private final int srcTaskIndex;
-  
-  public InputIdentifier(int srcTaskIndex) {
-    this.srcTaskIndex = srcTaskIndex;
-  }
-
-  public int getSrcTaskIndex() {
-    return this.srcTaskIndex;
-  }
-
-  @Override
-  public int hashCode() {
-    return srcTaskIndex;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj)
-      return true;
-    if (obj == null)
-      return false;
-    if (getClass() != obj.getClass())
-      return false;
-    InputIdentifier other = (InputIdentifier) obj;
-    if (srcTaskIndex != other.srcTaskIndex)
-      return false;
-    return true;
-  }
-
-  @Override
-  public String toString() {
-    return "InputIdentifier [srcTaskIndex=" + srcTaskIndex + "]";
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java b/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
deleted file mode 100644
index cc29e94..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.engine.common;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.Constants;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.engine.api.Partitioner;
-import org.apache.tez.engine.api.TezOutputContext;
-import org.apache.tez.engine.api.TezTaskContext;
-import org.apache.tez.engine.common.combine.Combiner;
-import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
-import org.apache.tez.engine.common.task.local.output.TezTaskOutputFiles;
-
-public class TezEngineUtils {
-
-  private static final Log LOG = LogFactory
-      .getLog(TezEngineUtils.class);
-  
-  public static String getTaskIdentifier(String vertexName, int taskIndex) {
-    return String.format("%s_%06d", vertexName, taskIndex);
-  }
-
-  public static String getTaskAttemptIdentifier(int taskIndex,
-      int taskAttemptNumber) {
-    return String.format("%d_%d", taskIndex, taskAttemptNumber);
-  }
-
-  // TODO Maybe include a dag name in this.
-  public static String getTaskAttemptIdentifier(String vertexName,
-      int taskIndex, int taskAttemptNumber) {
-    return String.format("%s_%06d_%02d", vertexName, taskIndex,
-        taskAttemptNumber);
-  }
-
-  @SuppressWarnings("unchecked")
-  public static Combiner instantiateCombiner(Configuration conf, TezTaskContext taskContext) throws IOException {
-    Class<? extends Combiner> clazz;
-    String className = conf.get(TezJobConfig.TEZ_ENGINE_COMBINER_CLASS);
-    if (className == null) {
-      LOG.info("No combiner specified via " + TezJobConfig.TEZ_ENGINE_COMBINER_CLASS + ". Combiner will not be used");
-      return null;
-    }
-    LOG.info("Using Combiner class: " + className);
-    try {
-      clazz = (Class<? extends Combiner>) conf.getClassByName(className);
-    } catch (ClassNotFoundException e) {
-      throw new IOException("Unable to load combiner class: " + className);
-    }
-    
-    Combiner combiner = null;
-    
-      Constructor<? extends Combiner> ctor;
-      try {
-        ctor = clazz.getConstructor(TezTaskContext.class);
-        combiner = ctor.newInstance(taskContext);
-      } catch (SecurityException e) {
-        throw new IOException(e);
-      } catch (NoSuchMethodException e) {
-        throw new IOException(e);
-      } catch (IllegalArgumentException e) {
-        throw new IOException(e);
-      } catch (InstantiationException e) {
-        throw new IOException(e);
-      } catch (IllegalAccessException e) {
-        throw new IOException(e);
-      } catch (InvocationTargetException e) {
-        throw new IOException(e);
-      }
-      return combiner;
-  }
-  
-  @SuppressWarnings("unchecked")
-  public static Partitioner instantiatePartitioner(Configuration conf)
-      throws IOException {
-    Class<? extends Partitioner> clazz;
-    try {
-      clazz = (Class<? extends Partitioner>) conf
-          .getClassByName(conf.get(TezJobConfig.TEZ_ENGINE_PARTITIONER_CLASS));
-    } catch (ClassNotFoundException e) {
-      throw new IOException("Unable to find Partitioner class in config", e);
-    }
-
-    LOG.info("Using partitioner class: " + clazz.getName());
-
-    Partitioner partitioner = null;
-
-    try {
-      Constructor<? extends Partitioner> ctorWithConf = clazz
-          .getConstructor(Configuration.class);
-      partitioner = ctorWithConf.newInstance(conf);
-    } catch (SecurityException e) {
-      throw new IOException(e);
-    } catch (NoSuchMethodException e) {
-      try {
-        // Try a 0 argument constructor.
-        partitioner = clazz.newInstance();
-      } catch (InstantiationException e1) {
-        throw new IOException(e1);
-      } catch (IllegalAccessException e1) {
-        throw new IOException(e1);
-      }
-    } catch (IllegalArgumentException e) {
-      throw new IOException(e);
-    } catch (InstantiationException e) {
-      throw new IOException(e);
-    } catch (IllegalAccessException e) {
-      throw new IOException(e);
-    } catch (InvocationTargetException e) {
-      throw new IOException(e);
-    }
-    return partitioner;
-  }
-  
-  public static TezTaskOutput instantiateTaskOutputManager(Configuration conf, TezOutputContext outputContext) {
-    Class<?> clazz = conf.getClass(Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER,
-        TezTaskOutputFiles.class);
-    try {
-      Constructor<?> ctor = clazz.getConstructor(Configuration.class, String.class);
-      ctor.setAccessible(true);
-      TezTaskOutput instance = (TezTaskOutput) ctor.newInstance(conf, outputContext.getUniqueIdentifier());
-      return instance;
-    } catch (Exception e) {
-      throw new TezUncheckedException(
-          "Unable to instantiate configured TezOutputFileManager: "
-              + conf.get(Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER,
-                  TezTaskOutputFiles.class.getName()), e);
-    }
-  }
-}