You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/25 09:31:45 UTC

[38/50] [abbrv] Rename tez-engine-api to tez-runtime-api and tez-engine is split into 2: - tez-engine-library for user-visible Input/Output/Processor implementations - tez-engine-internals for framework internals

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
deleted file mode 100644
index a984b0f..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.lib.input;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.engine.api.Event;
-import org.apache.tez.engine.api.KVReader;
-import org.apache.tez.engine.api.LogicalInput;
-import org.apache.tez.engine.api.TezInputContext;
-import org.apache.tez.engine.common.ConfigUtils;
-import org.apache.tez.engine.common.ValuesIterator;
-import org.apache.tez.engine.common.shuffle.impl.Shuffle;
-import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
-
-/**
- * <code>ShuffleMergedInput</code> in a {@link LogicalInput} which shuffles
- * intermediate sorted data, merges them and provides key/<values> to the
- * consumer.
- *
- * The Copy and Merge will be triggered by the initialization - which is handled
- * by the Tez framework. Input is not consumable until the Copy and Merge are
- * complete. Methods are provided to check for this, as well as to wait for
- * completion. Attempting to get a reader on a non-complete input will block.
- *
- */
-public class ShuffledMergedInput implements LogicalInput {
-
-  static final Log LOG = LogFactory.getLog(ShuffledMergedInput.class);
-
-  protected TezInputContext inputContext;
-  protected TezRawKeyValueIterator rawIter = null;
-  protected Configuration conf;
-  protected int numInputs = 0;
-  protected Shuffle shuffle;
-  @SuppressWarnings("rawtypes")
-  protected ValuesIterator vIter;
-
-  private TezCounter inputKeyCounter;
-  private TezCounter inputValueCounter;
-
-  @Override
-  public List<Event> initialize(TezInputContext inputContext) throws IOException {
-    this.inputContext = inputContext;
-    this.conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
-
-    this.inputKeyCounter = inputContext.getCounters().findCounter(TaskCounter.REDUCE_INPUT_GROUPS);
-    this.inputValueCounter = inputContext.getCounters().findCounter(TaskCounter.REDUCE_INPUT_RECORDS);
-    this.conf.setStrings(TezJobConfig.LOCAL_DIRS,
-        inputContext.getWorkDirs());
-
-    // Start the shuffle - copy and merge.
-    shuffle = new Shuffle(inputContext, this.conf, numInputs);
-    shuffle.run();
-
-    return Collections.emptyList();
-  }
-
-  /**
-   * Check if the input is ready for consumption
-   *
-   * @return true if the input is ready for consumption, or if an error occurred
-   *         processing fetching the input. false if the shuffle and merge are
-   *         still in progress
-   */
-  public boolean isInputReady() {
-    return shuffle.isInputReady();
-  }
-
-  /**
-   * Waits for the input to become ready for consumption
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  public void waitForInputReady() throws IOException, InterruptedException {
-    rawIter = shuffle.waitForInput();
-    createValuesIterator();
-  }
-
-  @Override
-  public List<Event> close() throws IOException {
-    rawIter.close();
-    return Collections.emptyList();
-  }
-
-  /**
-   * Get a KVReader for the Input.</p> This method will block until the input is
-   * ready - i.e. the copy and merge stages are complete. Users can use the
-   * isInputReady method to check if the input is ready, which gives an
-   * indication of whether this method will block or not.
-   *
-   * NOTE: All values for the current K-V pair must be read prior to invoking
-   * moveToNext. Once moveToNext() is called, the valueIterator from the
-   * previous K-V pair will throw an Exception
-   *
-   * @return a KVReader over the sorted input.
-   */
-  @Override
-  public KVReader getReader() throws IOException {
-    if (rawIter == null) {
-      try {
-        waitForInputReady();
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new IOException("Interrupted while waiting for input ready", e);
-      }
-    }
-    return new KVReader() {
-
-      @Override
-      public boolean next() throws IOException {
-        return vIter.moveToNext();
-      }
-
-      @SuppressWarnings("unchecked")
-      @Override
-      public KVRecord getCurrentKV() {
-        return new KVRecord(vIter.getKey(), vIter.getValues());
-      }
-    };
-  }
-
-  @Override
-  public void handleEvents(List<Event> inputEvents) {
-    shuffle.handleEvents(inputEvents);
-  }
-
-  @Override
-  public void setNumPhysicalInputs(int numInputs) {
-    this.numInputs = numInputs;
-  }
-
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  protected void createValuesIterator()
-      throws IOException {
-    vIter = new ValuesIterator(rawIter,
-        (RawComparator) ConfigUtils.getIntermediateInputKeyComparator(conf),
-        ConfigUtils.getIntermediateInputKeyClass(conf),
-        ConfigUtils.getIntermediateInputValueClass(conf), conf, inputKeyCounter, inputValueCounter);
-
-  }
-
-  // This functionality is currently broken. If there's inputs which need to be
-  // written to disk, there's a possibility that inputs from the different
-  // sources could clobber each others' output. Also the current structures do
-  // not have adequate information to de-dupe these (vertex name)
-//  public void mergeWith(ShuffledMergedInput other) {
-//    this.numInputs += other.getNumPhysicalInputs();
-//  }
-//
-//  public int getNumPhysicalInputs() {
-//    return this.numInputs;
-//  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInputLegacy.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInputLegacy.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInputLegacy.java
deleted file mode 100644
index f2da031..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInputLegacy.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * <code>ShuffleMergedInput</code> in a {@link LogicalInput} which shuffles
- * intermediate sorted data, merges them and provides key/<values> to the
- * consumer.
- * 
- * The Copy and Merge will be triggered by the initialization - which is handled
- * by the Tez framework. Input is not consumable until the Copy and Merge are
- * complete. Methods are provided to check for this, as well as to wait for
- * completion. Attempting to get a reader on a non-complete input will block.
- * 
- */
-
-package org.apache.tez.engine.lib.input;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
-
-@LimitedPrivate("mapreduce")
-public class ShuffledMergedInputLegacy extends ShuffledMergedInput {
-
-  @Private
-  public TezRawKeyValueIterator getIterator() throws IOException, InterruptedException {
-    // wait for input so that iterator is available
-    waitForInputReady();
-    return rawIter;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledUnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledUnorderedKVInput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledUnorderedKVInput.java
deleted file mode 100644
index 44238fd..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledUnorderedKVInput.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.engine.lib.input;
-
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.engine.api.Event;
-import org.apache.tez.engine.api.LogicalInput;
-import org.apache.tez.engine.api.Reader;
-import org.apache.tez.engine.api.TezInputContext;
-import org.apache.tez.engine.broadcast.input.BroadcastShuffleManager;
-
-import com.google.common.base.Preconditions;
-
-public class ShuffledUnorderedKVInput implements LogicalInput {
-
-  private Configuration conf;
-  private int numInputs = -1;
-  private BroadcastShuffleManager shuffleManager;
-  
-  
-  
-  public ShuffledUnorderedKVInput() {
-  }
-
-  @Override
-  public List<Event> initialize(TezInputContext inputContext) throws Exception {
-    Preconditions.checkArgument(numInputs != -1, "Number of Inputs has not been set");
-    this.conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
-    this.conf.setStrings(TezJobConfig.LOCAL_DIRS, inputContext.getWorkDirs());
-    
-    this.shuffleManager = new BroadcastShuffleManager(inputContext, conf, numInputs);
-    return null;
-  }
-
-  @Override
-  public Reader getReader() throws Exception {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
-  @Override
-  public void handleEvents(List<Event> inputEvents) {
-    shuffleManager.handleEvents(inputEvents);
-  }
-
-  @Override
-  public List<Event> close() throws Exception {
-    this.shuffleManager.shutdown();
-    return null;
-  }
-
-  @Override
-  public void setNumPhysicalInputs(int numInputs) {
-    this.numInputs = numInputs;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java
deleted file mode 100644
index 26a01c8..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.lib.output;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.engine.api.Event;
-import org.apache.tez.engine.api.KVWriter;
-import org.apache.tez.engine.api.LogicalOutput;
-import org.apache.tez.engine.api.Output;
-import org.apache.tez.engine.api.TezOutputContext;
-import org.apache.tez.engine.api.Writer;
-import org.apache.tez.engine.common.sort.impl.dflt.InMemoryShuffleSorter;
-
-/**
- * {@link InMemorySortedOutput} is an {@link Output} which sorts key/value pairs 
- * written to it and persists it to a file.
- */
-public class InMemorySortedOutput implements LogicalOutput {
-  
-  protected InMemoryShuffleSorter sorter;
-  protected int numTasks;
-  protected TezOutputContext outputContext;
-  
-
-  @Override
-  public List<Event> initialize(TezOutputContext outputContext)
-      throws IOException {
-    this.outputContext = outputContext;
-    this.sorter = new InMemoryShuffleSorter();
-    sorter.initialize(outputContext, TezUtils.createConfFromUserPayload(outputContext.getUserPayload()), numTasks);
-    return Collections.emptyList();
-  }
-
-  @Override
-  public Writer getWriter() throws IOException {
-    return new KVWriter() {
-      
-      @Override
-      public void write(Object key, Object value) throws IOException {
-        sorter.write(key, value);
-      }
-    };
-  }
-
-  @Override
-  public void handleEvents(List<Event> outputEvents) {
-    // No events expected.
-  }
-
-  @Override
-  public void setNumPhysicalOutputs(int numOutputs) {
-    this.numTasks = numOutputs;
-  }
-  
-  @Override
-  public List<Event> close() throws IOException {
-    sorter.flush();
-    sorter.close();
-    // TODO NEWTEZ Event generation
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java
deleted file mode 100644
index 7fd26d7..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.engine.lib.output;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tez.engine.api.Event;
-import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
-
-public class LocalOnFileSorterOutput extends OnFileSortedOutput {
-
-  private static final Log LOG = LogFactory.getLog(LocalOnFileSorterOutput.class);
-
-  
-
-  @Override
-  public List<Event> close() throws IOException {
-    LOG.debug("Closing LocalOnFileSorterOutput");
-    super.close();
-
-    TezTaskOutput mapOutputFile = sorter.getMapOutput();
-    FileSystem localFs = FileSystem.getLocal(conf);
-
-    Path src = mapOutputFile.getOutputFile();
-    Path dst =
-        mapOutputFile.getInputFileForWrite(
-            outputContext.getTaskIndex(),
-            localFs.getFileStatus(src).getLen());
-
-    LOG.info("Renaming src = " + src + ", dst = " + dst);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Renaming src = " + src + ", dst = " + dst);
-    }
-    localFs.rename(src, dst);
-    return null;
-  }
-  
-  @Override
-  protected List<Event> generateDataMovementEventsOnClose() throws IOException {
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java
deleted file mode 100644
index 9c9eba0..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.lib.output;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.engine.api.Event;
-import org.apache.tez.engine.api.KVWriter;
-import org.apache.tez.engine.api.LogicalOutput;
-import org.apache.tez.engine.api.TezOutputContext;
-import org.apache.tez.engine.api.events.DataMovementEvent;
-import org.apache.tez.engine.common.shuffle.newimpl.ShuffleUserPayloads.DataMovementEventPayloadProto;
-import org.apache.tez.engine.common.sort.impl.ExternalSorter;
-import org.apache.tez.engine.common.sort.impl.dflt.DefaultSorter;
-import org.apache.tez.engine.shuffle.common.ShuffleUtils;
-
-import com.google.common.collect.Lists;
-
-/**
- * <code>OnFileSortedOutput</code> is an {@link LogicalOutput} which sorts key/value pairs 
- * written to it and persists it to a file.
- */
-public class OnFileSortedOutput implements LogicalOutput {
-  
-  protected ExternalSorter sorter;
-  protected Configuration conf;
-  protected int numOutputs;
-  protected TezOutputContext outputContext;
-  private long startTime;
-  private long endTime;
-  
-  
-  @Override
-  public List<Event> initialize(TezOutputContext outputContext)
-      throws IOException {
-    this.startTime = System.nanoTime();
-    this.outputContext = outputContext;
-    sorter = new DefaultSorter();
-    this.conf = TezUtils.createConfFromUserPayload(outputContext.getUserPayload());
-    // Initializing this parametr in this conf since it is used in multiple
-    // places (wherever LocalDirAllocator is used) - TezTaskOutputFiles,
-    // TezMerger, etc.
-    this.conf.setStrings(TezJobConfig.LOCAL_DIRS, outputContext.getWorkDirs());
-    sorter.initialize(outputContext, conf, numOutputs);
-    return Collections.emptyList();
-  }
-
-  @Override
-  public KVWriter getWriter() throws IOException {
-    return new KVWriter() {
-      @Override
-      public void write(Object key, Object value) throws IOException {
-        sorter.write(key, value);
-      }
-    };
-  }
-
-  @Override
-  public void handleEvents(List<Event> outputEvents) {
-    // Not expecting any events.
-  }
-
-  @Override
-  public void setNumPhysicalOutputs(int numOutputs) {
-    this.numOutputs = numOutputs;
-  }
-
-  @Override
-  public List<Event> close() throws IOException {
-    sorter.flush();
-    sorter.close();
-    this.endTime = System.nanoTime();
-
-   return generateDataMovementEventsOnClose();
-  }
-  
-  protected List<Event> generateDataMovementEventsOnClose() throws IOException {
-    String host = System.getenv(ApplicationConstants.Environment.NM_HOST
-        .toString());
-    ByteBuffer shuffleMetadata = outputContext
-        .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
-    int shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetadata);
-
-    DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto
-        .newBuilder();
-    payloadBuilder.setHost(host);
-    payloadBuilder.setPort(shufflePort);
-    payloadBuilder.setPathComponent(outputContext.getUniqueIdentifier());
-    payloadBuilder.setRunDuration((int) ((endTime - startTime) / 1000));
-    DataMovementEventPayloadProto payloadProto = payloadBuilder.build();
-    byte[] payloadBytes = payloadProto.toByteArray();
-
-    List<Event> events = Lists.newArrayListWithCapacity(numOutputs);
-
-    for (int i = 0; i < numOutputs; i++) {
-      DataMovementEvent event = new DataMovementEvent(i, payloadBytes);
-      events.add(event);
-    }
-    return events;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileUnorderedKVOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileUnorderedKVOutput.java
deleted file mode 100644
index 3ff603f..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileUnorderedKVOutput.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.engine.lib.output;
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.engine.api.Event;
-import org.apache.tez.engine.api.KVWriter;
-import org.apache.tez.engine.api.LogicalOutput;
-import org.apache.tez.engine.api.TezOutputContext;
-import org.apache.tez.engine.api.events.DataMovementEvent;
-import org.apache.tez.engine.broadcast.output.FileBasedKVWriter;
-import org.apache.tez.engine.common.shuffle.newimpl.ShuffleUserPayloads.DataMovementEventPayloadProto;
-import org.apache.tez.engine.shuffle.common.ShuffleUtils;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-public class OnFileUnorderedKVOutput implements LogicalOutput {
-
-  private TezOutputContext outputContext;
-  private FileBasedKVWriter kvWriter;
-
-  public OnFileUnorderedKVOutput() {
-  }
-
-  @Override
-  public List<Event> initialize(TezOutputContext outputContext)
-      throws Exception {
-    this.outputContext = outputContext;
-    this.kvWriter = new FileBasedKVWriter(outputContext);
-    return Collections.emptyList();
-  }
-
-  @Override
-  public KVWriter getWriter() throws Exception {
-    return kvWriter;
-  }
-
-  @Override
-  public void handleEvents(List<Event> outputEvents) {
-    throw new TezUncheckedException("Not expecting any events");
-  }
-
-  @Override
-  public List<Event> close() throws Exception {
-    boolean outputGenerated = this.kvWriter.close();
-    DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto
-        .newBuilder();
-
-    String host = System.getenv(ApplicationConstants.Environment.NM_HOST
-        .toString());
-    ByteBuffer shuffleMetadata = outputContext
-        .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
-    int shufflePort = ShuffleUtils
-        .deserializeShuffleProviderMetaData(shuffleMetadata);
-    payloadBuilder.setOutputGenerated(outputGenerated);
-    if (outputGenerated) {
-      payloadBuilder.setHost(host);
-      payloadBuilder.setPort(shufflePort);
-      payloadBuilder.setPathComponent(outputContext.getUniqueIdentifier());
-    }
-    DataMovementEventPayloadProto payloadProto = payloadBuilder.build();
-
-    DataMovementEvent dmEvent = new DataMovementEvent(0,
-        payloadProto.toByteArray());
-    List<Event> events = Lists.newArrayListWithCapacity(1);
-    events.add(dmEvent);
-    return events;
-  }
-
-  @Override
-  public void setNumPhysicalOutputs(int numOutputs) {
-    Preconditions.checkArgument(numOutputs == 1,
-        "Number of outputs can only be 1 for " + this.getClass().getName());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
deleted file mode 100644
index 29063f9..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
+++ /dev/null
@@ -1,475 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newruntime;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.api.Event;
-import org.apache.tez.engine.api.Input;
-import org.apache.tez.engine.api.LogicalIOProcessor;
-import org.apache.tez.engine.api.LogicalInput;
-import org.apache.tez.engine.api.LogicalOutput;
-import org.apache.tez.engine.api.Output;
-import org.apache.tez.engine.api.Processor;
-import org.apache.tez.engine.api.TezInputContext;
-import org.apache.tez.engine.api.TezOutputContext;
-import org.apache.tez.engine.api.TezProcessorContext;
-import org.apache.tez.engine.api.impl.EventMetaData;
-import org.apache.tez.engine.api.impl.InputSpec;
-import org.apache.tez.engine.api.impl.OutputSpec;
-import org.apache.tez.engine.api.impl.TaskSpec;
-import org.apache.tez.engine.api.impl.TezEvent;
-import org.apache.tez.engine.api.impl.TezInputContextImpl;
-import org.apache.tez.engine.api.impl.TezOutputContextImpl;
-import org.apache.tez.engine.api.impl.TezProcessorContextImpl;
-import org.apache.tez.engine.api.impl.TezUmbilical;
-import org.apache.tez.engine.api.impl.EventMetaData.EventProducerConsumerType;
-import org.apache.tez.engine.common.security.JobTokenIdentifier;
-import org.apache.tez.engine.shuffle.common.ShuffleUtils;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-@Private
-public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
-
-  private static final Log LOG = LogFactory
-      .getLog(LogicalIOProcessorRuntimeTask.class);
-
-  private final List<InputSpec> inputSpecs;
-  private final List<LogicalInput> inputs;
-
-  private final List<OutputSpec> outputSpecs;
-  private final List<LogicalOutput> outputs;
-
-  private List<TezInputContext> inputContexts;
-  private List<TezOutputContext> outputContexts;
-  private TezProcessorContext processorContext;
-  
-  private final ProcessorDescriptor processorDescriptor;
-  private final LogicalIOProcessor processor;
-
-  private final Map<String, ByteBuffer> serviceConsumerMetadata;
-
-  private Map<String, LogicalInput> inputMap;
-  private Map<String, LogicalOutput> outputMap;
-
-  private LinkedBlockingQueue<TezEvent> eventsToBeProcessed;
-  private Thread eventRouterThread = null;
-
-  private final int appAttemptNumber;
-
-  public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, int appAttemptNumber,
-      Configuration tezConf, TezUmbilical tezUmbilical,
-      Token<JobTokenIdentifier> jobToken) throws IOException {
-    // TODO Remove jobToken from here post TEZ-421
-    super(taskSpec, tezConf, tezUmbilical);
-    LOG.info("Initializing LogicalIOProcessorRuntimeTask with TaskSpec: "
-        + taskSpec);
-    this.inputContexts = new ArrayList<TezInputContext>(taskSpec.getInputs().size());
-    this.outputContexts = new ArrayList<TezOutputContext>(taskSpec.getOutputs().size());
-    this.inputSpecs = taskSpec.getInputs();
-    this.inputs = createInputs(inputSpecs);
-    this.outputSpecs = taskSpec.getOutputs();
-    this.outputs = createOutputs(outputSpecs);
-    this.processorDescriptor = taskSpec.getProcessorDescriptor();
-    this.processor = createProcessor(processorDescriptor);
-    this.serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
-    this.serviceConsumerMetadata.put(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID,
-        ShuffleUtils.convertJobTokenToBytes(jobToken));
-    this.eventsToBeProcessed = new LinkedBlockingQueue<TezEvent>();
-    this.state = State.NEW;
-    this.appAttemptNumber = appAttemptNumber;
-  }
-
-  public void initialize() throws Exception {
-    LOG.info("Initializing LogicalProcessorIORuntimeTask");
-    Preconditions.checkState(this.state == State.NEW, "Already initialized");
-    this.state = State.INITED;
-    inputMap = new LinkedHashMap<String, LogicalInput>(inputs.size());
-    outputMap = new LinkedHashMap<String, LogicalOutput>(outputs.size());
-
-    // TODO Maybe close initialized inputs / outputs in case of failure to
-    // initialize.
-    // Initialize all inputs. TODO: Multi-threaded at some point.
-    for (int i = 0; i < inputs.size(); i++) {
-      String srcVertexName = inputSpecs.get(i).getSourceVertexName();
-      initializeInput(inputs.get(i),
-          inputSpecs.get(i));
-      inputMap.put(srcVertexName, inputs.get(i));
-    }
-
-    // Initialize all outputs. TODO: Multi-threaded at some point.
-    for (int i = 0; i < outputs.size(); i++) {
-      String destVertexName = outputSpecs.get(i).getDestinationVertexName();
-      initializeOutput(outputs.get(i), outputSpecs.get(i));
-      outputMap.put(destVertexName, outputs.get(i));
-    }
-
-    // Initialize processor.
-    initializeLogicalIOProcessor();
-    startRouterThread();
-  }
-
-  public void run() throws Exception {
-    synchronized (this.state) {
-      Preconditions.checkState(this.state == State.INITED,
-          "Can only run while in INITED state. Current: " + this.state);
-      this.state = State.RUNNING;
-    }
-    LogicalIOProcessor lioProcessor = (LogicalIOProcessor) processor;
-    lioProcessor.run(inputMap, outputMap);
-  }
-
-  public void close() throws Exception {
-    try {
-      Preconditions.checkState(this.state == State.RUNNING,
-          "Can only run while in RUNNING state. Current: " + this.state);
-      this.state = State.CLOSED;
-
-      // Close the Inputs.
-      for (int i = 0; i < inputs.size(); i++) {
-        String srcVertexName = inputSpecs.get(i).getSourceVertexName();
-        List<Event> closeInputEvents = inputs.get(i).close();
-        sendTaskGeneratedEvents(closeInputEvents,
-            EventProducerConsumerType.INPUT, taskSpec.getVertexName(),
-            srcVertexName, taskSpec.getTaskAttemptID());
-      }
-
-      // Close the Processor.
-      processor.close();
-
-      // Close the Outputs.
-      for (int i = 0; i < outputs.size(); i++) {
-        String destVertexName = outputSpecs.get(i).getDestinationVertexName();
-        List<Event> closeOutputEvents = outputs.get(i).close();
-        sendTaskGeneratedEvents(closeOutputEvents,
-            EventProducerConsumerType.OUTPUT, taskSpec.getVertexName(),
-            destVertexName, taskSpec.getTaskAttemptID());
-      }
-    } finally {
-      setTaskDone();
-      if (eventRouterThread != null) {
-        eventRouterThread.interrupt();
-      }
-    }
-  }
-
-  private void initializeInput(Input input, InputSpec inputSpec)
-      throws Exception {
-    TezInputContext tezInputContext = createInputContext(inputSpec);
-    inputContexts.add(tezInputContext);
-    if (input instanceof LogicalInput) {
-      ((LogicalInput) input).setNumPhysicalInputs(inputSpec
-          .getPhysicalEdgeCount());
-    }
-    LOG.info("Initializing Input using InputSpec: " + inputSpec);
-    List<Event> events = input.initialize(tezInputContext);
-    sendTaskGeneratedEvents(events, EventProducerConsumerType.INPUT,
-        tezInputContext.getTaskVertexName(),
-        tezInputContext.getSourceVertexName(), taskSpec.getTaskAttemptID());
-  }
-
-  private void initializeOutput(Output output, OutputSpec outputSpec)
-      throws Exception {
-    TezOutputContext tezOutputContext = createOutputContext(outputSpec);
-    outputContexts.add(tezOutputContext);
-    if (output instanceof LogicalOutput) {
-      ((LogicalOutput) output).setNumPhysicalOutputs(outputSpec
-          .getPhysicalEdgeCount());
-    }
-    LOG.info("Initializing Output using OutputSpec: " + outputSpec);
-    List<Event> events = output.initialize(tezOutputContext);
-    sendTaskGeneratedEvents(events, EventProducerConsumerType.OUTPUT,
-        tezOutputContext.getTaskVertexName(),
-        tezOutputContext.getDestinationVertexName(),
-        taskSpec.getTaskAttemptID());
-  }
-
-  private void initializeLogicalIOProcessor() throws Exception {
-    LOG.info("Initializing processor"
-        + ", processorClassName=" + processorDescriptor.getClassName());
-    TezProcessorContext processorContext = createProcessorContext();
-    this.processorContext = processorContext;
-    processor.initialize(processorContext);
-  }
-
-  private TezInputContext createInputContext(InputSpec inputSpec) {
-    TezInputContext inputContext = new TezInputContextImpl(tezConf,
-        appAttemptNumber, tezUmbilical, taskSpec.getVertexName(),
-        inputSpec.getSourceVertexName(), taskSpec.getTaskAttemptID(),
-        tezCounters,
-        inputSpec.getInputDescriptor().getUserPayload() == null ? taskSpec
-            .getProcessorDescriptor().getUserPayload() : inputSpec
-            .getInputDescriptor().getUserPayload(), this,
-        serviceConsumerMetadata);
-    return inputContext;
-  }
-
-  private TezOutputContext createOutputContext(OutputSpec outputSpec) {
-    TezOutputContext outputContext = new TezOutputContextImpl(tezConf,
-        appAttemptNumber, tezUmbilical, taskSpec.getVertexName(),
-        outputSpec.getDestinationVertexName(), taskSpec.getTaskAttemptID(),
-        tezCounters,
-        outputSpec.getOutputDescriptor().getUserPayload() == null ? taskSpec
-            .getProcessorDescriptor().getUserPayload() : outputSpec
-            .getOutputDescriptor().getUserPayload(), this,
-        serviceConsumerMetadata);
-    return outputContext;
-  }
-
-  private TezProcessorContext createProcessorContext() {
-    TezProcessorContext processorContext = new TezProcessorContextImpl(tezConf,
-        appAttemptNumber, tezUmbilical, taskSpec.getVertexName(), taskSpec.getTaskAttemptID(),
-        tezCounters, processorDescriptor.getUserPayload(), this,
-        serviceConsumerMetadata);
-    return processorContext;
-  }
-
-  private List<LogicalInput> createInputs(List<InputSpec> inputSpecs) {
-    List<LogicalInput> inputs = new ArrayList<LogicalInput>(inputSpecs.size());
-    for (InputSpec inputSpec : inputSpecs) {
-      LOG.info("Creating Input from InputSpec: "
-          + inputSpec);
-      Input input = RuntimeUtils.createClazzInstance(inputSpec
-          .getInputDescriptor().getClassName());
-
-      if (input instanceof LogicalInput) {
-        inputs.add((LogicalInput) input);
-      } else {
-        throw new TezUncheckedException(input.getClass().getName()
-            + " is not a sub-type of LogicalInput."
-            + " Only LogicalInput sub-types supported by LogicalIOProcessor.");
-      }
-    }
-    return inputs;
-  }
-
-  private List<LogicalOutput> createOutputs(List<OutputSpec> outputSpecs) {
-    List<LogicalOutput> outputs = new ArrayList<LogicalOutput>(
-        outputSpecs.size());
-    for (OutputSpec outputSpec : outputSpecs) {
-      LOG.info("Creating Output from OutputSpec"
-          + outputSpec);
-      Output output = RuntimeUtils.createClazzInstance(outputSpec
-          .getOutputDescriptor().getClassName());
-      if (output instanceof LogicalOutput) {
-        outputs.add((LogicalOutput) output);
-      } else {
-        throw new TezUncheckedException(output.getClass().getName()
-            + " is not a sub-type of LogicalOutput."
-            + " Only LogicalOutput sub-types supported by LogicalIOProcessor.");
-      }
-    }
-    return outputs;
-  }
-
-  private LogicalIOProcessor createProcessor(
-      ProcessorDescriptor processorDescriptor) {
-    Processor processor = RuntimeUtils.createClazzInstance(processorDescriptor
-        .getClassName());
-    if (!(processor instanceof LogicalIOProcessor)) {
-      throw new TezUncheckedException(processor.getClass().getName()
-          + " is not a sub-type of LogicalIOProcessor."
-          + " Only LogicalOutput sub-types supported by LogicalIOProcessor.");
-    }
-    return (LogicalIOProcessor) processor;
-  }
-
-  private void sendTaskGeneratedEvents(List<Event> events,
-      EventProducerConsumerType generator, String taskVertexName,
-      String edgeVertexName, TezTaskAttemptID taskAttemptID) {
-    if (events == null || events.isEmpty()) {
-      return;
-    }
-    EventMetaData eventMetaData = new EventMetaData(generator,
-        taskVertexName, edgeVertexName, taskAttemptID);
-    List<TezEvent> tezEvents = new ArrayList<TezEvent>(events.size());
-    for (Event e : events) {
-      TezEvent te = new TezEvent(e, eventMetaData);
-      tezEvents.add(te);
-    }
-    if (LOG.isDebugEnabled()) {
-      for (TezEvent e : tezEvents) {
-        LOG.debug("Generated event info"
-            + ", eventMetaData=" + eventMetaData.toString()
-            + ", eventType=" + e.getEventType());
-      }
-    }
-    tezUmbilical.addEvents(tezEvents);
-  }
-
-  private boolean handleEvent(TezEvent e) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Handling TezEvent in task"
-          + ", taskAttemptId=" + taskSpec.getTaskAttemptID()
-          + ", eventType=" + e.getEventType()
-          + ", eventSourceInfo=" + e.getSourceInfo()
-          + ", eventDestinationInfo=" + e.getDestinationInfo());
-    }
-    try {
-      switch (e.getDestinationInfo().getEventGenerator()) {
-      case INPUT:
-        LogicalInput input = inputMap.get(
-            e.getDestinationInfo().getEdgeVertexName());
-        if (input != null) {
-          input.handleEvents(Collections.singletonList(e.getEvent()));
-        } else {
-          throw new TezUncheckedException("Unhandled event for invalid target: "
-              + e);
-        }
-        break;
-      case OUTPUT:
-        LogicalOutput output = outputMap.get(
-            e.getDestinationInfo().getEdgeVertexName());
-        if (output != null) {
-          output.handleEvents(Collections.singletonList(e.getEvent()));
-        } else {
-          throw new TezUncheckedException("Unhandled event for invalid target: "
-              + e);
-        }
-        break;
-      case PROCESSOR:
-        processor.handleEvents(Collections.singletonList(e.getEvent()));
-        break;
-      case SYSTEM:
-        LOG.warn("Trying to send a System event in a Task: " + e);
-        break;
-      }
-    } catch (Throwable t) {
-      LOG.warn("Failed to handle event", t);
-      setFatalError(t, "Failed to handle event");
-      EventMetaData sourceInfo = new EventMetaData(
-          e.getDestinationInfo().getEventGenerator(),
-          taskSpec.getVertexName(), e.getDestinationInfo().getEdgeVertexName(),
-          getTaskAttemptID());
-      tezUmbilical.signalFatalError(getTaskAttemptID(),
-          StringUtils.stringifyException(t), sourceInfo);
-      return false;
-    }
-    return true;
-  }
-
-  @Override
-  public synchronized void handleEvents(Collection<TezEvent> events) {
-    if (events == null || events.isEmpty()) {
-      return;
-    }
-    eventCounter.addAndGet(events.size());
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Received events to be processed by task"
-          + ", taskAttemptId=" + taskSpec.getTaskAttemptID()
-          + ", eventCount=" + events.size()
-          + ", newEventCounter=" + eventCounter.get());
-    }
-    eventsToBeProcessed.addAll(events);
-  }
-
-  private void startRouterThread() {
-    eventRouterThread = new Thread(new Runnable() {
-      public void run() {
-        while (!isTaskDone() && !Thread.currentThread().isInterrupted()) {
-          try {
-            TezEvent e = eventsToBeProcessed.take();
-            if (e == null) {
-              continue;
-            }
-            // TODO TODONEWTEZ
-            if (!handleEvent(e)) {
-              LOG.warn("Stopping Event Router thread as failed to handle"
-                  + " event: " + e);
-              return;
-            }
-          } catch (InterruptedException e) {
-            if (!isTaskDone()) {
-              LOG.warn("Event Router thread interrupted. Returning.");
-            }
-            return;
-          }
-        }
-      }
-    });
-
-    eventRouterThread.setName("TezTaskEventRouter["
-        + taskSpec.getTaskAttemptID().toString() + "]");
-    eventRouterThread.start();
-  }
-
-  public synchronized void cleanup() {
-    setTaskDone();
-    if (eventRouterThread != null) {
-      eventRouterThread.interrupt();
-    }
-  }
-  
-  @Private
-  @VisibleForTesting
-  public List<TezInputContext> getInputContexts() {
-    return this.inputContexts;
-  }
-  
-  @Private
-  @VisibleForTesting
-  public List<TezOutputContext> getOutputContexts() {
-    return this.outputContexts;
-  }
-
-  @Private
-  @VisibleForTesting
-  public TezProcessorContext getProcessorContext() {
-    return this.processorContext;
-  }
-  
-  @Private
-  @VisibleForTesting
-  public Map<String, LogicalInput> getInputs() {
-    return this.inputMap;
-  }
-  
-  @Private
-  @VisibleForTesting
-  public Map<String, LogicalOutput> getOutputs() {
-    return this.outputMap;
-  }
-  
-  @Private
-  @VisibleForTesting
-  public LogicalIOProcessor getProcessor() {
-    return this.processor;
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java
deleted file mode 100644
index 22cbc7c..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newruntime;
-
-import java.util.Collection;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.api.impl.TaskSpec;
-import org.apache.tez.engine.api.impl.TezEvent;
-import org.apache.tez.engine.api.impl.TezUmbilical;
-
-public abstract class RuntimeTask {
-
-  protected AtomicBoolean hasFatalError = new AtomicBoolean(false);
-  protected Throwable fatalError = null;
-  protected String fatalErrorMessage = null;
-  protected float progress;
-  protected final TezCounters tezCounters;
-  protected final TaskSpec taskSpec;
-  protected final Configuration tezConf;
-  protected final TezUmbilical tezUmbilical;
-  protected final AtomicInteger eventCounter;
-  private final AtomicBoolean taskDone;
-
-  protected RuntimeTask(TaskSpec taskSpec, Configuration tezConf,
-      TezUmbilical tezUmbilical) {
-    this.taskSpec = taskSpec;
-    this.tezConf = tezConf;
-    this.tezUmbilical = tezUmbilical;
-    this.tezCounters = new TezCounters();
-    this.eventCounter = new AtomicInteger(0);
-    this.progress = 0.0f;
-    this.taskDone = new AtomicBoolean(false);
-  }
-
-  protected enum State {
-    NEW, INITED, RUNNING, CLOSED;
-  }
-
-  protected State state;
-
-  public String getVertexName() {
-    return taskSpec.getVertexName();
-  }
-
-  public void setFatalError(Throwable t, String message) {
-    hasFatalError.set(true);
-    this.fatalError = t;
-    this.fatalErrorMessage = message;
-  }
-
-  public boolean hadFatalError() {
-    return hasFatalError.get();
-  }
-
-  public synchronized void setProgress(float progress) {
-    this.progress = progress;
-  }
-
-  public synchronized float getProgress() {
-    return this.progress;
-  }
-
-  public TezCounters getCounters() {
-    return this.tezCounters;
-  }
-
-  public TezTaskAttemptID getTaskAttemptID() {
-    return taskSpec.getTaskAttemptID();
-  }
-
-  public abstract void handleEvents(Collection<TezEvent> events);
-
-  public int getEventCounter() {
-    return eventCounter.get();
-  }
-
-  public boolean isTaskDone() {
-    return taskDone.get();
-  }
-
-  protected void setTaskDone() {
-    taskDone.set(true);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeUtils.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeUtils.java b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeUtils.java
deleted file mode 100644
index 20a029e..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeUtils.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newruntime;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.tez.dag.api.TezUncheckedException;
-
-public class RuntimeUtils {
-
-  private static final Map<String, Class<?>> CLAZZ_CACHE = new ConcurrentHashMap<String, Class<?>>();
-
-  private static Class<?> getClazz(String className) {
-    Class<?> clazz = CLAZZ_CACHE.get(className);
-    if (clazz == null) {
-      try {
-        clazz = Class.forName(className);
-      } catch (ClassNotFoundException e) {
-        throw new TezUncheckedException("Unable to load class: " + className, e);
-      }
-    }
-    return clazz;
-  }
-
-  private static <T> T getNewInstance(Class<T> clazz) {
-    T instance;
-    try {
-      instance = clazz.newInstance();
-    } catch (InstantiationException e) {
-      throw new TezUncheckedException(
-          "Unable to instantiate class with 0 arguments: " + clazz.getName(), e);
-    } catch (IllegalAccessException e) {
-      throw new TezUncheckedException(
-          "Unable to instantiate class with 0 arguments: " + clazz.getName(), e);
-    }
-    return instance;
-  }
-
-  public static <T> T createClazzInstance(String className) {
-    Class<?> clazz = getClazz(className);
-    @SuppressWarnings("unchecked")
-    T instance = (T) getNewInstance(clazz);
-    return instance;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/DiskFetchedInput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/DiskFetchedInput.java b/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/DiskFetchedInput.java
deleted file mode 100644
index 531e460..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/DiskFetchedInput.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.shuffle.common;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.Path;
-import org.apache.tez.engine.common.InputAttemptIdentifier;
-import org.apache.tez.engine.common.task.local.output.TezTaskOutputFiles;
-
-import com.google.common.base.Preconditions;
-
-public class DiskFetchedInput extends FetchedInput {
-
-  private static final Log LOG = LogFactory.getLog(DiskFetchedInput.class);
-  
-  private final FileSystem localFS;
-  private final Path tmpOutputPath;
-  private final Path outputPath;
-
-  public DiskFetchedInput(long size,
-      InputAttemptIdentifier inputAttemptIdentifier,
-      FetchedInputCallback callbackHandler, Configuration conf,
-      LocalDirAllocator localDirAllocator, TezTaskOutputFiles filenameAllocator)
-      throws IOException {
-    super(Type.DISK, size, inputAttemptIdentifier, callbackHandler);
-
-    this.localFS = FileSystem.getLocal(conf);
-    this.outputPath = filenameAllocator.getInputFileForWrite(
-        this.inputAttemptIdentifier.getInputIdentifier().getSrcTaskIndex(), size);
-    this.tmpOutputPath = outputPath.suffix(String.valueOf(id));
-  }
-
-  @Override
-  public OutputStream getOutputStream() throws IOException {
-    return localFS.create(tmpOutputPath);
-  }
-
-  @Override
-  public InputStream getInputStream() throws IOException {
-    return localFS.open(outputPath);
-  }
-
-  @Override
-  public void commit() throws IOException {
-    if (state == State.PENDING) {
-      state = State.COMMITTED;
-      localFS.rename(tmpOutputPath, outputPath);
-      notifyFetchComplete();
-    }
-  }
-
-  @Override
-  public void abort() throws IOException {
-    if (state == State.PENDING) {
-      state = State.ABORTED;
-      // TODO NEWTEZ Maybe defer this to container cleanup
-      localFS.delete(tmpOutputPath, false);
-      notifyFetchFailure();
-    }
-  }
-  
-  @Override
-  public void free() {
-    Preconditions.checkState(
-        state == State.COMMITTED || state == State.ABORTED,
-        "FetchedInput can only be freed after it is committed or aborted");
-    if (state == State.COMMITTED) {
-      state = State.FREED;
-      try {
-        // TODO NEWTEZ Maybe defer this to container cleanup
-        localFS.delete(outputPath, false);
-      } catch (IOException e) {
-        // Ignoring the exception, will eventually be cleaned by container
-        // cleanup.
-        LOG.warn("Failed to remvoe file : " + outputPath.toString());
-      }
-      notifyFreedResource();
-    }
-  }
-
-  @Override
-  public String toString() {
-    return "DiskFetchedInput [outputPath=" + outputPath
-        + ", inputAttemptIdentifier=" + inputAttemptIdentifier + ", size="
-        + size + ", type=" + type + ", id=" + id + ", state=" + state + "]";
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchResult.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchResult.java b/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchResult.java
deleted file mode 100644
index fb0b324..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchResult.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.shuffle.common;
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import org.apache.tez.engine.common.InputAttemptIdentifier;
-
-public class FetchResult {
-
-  private final String host;
-  private final int port;
-  private final int partition;
-  private final Iterable<InputAttemptIdentifier> pendingInputs;
-
-  public FetchResult(String host, int port, int partition,
-      Iterable<InputAttemptIdentifier> pendingInputs) {
-    this.host = host;
-    this.port = port;
-    this.partition = partition;
-    this.pendingInputs = pendingInputs;
-  }
-
-  public String getHost() {
-    return host;
-  }
-
-  public int getPort() {
-    return port;
-  }
-
-  public int getPartition() {
-    return partition;
-  }
-
-  public Iterable<InputAttemptIdentifier> getPendingInputs() {
-    return pendingInputs;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchedInput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchedInput.java b/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchedInput.java
deleted file mode 100644
index f5339d3..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchedInput.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.shuffle.common;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.tez.engine.common.InputAttemptIdentifier;
-
-@Private
-public abstract class FetchedInput {
-  
-  public static enum Type {
-    WAIT, // TODO NEWTEZ Implement this, only if required.
-    MEMORY,
-    DISK,
-  }
-  
-  protected static enum State {
-    PENDING, COMMITTED, ABORTED, FREED
-  }
-
-  private static AtomicInteger ID_GEN = new AtomicInteger(0);
-
-  protected InputAttemptIdentifier inputAttemptIdentifier;
-  protected final long size;
-  protected final Type type;
-  protected final FetchedInputCallback callback;
-  protected final int id;
-  protected State state;
-
-  public FetchedInput(Type type, long size,
-      InputAttemptIdentifier inputAttemptIdentifier,
-      FetchedInputCallback callbackHandler) {
-    this.type = type;
-    this.size = size;
-    this.inputAttemptIdentifier = inputAttemptIdentifier;
-    this.callback = callbackHandler;
-    this.id = ID_GEN.getAndIncrement();
-    this.state = State.PENDING;
-  }
-
-  public Type getType() {
-    return this.type;
-  }
-
-  public long getSize() {
-    return this.size;
-  }
-
-  public InputAttemptIdentifier getInputAttemptIdentifier() {
-    return this.inputAttemptIdentifier;
-  }
-
-  /**
-   * Inform the Allocator about a committed resource.
-   * This should be called by commit
-   */
-  public void notifyFetchComplete() {
-    this.callback.fetchComplete(this);
-  }
-  
-  /**
-   * Inform the Allocator about a failed resource.
-   * This should be called by abort
-   */
-  public void notifyFetchFailure() {
-    this.callback.fetchFailed(this);
-  }
-  
-  /**
-   * Inform the Allocator about a completed resource being released.
-   * This should be called by free
-   */
-  public void notifyFreedResource() {
-    this.callback.freeResources(this);
-  }
-  
-  /**
-   * Returns the output stream to be used to write fetched data. Users are
-   * expected to close the OutputStream when they're done
-   */
-  public abstract OutputStream getOutputStream() throws IOException;
-
-  /**
-   * Return an input stream to be used to read the previously fetched data.
-   * Users are expected to close the InputStream when they're done
-   */
-  public abstract InputStream getInputStream() throws IOException;
-
-  /**
-   * Commit the output. Should be idempotent
-   */
-  public abstract void commit() throws IOException;
-
-  /**
-   * Abort the output. Should be idempotent
-   */
-  public abstract void abort() throws IOException;
-
-  /**
-   * Called when this input has been consumed, so that resources can be
-   * reclaimed.
-   */
-  public abstract void free();
-  
-  @Override
-  public int hashCode() {
-    return id;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj)
-      return true;
-    if (obj == null)
-      return false;
-    if (getClass() != obj.getClass())
-      return false;
-    FetchedInput other = (FetchedInput) obj;
-    if (id != other.id)
-      return false;
-    return true;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchedInputAllocator.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchedInputAllocator.java b/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchedInputAllocator.java
deleted file mode 100644
index 7e573f0..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchedInputAllocator.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.shuffle.common;
-
-import java.io.IOException;
-
-import org.apache.tez.engine.common.InputAttemptIdentifier;
-
-public interface FetchedInputAllocator {
-
-  public FetchedInput allocate(long size,
-      InputAttemptIdentifier inputAttemptIdentifier) throws IOException;
-  
-  
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchedInputCallback.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchedInputCallback.java b/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchedInputCallback.java
deleted file mode 100644
index 2d2d73b..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetchedInputCallback.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.shuffle.common;
-
-public interface FetchedInputCallback {
-  
-  public void fetchComplete(FetchedInput fetchedInput);
-  
-  public void fetchFailed(FetchedInput fetchedInput);
-  
-  public void freeResources(FetchedInput fetchedInput);
-  
-}